/* File: mqtt_multi_thread-example.c */
/*
 * Copyright (c) 2014-2016 Alibaba Group. All rights reserved.
 * License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#ifdef _PLATFORM_IS_LINUX_
#include <assert.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "iot_import.h"
#include "iot_export.h"
  27. #define PRODUCT_KEY "yfTuLfBJTiL"
  28. #define DEVICE_NAME "TestDeviceForDemo"
  29. #define DEVICE_SECRET "fSCl9Ns5YPnYN8Ocg0VEel1kXFnRlV6c"
  30. /* These are pre-defined topics */
  31. #define TOPIC_UPDATE "/"PRODUCT_KEY"/"DEVICE_NAME"/update"
  32. #define TOPIC_ERROR "/"PRODUCT_KEY"/"DEVICE_NAME"/update/error"
  33. #define TOPIC_GET "/"PRODUCT_KEY"/"DEVICE_NAME"/get"
  34. #define TOPIC_DATA "/"PRODUCT_KEY"/"DEVICE_NAME"/data"
  35. #define MQTT_MSGLEN (1024)
  36. char g_product_key[PRODUCT_KEY_LEN + 1];
  37. char g_product_secret[PRODUCT_SECRET_LEN + 1];
  38. char g_device_name[DEVICE_NAME_LEN + 1];
  39. char g_device_secret[DEVICE_SECRET_LEN + 1];
  40. #define EXAMPLE_TRACE(fmt, ...) \
  41. do { \
  42. HAL_Printf("%s|%03d :: ", __func__, __LINE__); \
  43. HAL_Printf(fmt, ##__VA_ARGS__); \
  44. HAL_Printf("%s", "\r\n"); \
  45. } while(0)
  46. void event_handle(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
  47. {
  48. uintptr_t packet_id = (uintptr_t)msg->msg;
  49. iotx_mqtt_topic_info_pt topic_info = (iotx_mqtt_topic_info_pt)msg->msg;
  50. switch (msg->event_type) {
  51. case IOTX_MQTT_EVENT_UNDEF:
  52. EXAMPLE_TRACE("undefined event occur.");
  53. break;
  54. case IOTX_MQTT_EVENT_DISCONNECT:
  55. EXAMPLE_TRACE("MQTT disconnect.");
  56. break;
  57. case IOTX_MQTT_EVENT_RECONNECT:
  58. EXAMPLE_TRACE("MQTT reconnect.");
  59. break;
  60. case IOTX_MQTT_EVENT_SUBCRIBE_SUCCESS:
  61. EXAMPLE_TRACE("subscribe success, packet-id=%u", (unsigned int)packet_id);
  62. break;
  63. case IOTX_MQTT_EVENT_SUBCRIBE_TIMEOUT:
  64. EXAMPLE_TRACE("subscribe wait ack timeout, packet-id=%u", (unsigned int)packet_id);
  65. break;
  66. case IOTX_MQTT_EVENT_SUBCRIBE_NACK:
  67. EXAMPLE_TRACE("subscribe nack, packet-id=%u", (unsigned int)packet_id);
  68. break;
  69. case IOTX_MQTT_EVENT_UNSUBCRIBE_SUCCESS:
  70. EXAMPLE_TRACE("unsubscribe success, packet-id=%u", (unsigned int)packet_id);
  71. break;
  72. case IOTX_MQTT_EVENT_UNSUBCRIBE_TIMEOUT:
  73. EXAMPLE_TRACE("unsubscribe timeout, packet-id=%u", (unsigned int)packet_id);
  74. break;
  75. case IOTX_MQTT_EVENT_UNSUBCRIBE_NACK:
  76. EXAMPLE_TRACE("unsubscribe nack, packet-id=%u", (unsigned int)packet_id);
  77. break;
  78. case IOTX_MQTT_EVENT_PUBLISH_SUCCESS:
  79. EXAMPLE_TRACE("publish success, packet-id=%u", (unsigned int)packet_id);
  80. break;
  81. case IOTX_MQTT_EVENT_PUBLISH_TIMEOUT:
  82. EXAMPLE_TRACE("publish timeout, packet-id=%u", (unsigned int)packet_id);
  83. break;
  84. case IOTX_MQTT_EVENT_PUBLISH_NACK:
  85. EXAMPLE_TRACE("publish nack, packet-id=%u", (unsigned int)packet_id);
  86. break;
  87. case IOTX_MQTT_EVENT_PUBLISH_RECVEIVED:
  88. EXAMPLE_TRACE("topic message arrived but without any related handle: topic=%.*s, topic_msg=%.*s",
  89. topic_info->topic_len,
  90. topic_info->ptopic,
  91. topic_info->payload_len,
  92. topic_info->payload);
  93. break;
  94. default:
  95. EXAMPLE_TRACE("Should NOT arrive here.");
  96. break;
  97. }
  98. }
  99. static void _demo_message_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
  100. {
  101. iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
  102. /* print topic name and topic message */
  103. EXAMPLE_TRACE("----");
  104. EXAMPLE_TRACE("Topic: '%.*s' (Length: %d)",
  105. ptopic_info->topic_len,
  106. ptopic_info->ptopic,
  107. ptopic_info->topic_len);
  108. EXAMPLE_TRACE("Payload: '%.*s' (Length: %d)",
  109. ptopic_info->payload_len,
  110. ptopic_info->payload,
  111. ptopic_info->payload_len);
  112. EXAMPLE_TRACE("----");
  113. }
  114. void* thread_subscribe1(void *pclient)
  115. {
  116. int ret = -1;
  117. int cnt = 400;
  118. while(--cnt) {
  119. HAL_SleepMs(100);
  120. ret = IOT_MQTT_Subscribe(pclient, TOPIC_DATA, IOTX_MQTT_QOS1, _demo_message_arrive, NULL);
  121. if (ret < 0) {
  122. printf("subscribe error");
  123. return NULL;
  124. }
  125. HAL_SleepMs(20);
  126. ret = IOT_MQTT_Unsubscribe(pclient, TOPIC_GET);
  127. if (ret < 0) {
  128. printf("subscribe error");
  129. return NULL;
  130. }
  131. HAL_SleepMs(100);
  132. }
  133. return NULL;
  134. }
  135. void* thread_subscribe2(void *pclient)
  136. {
  137. int ret = -1;
  138. int cnt = 400;
  139. while(--cnt) {
  140. HAL_SleepMs(300);
  141. ret = IOT_MQTT_Unsubscribe(pclient, TOPIC_DATA);
  142. if (ret < 0) {
  143. printf("subscribe error");
  144. return NULL;
  145. }
  146. HAL_SleepMs(30);
  147. ret = IOT_MQTT_Subscribe(pclient, TOPIC_GET, IOTX_MQTT_QOS1, _demo_message_arrive, NULL);
  148. if (ret < 0) {
  149. printf("subscribe error");
  150. return NULL;
  151. }
  152. HAL_SleepMs(30);
  153. }
  154. return NULL;
  155. }
  156. // 多线程subscribe
  157. void CASE2(void * pclient)
  158. {
  159. int ret = -1;
  160. pthread_t pid1;
  161. pthread_t pid2;
  162. if (pclient == NULL) {
  163. printf("param error");
  164. return;
  165. }
  166. ret = pthread_create(&pid1, NULL, thread_subscribe1, (void*)pclient);
  167. if (ret != 0) {
  168. printf("pthread_create failed!\n");
  169. return;
  170. }
  171. ret = pthread_create(&pid2, NULL, thread_subscribe2, (void*)pclient);
  172. if (ret != 0) {
  173. printf("pthread_create failed!\n");
  174. return;
  175. }
  176. }
  177. void* thread_publish1(void *pclient)
  178. {
  179. int cnt = 400;
  180. int ret = -1;
  181. char msg_pub[MQTT_MSGLEN] = {0};
  182. iotx_mqtt_topic_info_t topic_msg;
  183. strcpy(msg_pub, "11111 message: hello! start!");
  184. topic_msg.qos = IOTX_MQTT_QOS1;
  185. topic_msg.retain = 0; topic_msg.dup = 0;
  186. topic_msg.payload = (void *)msg_pub;
  187. topic_msg.payload_len = strlen(msg_pub);
  188. while(--cnt) {
  189. ret = IOT_MQTT_Publish(pclient, TOPIC_DATA, &topic_msg);
  190. printf("thread<%d>:ret = %d\n", (int)pthread_self(), ret);
  191. HAL_SleepMs(300);
  192. }
  193. return NULL;
  194. }
  195. void* thread_publish2(void *pclient)
  196. {
  197. int cnt = 600;
  198. int ret = -1;
  199. char msg_pub[MQTT_MSGLEN] = {0};
  200. iotx_mqtt_topic_info_t topic_msg;
  201. strcpy(msg_pub, "22222 message: hello! start!");
  202. topic_msg.qos = IOTX_MQTT_QOS1;
  203. topic_msg.retain = 0; topic_msg.dup = 0;
  204. topic_msg.payload = (void *)msg_pub;
  205. topic_msg.payload_len = strlen(msg_pub);
  206. while(--cnt) {
  207. ret = IOT_MQTT_Publish(pclient, TOPIC_DATA, &topic_msg);
  208. printf("thread<%d>:ret = %d\n", (int)pthread_self(), ret);
  209. HAL_SleepMs(200);
  210. }
  211. return NULL;
  212. }
  213. // 多线程publish
  214. void CASE1(void * pclient)
  215. {
  216. int ret = -1;
  217. pthread_t pid1;
  218. pthread_t pid2;
  219. if (pclient == NULL) {
  220. printf("param error");
  221. return;
  222. }
  223. ret = IOT_MQTT_Subscribe(pclient, TOPIC_DATA, IOTX_MQTT_QOS1, _demo_message_arrive, NULL);
  224. if (ret < 0) {
  225. printf("subscribe error");
  226. return;
  227. }
  228. ret = pthread_create(&pid1, NULL, thread_publish1, (void*)pclient);
  229. if (ret != 0) {
  230. printf("pthread_create failed!\n");
  231. return;
  232. }
  233. ret = pthread_create(&pid2, NULL, thread_publish2, (void*)pclient);
  234. if (ret != 0) {
  235. printf("pthread_create failed!\n");
  236. return;
  237. }
  238. }
  239. // yield thread
  240. static int yield_exit = 0;
  241. void *thread_yield(void *pclient)
  242. {
  243. while(yield_exit == 0) {
  244. IOT_MQTT_Yield(pclient, 200);
  245. HAL_SleepMs(200);
  246. }
  247. return NULL;
  248. }
  249. int mqtt_client(void)
  250. {
  251. int rc = 0;//, msg_len, cnt = 0;
  252. void *pclient;
  253. iotx_conn_info_pt pconn_info;
  254. iotx_mqtt_param_t mqtt_params;
  255. char *msg_buf = NULL, *msg_readbuf = NULL;
  256. pthread_t pid1;
  257. if (NULL == (msg_buf = (char *)HAL_Malloc(MQTT_MSGLEN))) {
  258. EXAMPLE_TRACE("not enough memory");
  259. rc = -1;
  260. goto do_exit;
  261. }
  262. if (NULL == (msg_readbuf = (char *)HAL_Malloc(MQTT_MSGLEN))) {
  263. EXAMPLE_TRACE("not enough memory");
  264. rc = -1;
  265. goto do_exit;
  266. }
  267. /**< get device info*/
  268. HAL_GetProductKey(g_product_key);
  269. HAL_GetDeviceName(g_device_name);
  270. HAL_GetDeviceSecret(g_device_secret);
  271. /**< end*/
  272. /* Device AUTH */
  273. if (0 != IOT_SetupConnInfo(g_product_key, g_device_name, g_device_secret, (void **)&pconn_info)) {
  274. EXAMPLE_TRACE("AUTH request failed!");
  275. rc = -1;
  276. goto do_exit;
  277. }
  278. /* Initialize MQTT parameter */
  279. memset(&mqtt_params, 0x0, sizeof(mqtt_params));
  280. mqtt_params.port = pconn_info->port;
  281. mqtt_params.host = pconn_info->host_name;
  282. mqtt_params.client_id = pconn_info->client_id;
  283. mqtt_params.username = pconn_info->username;
  284. mqtt_params.password = pconn_info->password;
  285. mqtt_params.pub_key = pconn_info->pub_key;
  286. mqtt_params.request_timeout_ms = 2000;
  287. mqtt_params.clean_session = 0;
  288. mqtt_params.keepalive_interval_ms = 60000;
  289. mqtt_params.pread_buf = msg_readbuf;
  290. mqtt_params.read_buf_size = MQTT_MSGLEN;
  291. mqtt_params.pwrite_buf = msg_buf;
  292. mqtt_params.write_buf_size = MQTT_MSGLEN;
  293. mqtt_params.handle_event.h_fp = event_handle;
  294. mqtt_params.handle_event.pcontext = NULL;
  295. /* Construct a MQTT client with specify parameter */
  296. pclient = IOT_MQTT_Construct(&mqtt_params);
  297. if (NULL == pclient) {
  298. EXAMPLE_TRACE("MQTT construct failed");
  299. rc = -1;
  300. goto do_exit;
  301. }
  302. EXAMPLE_TRACE("TEST CASE");
  303. pthread_create(&pid1, NULL, thread_yield, (void*)pclient);
  304. // mutli thread publish
  305. CASE1(pclient);
  306. // mutli thread subscribe
  307. CASE2(pclient);
  308. HAL_SleepMs(100000);
  309. IOT_MQTT_Unsubscribe(pclient, TOPIC_DATA);
  310. IOT_MQTT_Unsubscribe(pclient, TOPIC_GET);
  311. HAL_SleepMs(200);
  312. yield_exit = 1;
  313. HAL_SleepMs(200);
  314. IOT_MQTT_Destroy(&pclient);
  315. do_exit:
  316. if (NULL != msg_buf) {
  317. HAL_Free(msg_buf);
  318. }
  319. if (NULL != msg_readbuf) {
  320. HAL_Free(msg_readbuf);
  321. }
  322. return rc;
  323. }
  324. int main(int argc, char **argv)
  325. {
  326. IOT_OpenLog("mqtt");
  327. IOT_SetLogLevel(IOT_LOG_DEBUG);
  328. /**< set device info*/
  329. HAL_SetProductKey(PRODUCT_KEY);
  330. HAL_SetDeviceName(DEVICE_NAME);
  331. HAL_SetDeviceSecret(DEVICE_SECRET);
  332. /**< end*/
  333. mqtt_client();
  334. IOT_DumpMemoryStats(IOT_LOG_DEBUG);
  335. IOT_CloseLog();
  336. EXAMPLE_TRACE("out of sample!");
  337. return 0;
  338. }
  339. #else
  340. int main(int argc, char **argv)
  341. {
  342. /*EXAMPLE_TRACE("not support, pleasae support multi-thread first!!!");
  343. EXAMPLE_TRACE("Linux is support, please build in ubuntu.x86");*/
  344. return 0;
  345. }
  346. #endif