mqtt_sample.c 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. #include <stdlib.h>
  2. #include <string.h>
  3. #include <stdint.h>
  4. #include <rtthread.h>
  5. #define DBG_ENABLE
  6. #define DBG_SECTION_NAME "mqtt.sample"
  7. #define DBG_LEVEL DBG_LOG
  8. #define DBG_COLOR
  9. #include <rtdbg.h>
  10. #include "paho_mqtt.h"
  11. /**
  12. * MQTT URI farmat:
  13. * domain mode
  14. * tcp://iot.eclipse.org:1883
  15. *
  16. * ipv4 mode
  17. * tcp://192.168.10.1:1883
  18. * ssl://192.168.10.1:1884
  19. *
  20. * ipv6 mode
  21. * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
  22. * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
  23. */
  24. #define MQTT_URI "tcp://mq.tongxinmao.com:18831"
  25. #define MQTT_SUBTOPIC "/mqtt/test"
  26. #define MQTT_PUBTOPIC "/mqtt/test"
  27. #define MQTT_WILLMSG "Goodbye!"
  28. /* define MQTT client context */
  29. static MQTTClient client;
  30. static int is_started = 0;
  31. static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
  32. {
  33. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  34. LOG_D("mqtt sub callback: %.*s %.*s",
  35. msg_data->topicName->lenstring.len,
  36. msg_data->topicName->lenstring.data,
  37. msg_data->message->payloadlen,
  38. (char *)msg_data->message->payload);
  39. }
  40. static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
  41. {
  42. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  43. LOG_D("mqtt sub default callback: %.*s %.*s",
  44. msg_data->topicName->lenstring.len,
  45. msg_data->topicName->lenstring.data,
  46. msg_data->message->payloadlen,
  47. (char *)msg_data->message->payload);
  48. }
  49. static void mqtt_connect_callback(MQTTClient *c)
  50. {
  51. LOG_D("inter mqtt_connect_callback!");
  52. }
  53. static void mqtt_online_callback(MQTTClient *c)
  54. {
  55. LOG_D("inter mqtt_online_callback!");
  56. }
  57. static void mqtt_offline_callback(MQTTClient *c)
  58. {
  59. LOG_D("inter mqtt_offline_callback!");
  60. }
  61. static int mqtt_start(int argc, char **argv)
  62. {
  63. /* init condata param by using MQTTPacket_connectData_initializer */
  64. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  65. static char cid[20] = { 0 };
  66. if (argc != 1)
  67. {
  68. rt_kprintf("mqtt_start --start a mqtt worker thread.\n");
  69. return -1;
  70. }
  71. if (is_started)
  72. {
  73. LOG_E("mqtt client is already connected.");
  74. return -1;
  75. }
  76. /* config MQTT context param */
  77. {
  78. client.isconnected = 0;
  79. client.uri = MQTT_URI;
  80. /* generate the random client ID */
  81. rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
  82. /* config connect param */
  83. rt_memcpy(&client.condata, &condata, sizeof(condata));
  84. client.condata.clientID.cstring = cid;
  85. client.condata.keepAliveInterval = 30;
  86. client.condata.cleansession = 1;
  87. /* config MQTT will param. */
  88. client.condata.willFlag = 1;
  89. client.condata.will.qos = 1;
  90. client.condata.will.retained = 0;
  91. client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
  92. client.condata.will.message.cstring = MQTT_WILLMSG;
  93. /* rt_malloc buffer. */
  94. client.buf_size = client.readbuf_size = 1024;
  95. client.buf = rt_calloc(1, client.buf_size);
  96. client.readbuf = rt_calloc(1, client.readbuf_size);
  97. if (!(client.buf && client.readbuf))
  98. {
  99. LOG_E("no memory for MQTT client buffer!");
  100. return -1;
  101. }
  102. /* set event callback function */
  103. client.connect_callback = mqtt_connect_callback;
  104. client.online_callback = mqtt_online_callback;
  105. client.offline_callback = mqtt_offline_callback;
  106. /* set subscribe table and event callback */
  107. client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
  108. client.messageHandlers[0].callback = mqtt_sub_callback;
  109. client.messageHandlers[0].qos = QOS1;
  110. /* set default subscribe event callback */
  111. client.defaultMessageHandler = mqtt_sub_default_callback;
  112. }
  113. /* run mqtt client */
  114. paho_mqtt_start(&client);
  115. is_started = 1;
  116. return 0;
  117. }
  118. static int mqtt_stop(int argc, char **argv)
  119. {
  120. if (argc != 1)
  121. {
  122. rt_kprintf("mqtt_stop --stop mqtt worker thread and free mqtt client object.\n");
  123. }
  124. is_started = 0;
  125. return paho_mqtt_stop(&client);
  126. }
  127. static int mqtt_publish(int argc, char **argv)
  128. {
  129. if (is_started == 0)
  130. {
  131. LOG_E("mqtt client is not connected.");
  132. return -1;
  133. }
  134. if (argc == 2)
  135. {
  136. paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, argv[1]);
  137. }
  138. else if (argc == 3)
  139. {
  140. paho_mqtt_publish(&client, QOS1, argv[1], argv[2]);
  141. }
  142. else
  143. {
  144. rt_kprintf("mqtt_publish <topic> [message] --mqtt publish message to specified topic.\n");
  145. return -1;
  146. }
  147. return 0;
  148. }
  149. static void mqtt_new_sub_callback(MQTTClient *client, MessageData *msg_data)
  150. {
  151. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  152. LOG_D("mqtt new subscribe callback: %.*s %.*s",
  153. msg_data->topicName->lenstring.len,
  154. msg_data->topicName->lenstring.data,
  155. msg_data->message->payloadlen,
  156. (char *)msg_data->message->payload);
  157. }
  158. static int mqtt_subscribe(int argc, char **argv)
  159. {
  160. if (argc != 2)
  161. {
  162. rt_kprintf("mqtt_subscribe [topic] --send an mqtt subscribe packet and wait for suback before returning.\n");
  163. return -1;
  164. }
  165. if (is_started == 0)
  166. {
  167. LOG_E("mqtt client is not connected.");
  168. return -1;
  169. }
  170. return paho_mqtt_subscribe(&client, QOS1, argv[1], mqtt_new_sub_callback);
  171. }
  172. static int mqtt_unsubscribe(int argc, char **argv)
  173. {
  174. if (argc != 2)
  175. {
  176. rt_kprintf("mqtt_unsubscribe [topic] --send an mqtt unsubscribe packet and wait for suback before returning.\n");
  177. return -1;
  178. }
  179. if (is_started == 0)
  180. {
  181. LOG_E("mqtt client is not connected.");
  182. return -1;
  183. }
  184. return paho_mqtt_unsubscribe(&client, argv[1]);
  185. }
  186. #ifdef FINSH_USING_MSH
  187. MSH_CMD_EXPORT(mqtt_start, startup mqtt client);
  188. MSH_CMD_EXPORT(mqtt_stop, stop mqtt client);
  189. MSH_CMD_EXPORT(mqtt_publish, mqtt publish message to specified topic);
  190. MSH_CMD_EXPORT(mqtt_subscribe, mqtt subscribe topic);
  191. MSH_CMD_EXPORT(mqtt_unsubscribe, mqtt unsubscribe topic);
  192. #endif /* FINSH_USING_MSH */