dm_mqtt.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. /*
  2. * Copyright (C) 2012-2019 UCloud. All Rights Reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License").
  5. * You may not use this file except in compliance with the License.
  6. * A copy of the License is located at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * or in the "license" file accompanying this file. This file is distributed
  11. * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  12. * express or implied. See the License for the specific language governing
  13. * permissions and limitations under the License.
  14. */
  15. #include "uiot_defs.h"
  16. #include "uiot_export_mqtt.h"
  17. #include "dm_config.h"
  18. #include "dm_internal.h"
  19. #include "lite-utils.h"
  20. DM_MQTT_CB_t g_dm_mqtt_cb[] = {
  21. {PROPERTY_RESTORE, PROPERTY_RESTORE_TOPIC_TEMPLATE, PROPERTY_RESTORE_REPLY_TOPIC_TEMPLATE, dm_mqtt_property_restore_cb},
  22. {PROPERTY_POST, PROPERTY_POST_TOPIC_TEMPLATE, PROPERTY_POST_REPLY_TOPIC_TEMPLATE, dm_mqtt_property_post_cb},
  23. {PROPERTY_SET, PROPERTY_SET_REPLY_TOPIC_TEMPLATE, PROPERTY_SET_TOPIC_TEMPLATE, dm_mqtt_property_set_cb},
  24. {PROPERTY_DESIRED_GET, PROPERTY_DESIRED_GET_TOPIC_TEMPLATE, PROPERTY_DESIRED_GET_REPLY_TOPIC_TEMPLATE, dm_mqtt_property_desired_get_cb},
  25. {PROPERTY_DESIRED_DELETE, PROPERTY_DESIRED_DELETE_TOPIC_TEMPLATE, PROPERTY_DESIRED_DELETE_REPLY_TOPIC_TEMPLATE, dm_mqtt_property_desired_delete_cb},
  26. {EVENT_POST, EVENT_POST_TOPIC_TEMPLATE, EVENT_POST_REPLY_TOPIC_TEMPLATE, dm_mqtt_event_post_cb},
  27. {COMMAND, COMMAND_REPLY_TOPIC_TEMPLATE, COMMAND_TOPIC_TEMPLATE, dm_mqtt_command_cb}
  28. };
  29. static int _dm_mqtt_gen_topic_name(char *buf, size_t buf_len, const char *topic_template, const char *product_sn,
  30. const char *device_sn) {
  31. FUNC_ENTRY;
  32. int ret;
  33. ret = HAL_Snprintf(buf, buf_len, topic_template, product_sn, device_sn);
  34. if (ret < 0 || ret >= buf_len) {
  35. LOG_ERROR("HAL_Snprintf failed");
  36. FUNC_EXIT_RC(FAILURE_RET);
  37. }
  38. FUNC_EXIT_RC(SUCCESS_RET);
  39. }
  40. static int _dm_mqtt_gen_property_payload(char *buf, size_t buf_len, DM_Type type, int request_id, const char *payload) {
  41. FUNC_ENTRY;
  42. int ret;
  43. switch (type) {
  44. case PROPERTY_RESTORE:
  45. ret = HAL_Snprintf(buf, buf_len, "{\"RequestID\": \"%d\"}", request_id);
  46. break;
  47. case PROPERTY_POST:
  48. ret = HAL_Snprintf(buf, buf_len, "{\"RequestID\": \"%d\", \"Property\": %s}", request_id, payload);
  49. break;
  50. case PROPERTY_DESIRED_GET:
  51. ret = HAL_Snprintf(buf, buf_len, "{\"RequestID\": \"%d\", \"Require\": %s}", request_id, payload);
  52. break;
  53. case PROPERTY_DESIRED_DELETE:
  54. ret = HAL_Snprintf(buf, buf_len, "{\"RequestID\": \"%d\", \"Delete\": %s}", request_id, payload);
  55. break;
  56. default: FUNC_EXIT_RC(FAILURE_RET);
  57. }
  58. if (ret < 0 || ret >= buf_len) {
  59. LOG_ERROR("generate property payload failed");
  60. FUNC_EXIT_RC(FAILURE_RET);
  61. }
  62. FUNC_EXIT_RC(SUCCESS_RET);
  63. }
  64. static int _dm_mqtt_gen_event_payload(char *buf, size_t buf_len, int request_id, const char *identifier, const char *payload) {
  65. FUNC_ENTRY;
  66. int ret;
  67. ret = HAL_Snprintf(buf, buf_len, "{\"RequestID\": \"%d\", \"Identifier\": \"%s\", \"Output\": %s}", request_id,
  68. identifier, payload);
  69. if (ret < 0 || ret >= buf_len) {
  70. LOG_ERROR("generate event payload failed");
  71. FUNC_EXIT_RC(FAILURE_RET);
  72. }
  73. FUNC_EXIT_RC(SUCCESS_RET);
  74. }
  75. static int _dm_mqtt_publish(DM_MQTT_Struct_t *handle, char *topic_name, int qos, const char *msg) {
  76. FUNC_ENTRY;
  77. int ret;
  78. PublishParams pub_params = DEFAULT_PUB_PARAMS;
  79. //暂不支持QOS2
  80. if (0 == qos) {
  81. pub_params.qos = QOS0;
  82. } else {
  83. pub_params.qos = QOS1;
  84. }
  85. pub_params.payload = (void *) msg;
  86. pub_params.payload_len = strlen(msg);
  87. ret = IOT_MQTT_Publish(handle->mqtt, topic_name, &pub_params);
  88. if (ret < 0) {
  89. LOG_ERROR("publish to topic: %s failed", topic_name);
  90. FUNC_EXIT_RC(FAILURE_RET);
  91. }
  92. FUNC_EXIT_RC(ret);
  93. }
  94. static void _dm_mqtt_common_reply_cb(MQTTMessage *message, CommonReplyCB cb) {
  95. FUNC_ENTRY;
  96. int8_t ret_code;
  97. char *request_id = NULL;
  98. char *ret_code_char = NULL;
  99. char *msg = NULL;
  100. if (NULL == (msg = HAL_Malloc(message->payload_len + 1))) {
  101. LOG_ERROR("allocate for message failed");
  102. FUNC_EXIT;
  103. }
  104. HAL_Snprintf(msg, message->payload_len + 1, "%s", message->payload);
  105. if (NULL == (ret_code_char = LITE_json_value_of((char *) "RetCode", msg))) {
  106. LOG_ERROR("allocate for ret_code_char failed");
  107. goto do_exit;
  108. }
  109. if (SUCCESS_RET != LITE_get_int8(&ret_code, ret_code_char)) {
  110. LOG_ERROR("get ret_code failed");
  111. goto do_exit;
  112. }
  113. if (NULL == (request_id = LITE_json_value_of((char *) "RequestID", msg))) {
  114. LOG_ERROR("allocate for request_id failed");
  115. goto do_exit;
  116. }
  117. cb(request_id, ret_code);
  118. do_exit:
  119. HAL_Free(msg);
  120. HAL_Free(request_id);
  121. HAL_Free(ret_code_char);
  122. FUNC_EXIT;
  123. }
  124. void dm_mqtt_property_restore_cb(void *pClient, MQTTMessage *message, void *pContext) {
  125. FUNC_ENTRY;
  126. LOG_DEBUG("topic=%s", message->topic);
  127. LOG_INFO("len=%u, topic_msg=%.*s", message->payload_len, message->payload_len, (char *)message->payload);
  128. DM_MQTT_Struct_t *handle = (DM_MQTT_Struct_t *) pContext;
  129. if (NULL == handle->callbacks[PROPERTY_RESTORE]) {
  130. FUNC_EXIT;
  131. }
  132. PropertyRestoreCB cb;
  133. int8_t ret_code;
  134. char *request_id = NULL;
  135. char *ret_code_char = NULL;
  136. char *property = NULL;
  137. char *msg = NULL;
  138. if (NULL == (msg = HAL_Malloc(message->payload_len + 1))) {
  139. LOG_ERROR("allocate for message failed");
  140. FUNC_EXIT;
  141. }
  142. HAL_Snprintf(msg, message->payload_len + 1, "%s", message->payload);
  143. if (NULL == (ret_code_char = LITE_json_value_of((char *) "RetCode", msg))) {
  144. LOG_ERROR("allocate for ret_code_char failed");
  145. goto do_exit;
  146. }
  147. if (SUCCESS_RET != LITE_get_int8(&ret_code, ret_code_char)) {
  148. LOG_ERROR("get for ret_code failed");
  149. goto do_exit;
  150. }
  151. if (NULL == (request_id = LITE_json_value_of((char *) "RequestID", msg))) {
  152. LOG_ERROR("allocate for request_id failed");
  153. goto do_exit;
  154. }
  155. if (NULL == (property = LITE_json_value_of((char *) "Property", msg))) {
  156. LOG_ERROR("allocate for property failed");
  157. goto do_exit;
  158. }
  159. cb = (PropertyRestoreCB) handle->callbacks[PROPERTY_RESTORE];
  160. cb(request_id, ret_code, property);
  161. do_exit:
  162. HAL_Free(msg);
  163. HAL_Free(ret_code_char);
  164. HAL_Free(property);
  165. HAL_Free(request_id);
  166. FUNC_EXIT;
  167. }
  168. void dm_mqtt_property_post_cb(void *pClient, MQTTMessage *message, void *pContext) {
  169. FUNC_ENTRY;
  170. LOG_DEBUG("topic=%s", message->topic);
  171. LOG_INFO("len=%u, topic_msg=%.*s", message->payload_len, message->payload_len, (char *)message->payload);
  172. DM_MQTT_Struct_t *handle = (DM_MQTT_Struct_t *) pContext;
  173. if (NULL == handle->callbacks[PROPERTY_POST]) {
  174. FUNC_EXIT;
  175. }
  176. _dm_mqtt_common_reply_cb(message, (CommonReplyCB) handle->callbacks[PROPERTY_POST]);
  177. FUNC_EXIT;
  178. }
  179. void dm_mqtt_property_set_cb(void *pClient, MQTTMessage *message, void *pContext) {
  180. FUNC_ENTRY;
  181. LOG_DEBUG("topic=%s", message->topic);
  182. LOG_INFO("len=%u, topic_msg=%.*s", message->payload_len, message->payload_len, (char *)message->payload);
  183. DM_MQTT_Struct_t *handle = (DM_MQTT_Struct_t *) pContext;
  184. if (NULL == handle->callbacks[PROPERTY_SET]) {
  185. FUNC_EXIT;
  186. }
  187. int ret;
  188. PropertySetCB cb;
  189. int cb_ret;
  190. char *request_id = NULL;
  191. char *property = NULL;
  192. char *msg_reply = NULL;
  193. char *topic = NULL;
  194. char *msg = NULL;
  195. if (NULL == (msg = HAL_Malloc(message->payload_len + 1))) {
  196. LOG_ERROR("allocate for message failed");
  197. FUNC_EXIT;
  198. }
  199. HAL_Snprintf(msg, message->payload_len + 1, "%s", message->payload);
  200. if (NULL == (request_id = LITE_json_value_of((char *) "RequestID", msg))) {
  201. LOG_ERROR("allocate for request_id failed");
  202. goto do_exit;
  203. }
  204. if (NULL == (property = LITE_json_value_of((char *) "Property", msg))) {
  205. LOG_ERROR("allocate for property failed");
  206. goto do_exit;
  207. }
  208. cb = (PropertySetCB) handle->callbacks[PROPERTY_SET];
  209. cb_ret = cb(request_id, property);
  210. if (NULL == (msg_reply = HAL_Malloc(DM_MSG_REPLY_BUF_LEN))) {
  211. LOG_ERROR("allocate for msg_reply failed");
  212. goto do_exit;
  213. }
  214. if (NULL == (topic = HAL_Malloc(DM_TOPIC_BUF_LEN))) {
  215. LOG_ERROR("allocate for topic failed");
  216. goto do_exit;
  217. }
  218. if (SUCCESS_RET != _dm_mqtt_gen_topic_name(topic, DM_TOPIC_BUF_LEN, handle->upstream_topic_templates[PROPERTY_SET],
  219. handle->product_sn, handle->device_sn)) {
  220. LOG_ERROR("generate topic name failed");
  221. goto do_exit;
  222. }
  223. ret = HAL_Snprintf(msg_reply, DM_MSG_REPLY_BUF_LEN, "{\"RequestID\": \"%s\", \"RetCode\": %d}", request_id, cb_ret);
  224. if (ret < 0 || ret >= DM_MSG_REPLY_BUF_LEN) {
  225. LOG_ERROR("HAL_Snprintf msg_reply failed");
  226. goto do_exit;
  227. }
  228. ret = _dm_mqtt_publish(handle, topic, 1, msg_reply);
  229. if (ret < 0) {
  230. LOG_ERROR("mqtt publish msg failed");
  231. goto do_exit;
  232. }
  233. do_exit:
  234. HAL_Free(msg);
  235. HAL_Free(request_id);
  236. HAL_Free(property);
  237. HAL_Free(msg_reply);
  238. HAL_Free(topic);
  239. FUNC_EXIT;
  240. }
  241. void dm_mqtt_property_desired_get_cb(void *pClient, MQTTMessage *message, void *pContext) {
  242. FUNC_ENTRY;
  243. LOG_DEBUG("topic=%s", message->topic);
  244. LOG_INFO("len=%u, topic_msg=%.*s", message->payload_len, message->payload_len, (char *)message->payload);
  245. DM_MQTT_Struct_t *handle = (DM_MQTT_Struct_t *) pContext;
  246. if (NULL == handle->callbacks[PROPERTY_DESIRED_GET]) {
  247. FUNC_EXIT;
  248. }
  249. PropertyDesiredGetCB cb;
  250. int8_t ret_code;
  251. char *request_id = NULL;
  252. char *ret_code_char = NULL;
  253. char *desired = NULL;
  254. char *msg = NULL;
  255. if (NULL == (msg = HAL_Malloc(message->payload_len + 1))) {
  256. LOG_ERROR("allocate for message failed");
  257. goto do_exit;
  258. }
  259. HAL_Snprintf(msg, message->payload_len + 1, "%s", message->payload);
  260. if (NULL == (ret_code_char = LITE_json_value_of((char *) "RetCode", msg))) {
  261. LOG_ERROR("allocate for ret_code_char failed");
  262. goto do_exit;
  263. }
  264. if (SUCCESS_RET != LITE_get_int8(&ret_code, ret_code_char)) {
  265. LOG_ERROR("get ret_code failed");
  266. goto do_exit;
  267. }
  268. if (NULL == (request_id = LITE_json_value_of((char *) "RequestID", msg))) {
  269. LOG_ERROR("allocate for request_id failed");
  270. goto do_exit;
  271. }
  272. if (NULL == (desired = LITE_json_value_of((char *) "Desired", msg))) {
  273. LOG_ERROR("allocate for desired failed");
  274. goto do_exit;
  275. }
  276. cb = (PropertyDesiredGetCB) handle->callbacks[PROPERTY_DESIRED_GET];
  277. cb(request_id, ret_code, desired);
  278. do_exit:
  279. HAL_Free(msg);
  280. HAL_Free(ret_code_char);
  281. HAL_Free(request_id);
  282. HAL_Free(desired);
  283. FUNC_EXIT;
  284. }
  285. void dm_mqtt_property_desired_delete_cb(void *pClient, MQTTMessage *message, void *pContext) {
  286. FUNC_ENTRY;
  287. LOG_DEBUG("topic=%s", message->topic);
  288. LOG_INFO("len=%u, topic_msg=%.*s", message->payload_len, message->payload_len, (char *)message->payload);
  289. DM_MQTT_Struct_t *handle = (DM_MQTT_Struct_t *) pContext;
  290. if (NULL == handle->callbacks[PROPERTY_DESIRED_DELETE]) {
  291. FUNC_EXIT;
  292. }
  293. _dm_mqtt_common_reply_cb(message, (CommonReplyCB) handle->callbacks[PROPERTY_DESIRED_DELETE]);
  294. FUNC_EXIT;
  295. }
  296. void dm_mqtt_event_post_cb(void *pClient, MQTTMessage *message, void *pContext) {
  297. FUNC_ENTRY;
  298. LOG_DEBUG("topic=%s", message->topic);
  299. LOG_INFO("len=%u, topic_msg=%.*s", strlen(message->payload), message->payload_len, (char *)message->payload);
  300. DM_MQTT_Struct_t *handle = (DM_MQTT_Struct_t *) pContext;
  301. if (NULL == handle->callbacks[EVENT_POST]) {
  302. FUNC_EXIT;
  303. }
  304. _dm_mqtt_common_reply_cb(message, (CommonReplyCB) handle->callbacks[EVENT_POST]);
  305. FUNC_EXIT;
  306. }
  307. void dm_mqtt_command_cb(void *pClient, MQTTMessage *message, void *pContext) {
  308. FUNC_ENTRY;
  309. LOG_DEBUG("topic=%s", message->topic);
  310. LOG_INFO("len=%u, topic_msg=%.*s", message->payload_len, message->payload_len, (char *)message->payload);
  311. DM_MQTT_Struct_t *handle = (DM_MQTT_Struct_t *) pContext;
  312. if (NULL == handle->callbacks[COMMAND]) {
  313. FUNC_EXIT;
  314. }
  315. CommandCB cb;
  316. int cb_ret;
  317. char *request_id = NULL;
  318. char *identifier = NULL;
  319. char *input = NULL;
  320. char *output = NULL;
  321. char *topic = NULL;
  322. char *cmd_reply = NULL;
  323. char *msg = NULL;
  324. if (NULL == (msg = HAL_Malloc(message->payload_len + 1))) {
  325. LOG_ERROR("allocate for message failed");
  326. FUNC_EXIT;
  327. }
  328. HAL_Snprintf(msg, message->payload_len + 1, "%s", message->payload);
  329. if (NULL == (request_id = LITE_json_value_of((char *) "RequestID", msg))) {
  330. LOG_ERROR("allocate for request_id failed");
  331. goto do_exit;
  332. }
  333. if (NULL == (identifier = LITE_json_value_of((char *) "Identifier", msg))) {
  334. LOG_ERROR("allocate for identifier failed");
  335. goto do_exit;
  336. }
  337. if (NULL == (input = LITE_json_value_of((char *) "Input", msg))) {
  338. LOG_ERROR("allocate for input failed");
  339. goto do_exit;
  340. }
  341. cb = (CommandCB) handle->callbacks[COMMAND];
  342. cb_ret = cb(request_id, identifier, input, &output);
  343. if (NULL == output) {
  344. LOG_ERROR("output error");
  345. goto do_exit;
  346. }
  347. if (NULL == (cmd_reply = HAL_Malloc(DM_CMD_REPLY_BUF_LEN))) {
  348. LOG_ERROR("allocate for cmd_reply failed");
  349. goto do_exit;
  350. }
  351. if (NULL == (topic = HAL_Malloc(DM_TOPIC_BUF_LEN))) {
  352. LOG_ERROR("allocate for topic failed");
  353. goto do_exit;
  354. }
  355. int ret = HAL_Snprintf(topic, DM_TOPIC_BUF_LEN, handle->upstream_topic_templates[COMMAND], handle->product_sn,
  356. handle->device_sn, request_id);
  357. if (ret < 0 || ret > DM_TOPIC_BUF_LEN) {
  358. LOG_ERROR("topic error");
  359. goto do_exit;
  360. }
  361. ret = HAL_Snprintf(cmd_reply, DM_CMD_REPLY_BUF_LEN,
  362. "{\"RequestID\": \"%s\", \"RetCode\": %d, \"Identifier\": \"%s\", \"Output\": %s}",
  363. request_id, cb_ret, identifier, output);
  364. if (ret < 0 || ret > DM_CMD_REPLY_BUF_LEN) {
  365. LOG_ERROR("generate cmd_reply msg failed");
  366. goto do_exit;
  367. }
  368. ret = _dm_mqtt_publish(handle, topic, 1, cmd_reply);
  369. if (ret < 0) {
  370. LOG_ERROR("mqtt publish msg failed");
  371. goto do_exit;
  372. }
  373. do_exit:
  374. HAL_Free(msg);
  375. HAL_Free(request_id);
  376. HAL_Free(identifier);
  377. HAL_Free(input);
  378. HAL_Free(output);
  379. HAL_Free(topic);
  380. HAL_Free(cmd_reply);
  381. FUNC_EXIT;
  382. }
  383. int _dsc_mqtt_register_callback(DM_MQTT_Struct_t *handle, DM_Type dm_type, void *callback) {
  384. FUNC_ENTRY;
  385. int ret;
  386. if (NULL == handle || callback == NULL) {
  387. LOG_ERROR("params error!");
  388. FUNC_EXIT_RC(FAILURE_RET);
  389. }
  390. handle->callbacks[dm_type] = callback;
  391. handle->upstream_topic_templates[dm_type] = g_dm_mqtt_cb[dm_type].upstream_topic_template;
  392. handle->downstream_topic_templates[dm_type] = g_dm_mqtt_cb[dm_type].downstream_topic_template;
  393. char topic[DM_TOPIC_BUF_LEN];
  394. ret = _dm_mqtt_gen_topic_name(topic, DM_TOPIC_BUF_LEN, handle->downstream_topic_templates[dm_type],
  395. handle->product_sn, handle->device_sn);
  396. if (ret < 0) {
  397. LOG_ERROR("generate topic name failed");
  398. FUNC_EXIT_RC(FAILURE_RET);
  399. }
  400. SubscribeParams sub_params = DEFAULT_SUB_PARAMS;
  401. sub_params.on_message_handler = g_dm_mqtt_cb[dm_type].callback;
  402. sub_params.qos = QOS1;
  403. sub_params.user_data = handle;
  404. ret = IOT_MQTT_Subscribe(handle->mqtt, topic, &sub_params);
  405. if (ret < 0) {
  406. LOG_ERROR("mqtt subscribe failed!");
  407. FUNC_EXIT_RC(FAILURE_RET);
  408. }
  409. FUNC_EXIT_RC(SUCCESS_RET);
  410. }
