mqtt_sample.c 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. /*
  2. * Copyright (C) 2012-2019 UCloud. All Rights Reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License").
  5. * You may not use this file except in compliance with the License.
  6. * A copy of the License is located at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * or in the "license" file accompanying this file. This file is distributed
  11. * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  12. * express or implied. See the License for the specific language governing
  13. * permissions and limitations under the License.
  14. */
  15. #include <stdio.h>
  16. #include <stdlib.h>
  17. #include <limits.h>
  18. #include <stdbool.h>
  19. #include <string.h>
  20. #include <signal.h>
  21. #include "uiot_export.h"
  22. #include "uiot_import.h"
  23. #define MAX_SIZE_OF_TOPIC_CONTENT 100
  24. static int sg_count = 0;
  25. static int sg_sub_packet_id = -1;
  26. static int running_state = 0;
  27. void event_handler(void *pClient, void *handle_context, MQTTEventMsg *msg) {
  28. MQTTMessage* mqtt_message = (MQTTMessage*)msg->msg;
  29. uintptr_t packet_id = (uintptr_t)msg->msg;
  30. switch(msg->event_type) {
  31. case MQTT_EVENT_UNDEF:
  32. HAL_Printf("undefined event occur.\n");
  33. break;
  34. case MQTT_EVENT_DISCONNECT:
  35. HAL_Printf("MQTT disconnect.\n");
  36. break;
  37. case MQTT_EVENT_RECONNECT:
  38. HAL_Printf("MQTT reconnect.\n");
  39. break;
  40. case MQTT_EVENT_PUBLISH_RECEIVED:
  41. HAL_Printf("topic message arrived but without any related handle: topic=%.*s, topic_msg=%.*s\n",
  42. mqtt_message->topic_len,
  43. mqtt_message->topic,
  44. mqtt_message->payload_len,
  45. mqtt_message->payload);
  46. break;
  47. case MQTT_EVENT_SUBSCRIBE_SUCCESS:
  48. HAL_Printf("subscribe success, packet-id=%u\n", (unsigned int)packet_id);
  49. sg_sub_packet_id = packet_id;
  50. break;
  51. case MQTT_EVENT_SUBSCRIBE_TIMEOUT:
  52. HAL_Printf("subscribe wait ack timeout, packet-id=%u\n", (unsigned int)packet_id);
  53. sg_sub_packet_id = packet_id;
  54. break;
  55. case MQTT_EVENT_SUBSCRIBE_NACK:
  56. HAL_Printf("subscribe nack, packet-id=%u\n", (unsigned int)packet_id);
  57. sg_sub_packet_id = packet_id;
  58. break;
  59. case MQTT_EVENT_UNSUBSCRIBE_SUCCESS:
  60. HAL_Printf("unsubscribe success, packet-id=%u\n", (unsigned int)packet_id);
  61. break;
  62. case MQTT_EVENT_UNSUBSCRIBE_TIMEOUT:
  63. HAL_Printf("unsubscribe timeout, packet-id=%u\n", (unsigned int)packet_id);
  64. break;
  65. case MQTT_EVENT_UNSUBSCRIBE_NACK:
  66. HAL_Printf("unsubscribe nack, packet-id=%u\n", (unsigned int)packet_id);
  67. break;
  68. case MQTT_EVENT_PUBLISH_SUCCESS:
  69. HAL_Printf("publish success, packet-id=%u\n", (unsigned int)packet_id);
  70. break;
  71. case MQTT_EVENT_PUBLISH_TIMEOUT:
  72. HAL_Printf("publish timeout, packet-id=%u\n", (unsigned int)packet_id);
  73. break;
  74. case MQTT_EVENT_PUBLISH_NACK:
  75. HAL_Printf("publish nack, packet-id=%u\n", (unsigned int)packet_id);
  76. break;
  77. default:
  78. HAL_Printf("Should NOT arrive here.\n");
  79. break;
  80. }
  81. }
  82. /**
  83. * MQTT消息接收处理函数
  84. *
  85. * @param topicName topic主题
  86. * @param topicNameLen topic长度
  87. * @param message 已订阅消息的结构
  88. * @param userData 消息负载
  89. */
  90. static void on_message_callback(void *pClient, MQTTMessage *message, void *userData) {
  91. if (message == NULL) {
  92. return;
  93. }
  94. HAL_Printf("Receive Message With topicName:%.*s, payload:%.*s\n",
  95. (int) message->topic_len, message->topic, (int) message->payload_len, (char *) message->payload);
  96. }
  97. /**
  98. * 设置MQTT connet初始化参数
  99. *
  100. * @param initParams MQTT connet初始化参数
  101. *
  102. * @return 0: 参数初始化成功 非0: 失败
  103. */
  104. static int _setup_connect_init_params(MQTTInitParams* initParams)
  105. {
  106. initParams->device_sn = PKG_USING_UCLOUD_IOT_SDK_DEVICE_SN;
  107. initParams->product_sn = PKG_USING_UCLOUD_IOT_SDK_PRODUCT_SN;
  108. initParams->device_secret = PKG_USING_UCLOUD_IOT_SDK_DEVICE_SECRET;
  109. initParams->command_timeout = UIOT_MQTT_COMMAND_TIMEOUT;
  110. initParams->keep_alive_interval = UIOT_MQTT_KEEP_ALIVE_INTERNAL;
  111. initParams->auto_connect_enable = 1;
  112. initParams->event_handler.h_fp = event_handler;
  113. initParams->event_handler.context = NULL;
  114. return SUCCESS_RET;
  115. }
  116. /**
  117. * 发送topic消息
  118. *
  119. */
  120. static int _publish_msg(void *client)
  121. {
  122. char topicName[128] = {0};
  123. HAL_Snprintf(topicName, 128, "/%s/%s/upload", PKG_USING_UCLOUD_IOT_SDK_PRODUCT_SN, PKG_USING_UCLOUD_IOT_SDK_DEVICE_SN);
  124. PublishParams pub_params = DEFAULT_PUB_PARAMS;
  125. pub_params.qos = QOS1;
  126. char topic_content[MAX_SIZE_OF_TOPIC_CONTENT + 1] = {0};
  127. int size = HAL_Snprintf(topic_content, sizeof(topic_content), "{\"test\": \"%d\"}", sg_count++);
  128. if (size < 0 || size > sizeof(topic_content) - 1)
  129. {
  130. HAL_Printf("payload content length not enough! content size:%d buf size:%d\n", size, (int)sizeof(topic_content));
  131. return -3;
  132. }
  133. pub_params.payload = topic_content;
  134. pub_params.payload_len = strlen(topic_content);
  135. return IOT_MQTT_Publish(client, topicName, &pub_params);
  136. }
  137. /**
  138. * 订阅关注topic和注册相应回调处理
  139. *
  140. */
  141. static int _register_subscribe_topics(void *client)
  142. {
  143. static char topic_name[128] = {0};
  144. int size = HAL_Snprintf(topic_name, sizeof(topic_name), "/%s/%s/set", PKG_USING_UCLOUD_IOT_SDK_PRODUCT_SN, PKG_USING_UCLOUD_IOT_SDK_DEVICE_SN);
  145. if (size < 0 || size > sizeof(topic_name) - 1)
  146. {
  147. HAL_Printf("topic content length not enough! content size:%d buf size:%d\n", size, (int)sizeof(topic_name));
  148. return FAILURE_RET;
  149. }
  150. SubscribeParams sub_params = DEFAULT_SUB_PARAMS;
  151. sub_params.on_message_handler = on_message_callback;
  152. return IOT_MQTT_Subscribe(client, topic_name, &sub_params);
  153. }
  154. static void mqtt_test_thread(void) {
  155. int rc;
  156. //init connection
  157. MQTTInitParams init_params = DEFAULT_MQTT_INIT_PARAMS;
  158. rc = _setup_connect_init_params(&init_params);
  159. if (rc != SUCCESS_RET) {
  160. return;
  161. }
  162. void *client = IOT_MQTT_Construct(&init_params);
  163. if (client != NULL) {
  164. HAL_Printf("Cloud Device Construct Success");
  165. } else {
  166. HAL_Printf("Cloud Device Construct Failed");
  167. return;
  168. }
  169. //register subscribe topics here
  170. rc = _register_subscribe_topics(client);
  171. if (rc < 0) {
  172. HAL_Printf("Client Subscribe Topic Failed: %d", rc);
  173. return;
  174. }
  175. rc = IOT_MQTT_Yield(client, 200);
  176. do {
  177. // 等待订阅结果
  178. if (sg_sub_packet_id > 0) {
  179. for(int loop = 0; loop < 10; loop++)
  180. {
  181. rc = _publish_msg(client);
  182. if (rc < 0) {
  183. HAL_Printf("client publish topic failed :%d.", rc);
  184. }
  185. rc = IOT_MQTT_Yield(client, 200);
  186. }
  187. }
  188. } while (sg_sub_packet_id < 0);
  189. IOT_MQTT_Destroy(&client);
  190. return;
  191. }
  192. static int mqtt_test_example(int argc, char **argv)
  193. {
  194. rt_thread_t tid;
  195. int stack_size = 8192;
  196. if (2 == argc)
  197. {
  198. if (!strcmp("start", argv[1]))
  199. {
  200. if (1 == running_state)
  201. {
  202. HAL_Printf("mqtt_test_example is already running\n");
  203. return 0;
  204. }
  205. }
  206. else if (!strcmp("stop", argv[1]))
  207. {
  208. if (0 == running_state)
  209. {
  210. HAL_Printf("mqtt_test_example is already stopped\n");
  211. return 0;
  212. }
  213. running_state = 0;
  214. return 0;
  215. }
  216. else
  217. {
  218. HAL_Printf("Usage: mqtt_test_example start/stop");
  219. return 0;
  220. }
  221. }
  222. else
  223. {
  224. HAL_Printf("Para err, usage: mqtt_test_example start/stop");
  225. return 0;
  226. }
  227. tid = rt_thread_create("mqtt_test", (void (*)(void *))mqtt_test_thread,
  228. NULL, stack_size, RT_THREAD_PRIORITY_MAX / 2 - 1, 100);
  229. if (tid != RT_NULL)
  230. {
  231. rt_thread_startup(tid);
  232. running_state = 1;
  233. }
  234. return 0;
  235. }
  236. #ifdef RT_USING_FINSH
  237. #include <finsh.h>
  238. FINSH_FUNCTION_EXPORT(mqtt_test_example, startup mqtt basic example);
  239. #endif
  240. #ifdef FINSH_USING_MSH
  241. MSH_CMD_EXPORT(mqtt_test_example, startup mqtt basic example);
  242. #endif