mqtt_test.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. /*
  2. * File : mqtt_test.c
  3. * COPYRIGHT (C) 2012-2018, Shanghai Real-Thread Technology Co., Ltd
  4. *
  5. * Change Logs:
  6. * Date Author Notes
  7. * 2018-03-12 armink the first version
  8. */
  9. #include <stdlib.h>
  10. #include <string.h>
  11. #include <stdint.h>
  12. #include <rtthread.h>
  13. #include "mqtt_client.h"
  14. #ifdef PKG_USING_MYMQTT_TEST
  15. /**
  16. * MQTT URI farmat:
  17. * domain mode
  18. * tcp://iot.eclipse.org:1883
  19. *
  20. * ipv4 mode
  21. * tcp://192.168.10.1:1883
  22. * ssl://192.168.10.1:1884
  23. *
  24. * ipv6 mode
  25. * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
  26. * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
  27. */
  28. #define MQTT_TEST_SERVER_URI "tcp://iot.eclipse.org:1883"
  29. #define MQTT_CLIENTID "rtthread-mqtt"
  30. #define MQTT_USERNAME "admin"
  31. #define MQTT_PASSWORD "admin"
  32. #define MQTT_SUBTOPIC "/mqtt/test"
  33. #define MQTT_PUBTOPIC "/mqtt/test"
  34. #define MQTT_WILLMSG "Goodbye!"
  35. #define MQTT_TEST_QOS 1
  36. #define MQTT_PUB_SUB_BUF_SIZE 1024
  37. #define CMD_INFO "'mqtt_test <start|stop>'"
  38. #define TEST_DATA_SIZE 256
  39. #define PUB_CYCLE_TM 1000
  40. static rt_thread_t pub_thread_tid = RT_NULL;
  41. static char *pub_data = RT_NULL;
  42. /* define MQTT client context */
  43. static MQTTClient client;
  44. static rt_uint32_t pub_count = 0, sub_count = 0;
  45. static int recon_count = -1;
  46. static int test_start_tm = 0;
  47. static int test_is_started = 0;
  48. static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
  49. {
  50. sub_count ++;
  51. return;
  52. }
  53. static void mqtt_connect_callback(MQTTClient *c)
  54. {
  55. return;
  56. }
  57. static void mqtt_online_callback(MQTTClient *c)
  58. {
  59. recon_count ++;
  60. return;
  61. }
  62. static void mqtt_offline_callback(MQTTClient *c)
  63. {
  64. return;
  65. }
  66. /**
  67. * This function create and config a mqtt client.
  68. *
  69. * @param void
  70. *
  71. * @return none
  72. */
  73. static void mq_start(void)
  74. {
  75. /* init condata param by using MQTTPacket_connectData_initializer */
  76. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  77. rt_memset(&client, 0, sizeof(MQTTClient));
  78. /* config MQTT context param */
  79. {
  80. client.uri = MQTT_TEST_SERVER_URI;
  81. /* config connect param */
  82. memcpy(&client.condata, &condata, sizeof(condata));
  83. client.condata.clientID.cstring = MQTT_CLIENTID;
  84. client.condata.keepAliveInterval = 60;
  85. client.condata.cleansession = 1;
  86. client.condata.username.cstring = MQTT_USERNAME;
  87. client.condata.password.cstring = MQTT_PASSWORD;
  88. /* config MQTT will param. */
  89. client.condata.willFlag = 1;
  90. client.condata.will.qos = MQTT_TEST_QOS;
  91. client.condata.will.retained = 0;
  92. client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
  93. client.condata.will.message.cstring = MQTT_WILLMSG;
  94. /* malloc buffer. */
  95. client.buf_size = client.readbuf_size = MQTT_PUB_SUB_BUF_SIZE;
  96. client.buf = rt_malloc(client.buf_size);
  97. client.readbuf = rt_malloc(client.readbuf_size);
  98. if (!(client.buf && client.readbuf))
  99. {
  100. rt_kprintf("no memory for MQTT client buffer!\n");
  101. goto _exit;
  102. }
  103. /* set event callback function */
  104. client.connect_callback = mqtt_connect_callback;
  105. client.online_callback = mqtt_online_callback;
  106. client.offline_callback = mqtt_offline_callback;
  107. /* set subscribe table and event callback */
  108. client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
  109. client.messageHandlers[0].callback = mqtt_sub_callback;
  110. client.messageHandlers[0].qos = MQTT_TEST_QOS;
  111. /* set default subscribe event callback */
  112. client.defaultMessageHandler = mqtt_sub_callback;
  113. }
  114. /* run mqtt client */
  115. paho_mqtt_start(&client);
  116. return;
  117. _exit:
  118. if (client.buf)
  119. {
  120. rt_free(client.buf);
  121. client.buf = RT_NULL;
  122. }
  123. if (client.readbuf)
  124. {
  125. rt_free(client.readbuf);
  126. client.readbuf = RT_NULL;
  127. }
  128. return;
  129. }
  130. static void show_test_info(void)
  131. {
  132. char temp[50] = {0};
  133. rt_kprintf("\r==== MQTT Stability test ====\n");
  134. rt_kprintf("Server: "MQTT_TEST_SERVER_URI"\n");
  135. rt_kprintf("QoS : %d\n", MQTT_TEST_QOS);
  136. rt_kprintf("Test duration(sec) : %d\n", time((time_t *)RT_NULL) - test_start_tm);
  137. rt_kprintf("Number of published packages : %d\n", pub_count);
  138. rt_kprintf("Number of subscribed packages : %d\n", sub_count);
  139. snprintf(temp, sizeof(temp), "Packet loss rate : %.2f%% \n", (float)((float)(pub_count - sub_count) * 100.0f / pub_count));
  140. rt_kprintf(temp);
  141. rt_kprintf("Number of reconnections : %d\n", recon_count);
  142. /* up the cursor 8 line */
  143. rt_kprintf("\033[8A");
  144. }
  145. static void thread_pub(void *parameter)
  146. {
  147. pub_data = rt_malloc(TEST_DATA_SIZE * sizeof(char));
  148. if (!pub_data)
  149. {
  150. rt_kprintf("no memory for pub_data\n");
  151. return;
  152. }
  153. rt_memset(pub_data, '*', TEST_DATA_SIZE * sizeof(char));
  154. test_start_tm = time((time_t *)RT_NULL);
  155. rt_kprintf("test start at '%d'\r\n", test_start_tm);
  156. while (1)
  157. {
  158. if (!paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, pub_data))
  159. {
  160. ++ pub_count;
  161. }
  162. rt_thread_delay(PUB_CYCLE_TM);
  163. show_test_info();
  164. }
  165. }
  166. static void mqtt_test_start(void)
  167. {
  168. if (test_is_started)
  169. {
  170. return;
  171. }
  172. mq_start();
  173. while (!client.isconnected)
  174. {
  175. rt_kprintf("Waiting for mqtt connection...\n");
  176. rt_thread_delay(1000);
  177. }
  178. pub_thread_tid = rt_thread_create("pub_thread", thread_pub, RT_NULL, 1024, 8, 100);
  179. if (pub_thread_tid != RT_NULL)
  180. {
  181. rt_thread_startup(pub_thread_tid);
  182. }
  183. test_is_started = 1;
  184. return;
  185. }
  186. static void mqtt_test_stop(void)
  187. {
  188. MQTTClient *local_client = &client;
  189. if (pub_thread_tid)
  190. {
  191. rt_thread_delete(pub_thread_tid);
  192. }
  193. if (pub_data)
  194. {
  195. rt_free(pub_data);
  196. pub_data = RT_NULL;
  197. }
  198. if (local_client)
  199. {
  200. paho_mqtt_stop(local_client);
  201. }
  202. /* up the cursor 1 line */
  203. rt_kprintf("\033[1A");
  204. show_test_info();
  205. /* down the cursor 10 line */
  206. rt_kprintf("\033[10B");
  207. pub_count = sub_count = recon_count = 0;
  208. test_is_started = 0;
  209. rt_kprintf("==== MQTT Stability test stop ====\n");
  210. }
  211. static void mqtt_test(uint8_t argc, char **argv)
  212. {
  213. if (argc >= 2)
  214. {
  215. if (!strcmp(argv[1], "start"))
  216. {
  217. mqtt_test_start();
  218. }
  219. else if (!strcmp(argv[1], "stop"))
  220. {
  221. mqtt_test_stop();
  222. }
  223. else
  224. {
  225. rt_kprintf("Please input "CMD_INFO"\n");
  226. }
  227. }
  228. else
  229. {
  230. rt_kprintf("Please input "CMD_INFO"\n");
  231. }
  232. }
  233. MSH_CMD_EXPORT(mqtt_test, MQTT test CMD_INFO);
  234. #endif /* PKG_USING_PAHOMQTT_TEST */