/*
 * One DEFINE_DM_CALLBACK expansion per device-model message type, pairing the
 * type with the signature of its user callback.
 * NOTE(review): the macro is defined outside this file; presumably each
 * expansion defines the public registration function for that type on top of
 * _dsc_mqtt_register_callback -- confirm against the macro definition.
 */
DEFINE_DM_CALLBACK(PROPERTY_RESTORE, PropertyRestoreCB)
DEFINE_DM_CALLBACK(PROPERTY_POST, CommonReplyCB)
DEFINE_DM_CALLBACK(PROPERTY_SET, PropertySetCB)
DEFINE_DM_CALLBACK(PROPERTY_DESIRED_GET, PropertyDesiredGetCB)
DEFINE_DM_CALLBACK(PROPERTY_DESIRED_DELETE, CommonReplyCB)
DEFINE_DM_CALLBACK(EVENT_POST, CommonReplyCB)
DEFINE_DM_CALLBACK(COMMAND, CommandCB)
  418. void *dsc_init(const char *product_sn, const char *device_sn, void *channel, void *context) {
  419. FUNC_ENTRY;
  420. DM_MQTT_Struct_t *h_dsc = NULL;
  421. if (NULL == (h_dsc = HAL_Malloc(sizeof(DM_MQTT_Struct_t)))) {
  422. LOG_ERROR("allocate for h_dsc failed");
  423. FUNC_EXIT_RC(NULL);
  424. }
  425. memset(h_dsc, 0, sizeof(DM_MQTT_Struct_t));
  426. h_dsc->mqtt = channel;
  427. h_dsc->product_sn = product_sn;
  428. h_dsc->device_sn = device_sn;
  429. h_dsc->context = context;
  430. FUNC_EXIT_RC(h_dsc);
  431. }
  432. int dsc_deinit(void *handle) {
  433. FUNC_ENTRY;
  434. if (NULL != handle) {
  435. HAL_Free(handle);
  436. }
  437. FUNC_EXIT_RC(SUCCESS_RET);
  438. }
  439. int dm_mqtt_property_report_publish(DM_MQTT_Struct_t *handle, DM_Type type, int request_id, const char *payload) {
  440. FUNC_ENTRY;
  441. int ret = FAILURE_RET;
  442. char *msg_report = NULL;
  443. char *topic = NULL;
  444. if (NULL == (msg_report = HAL_Malloc(DM_MSG_REPORT_BUF_LEN))) {
  445. LOG_ERROR("allocate for msg_report failed");
  446. goto do_exit;
  447. }
  448. if (NULL == (topic = HAL_Malloc(DM_TOPIC_BUF_LEN))) {
  449. LOG_ERROR("allocate for topic failed");
  450. goto do_exit;
  451. }
  452. if (SUCCESS_RET != _dm_mqtt_gen_topic_name(topic, DM_TOPIC_BUF_LEN, handle->upstream_topic_templates[type],
  453. handle->product_sn, handle->device_sn)) {
  454. LOG_ERROR("generate topic failed");
  455. goto do_exit;
  456. }
  457. ret = _dm_mqtt_gen_property_payload(msg_report, DM_MSG_REPORT_BUF_LEN, type, request_id, payload);
  458. if (ret < 0) {
  459. LOG_ERROR("generate msg_report failed");
  460. ret = FAILURE_RET;
  461. goto do_exit;
  462. }
  463. ret = _dm_mqtt_publish(handle, topic, 1, msg_report);
  464. if (ret < 0) {
  465. LOG_ERROR("mqtt publish msg failed");
  466. }
  467. do_exit:
  468. HAL_Free(topic);
  469. HAL_Free(msg_report);
  470. FUNC_EXIT_RC(ret);
  471. }
  472. int dm_mqtt_event_publish(DM_MQTT_Struct_t *handle, int request_id, const char *identifier, const char *payload) {
  473. FUNC_ENTRY;
  474. int ret = FAILURE_RET;
  475. char *msg_report = NULL;
  476. char *topic = NULL;
  477. if (NULL == (msg_report = HAL_Malloc(DM_EVENT_POST_BUF_LEN))) {
  478. LOG_ERROR("allocate for msg_report failed");
  479. goto do_exit;
  480. }
  481. if (NULL == (topic = HAL_Malloc(DM_TOPIC_BUF_LEN))) {
  482. LOG_ERROR("allocate for topic failed");
  483. goto do_exit;
  484. }
  485. if (SUCCESS_RET != _dm_mqtt_gen_topic_name(topic, DM_TOPIC_BUF_LEN, handle->upstream_topic_templates[EVENT_POST],
  486. handle->product_sn, handle->device_sn)) {
  487. LOG_ERROR("generate topic failed");
  488. goto do_exit;
  489. }
  490. ret = _dm_mqtt_gen_event_payload(msg_report, DM_EVENT_POST_BUF_LEN, request_id, identifier, payload);
  491. if (ret < 0) {
  492. LOG_ERROR("generate msg_report failed");
  493. goto do_exit;
  494. }
  495. ret = _dm_mqtt_publish(handle, topic, 1, msg_report);
  496. if (ret < 0) {
  497. LOG_ERROR("mqtt publish msg failed");
  498. }
  499. do_exit:
  500. HAL_Free(topic);
  501. HAL_Free(msg_report);
  502. FUNC_EXIT_RC(ret);
  503. }