mqtt_sample.c 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. #include <stdlib.h>
  2. #include <string.h>
  3. #include <stdint.h>
  4. #include <rtthread.h>
  5. #define DBG_ENABLE
  6. #define DBG_SECTION_NAME "mqtt.sample"
  7. #define DBG_LEVEL DBG_LOG
  8. #define DBG_COLOR
  9. #include <rtdbg.h>
  10. #include "mqtt_client.h"
  11. /**
  12. * MQTT URI farmat:
  13. * domain mode
  14. * tcp://iot.eclipse.org:1883
  15. *
  16. * ipv4 mode
  17. * tcp://192.168.10.1:1883
  18. * ssl://192.168.10.1:1884
  19. *
  20. * ipv6 mode
  21. * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
  22. * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
  23. */
  24. #define MQTT_URI "tcp://test.mosquitto.org:1883" //"tcp://mq.tongxinmao.com:18831"
  25. #define MQTT_SUBTOPIC "/mqtt/test"
  26. #define MQTT_PUBTOPIC "/mqtt/test"
  27. #define MQTT_WILLMSG "Goodbye!"
  28. /* define MQTT client context */
  29. static mqtt_client client;
  30. static int is_started = 0;
  31. static void mqtt_sub_callback(mqtt_client *c, message_data *msg_data)
  32. {
  33. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  34. LOG_D("mqtt sub callback: %.*s %.*s",
  35. msg_data->topic_name->lenstring.len,
  36. msg_data->topic_name->lenstring.data,
  37. msg_data->message->payloadlen,
  38. (char *)msg_data->message->payload);
  39. }
  40. static void mqtt_sub_default_callback(mqtt_client *c, message_data *msg_data)
  41. {
  42. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  43. LOG_D("mqtt sub default callback: %.*s %.*s",
  44. msg_data->topic_name->lenstring.len,
  45. msg_data->topic_name->lenstring.data,
  46. msg_data->message->payloadlen,
  47. (char *)msg_data->message->payload);
  48. }
  49. static void mqtt_connect_callback(mqtt_client *c)
  50. {
  51. LOG_D("inter mqtt_connect_callback!");
  52. }
  53. static void mqtt_online_callback(mqtt_client *c)
  54. {
  55. LOG_D("inter mqtt_online_callback!");
  56. }
  57. static void mqtt_offline_callback(mqtt_client *c)
  58. {
  59. LOG_D("inter mqtt_offline_callback!");
  60. }
  61. static int mqtt_start(int argc, char **argv)
  62. {
  63. /* init condata param by using MQTTPacket_connectData_initializer */
  64. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  65. static char cid[20] = { 0 };
  66. if (argc != 1)
  67. {
  68. rt_kprintf("mqtt_start --start a mqtt worker thread.\n");
  69. return -1;
  70. }
  71. if (is_started)
  72. {
  73. LOG_E("mqtt client is already connected.");
  74. return -1;
  75. }
  76. /* config MQTT context param */
  77. {
  78. client.isconnected = 0;
  79. client.uri = MQTT_URI;
  80. /* generate the random client ID */
  81. rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
  82. /* config connect param */
  83. memcpy(&client.condata, &condata, sizeof(condata));
  84. client.condata.clientID.cstring = cid;
  85. // client.condata.username.cstring = "huaning";
  86. // client.condata.password.cstring = "sjhn20140916";
  87. client.condata.keepAliveInterval = 30;
  88. client.condata.cleansession = 1;
  89. /* config MQTT will param. */
  90. client.condata.willFlag = 1;
  91. client.condata.will.qos = 1;
  92. client.condata.will.retained = 0;
  93. client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
  94. client.condata.will.message.cstring = MQTT_WILLMSG;
  95. /* malloc buffer. */
  96. client.buf_size = client.readbuf_size = 1024;
  97. client.buf = rt_calloc(1, client.buf_size);
  98. client.readbuf = rt_calloc(1, client.readbuf_size);
  99. if (!(client.buf && client.readbuf))
  100. {
  101. LOG_E("no memory for MQTT client buffer!");
  102. return -1;
  103. }
  104. /* set event callback function */
  105. client.connect_callback = mqtt_connect_callback;
  106. client.online_callback = mqtt_online_callback;
  107. client.offline_callback = mqtt_offline_callback;
  108. /* set subscribe table and event callback */
  109. client.message_handlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
  110. client.message_handlers[0].callback = mqtt_sub_callback;
  111. client.message_handlers[0].qos = QOS1;
  112. /* set default subscribe event callback */
  113. client.default_message_handlers = mqtt_sub_default_callback;
  114. }
  115. {
  116. int value;
  117. uint16_t u16Value;
  118. value = 5;
  119. paho_mqtt_control(&client, MQTT_CTRL_SET_CONN_TIMEO, &value);
  120. value = 5;
  121. paho_mqtt_control(&client, MQTT_CTRL_SET_MSG_TIMEO, &value);
  122. value = 5;
  123. paho_mqtt_control(&client, MQTT_CTRL_SET_RECONN_INTERVAL, &value);
  124. value = 30;
  125. paho_mqtt_control(&client, MQTT_CTRL_SET_KEEPALIVE_INTERVAL, &value);
  126. u16Value = 3;
  127. paho_mqtt_control(&client, MQTT_CTRL_SET_KEEPALIVE_COUNT, &u16Value);
  128. }
  129. /* run mqtt client */
  130. paho_mqtt_start(&client, 8196, 20);
  131. is_started = 1;
  132. return 0;
  133. }
  134. static int mqtt_stop(int argc, char **argv)
  135. {
  136. if (argc != 1)
  137. {
  138. rt_kprintf("mqtt_stop --stop mqtt worker thread and free mqtt client object.\n");
  139. }
  140. is_started = 0;
  141. return paho_mqtt_stop(&client);
  142. }
  143. static int mqtt_publish(int argc, char **argv)
  144. {
  145. if (is_started == 0)
  146. {
  147. LOG_E("mqtt client is not connected.");
  148. return -1;
  149. }
  150. if (argc == 2)
  151. {
  152. paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, argv[1], strlen(argv[1]));
  153. }
  154. else if (argc == 3)
  155. {
  156. paho_mqtt_publish(&client, QOS1, argv[1], argv[2],strlen(argv[2]));
  157. }
  158. else
  159. {
  160. rt_kprintf("mqtt_publish <topic> [message] --mqtt publish message to specified topic.\n");
  161. return -1;
  162. }
  163. return 0;
  164. }
  165. static void mqtt_new_sub_callback(mqtt_client *client, message_data *msg_data)
  166. {
  167. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  168. LOG_D("mqtt new subscribe callback: %.*s %.*s",
  169. msg_data->topic_name->lenstring.len,
  170. msg_data->topic_name->lenstring.data,
  171. msg_data->message->payloadlen,
  172. (char *)msg_data->message->payload);
  173. }
  174. static int mqtt_subscribe(int argc, char **argv)
  175. {
  176. if (argc != 2)
  177. {
  178. rt_kprintf("mqtt_subscribe [topic] --send an mqtt subscribe packet and wait for suback before returning.\n");
  179. return -1;
  180. }
  181. if (is_started == 0)
  182. {
  183. LOG_E("mqtt client is not connected.");
  184. return -1;
  185. }
  186. return paho_mqtt_subscribe(&client, QOS1, argv[1], mqtt_new_sub_callback);
  187. }
  188. static int mqtt_unsubscribe(int argc, char **argv)
  189. {
  190. if (argc != 2)
  191. {
  192. rt_kprintf("mqtt_unsubscribe [topic] --send an mqtt unsubscribe packet and wait for suback before returning.\n");
  193. return -1;
  194. }
  195. if (is_started == 0)
  196. {
  197. LOG_E("mqtt client is not connected.");
  198. return -1;
  199. }
  200. return paho_mqtt_unsubscribe(&client, argv[1]);
  201. }
  202. #ifdef FINSH_USING_MSH
  203. MSH_CMD_EXPORT(mqtt_start, startup mqtt client);
  204. MSH_CMD_EXPORT(mqtt_stop, stop mqtt client);
  205. MSH_CMD_EXPORT(mqtt_publish, mqtt publish message to specified topic);
  206. MSH_CMD_EXPORT(mqtt_subscribe, mqtt subscribe topic);
  207. MSH_CMD_EXPORT(mqtt_unsubscribe, mqtt unsubscribe topic);
  208. #endif /* FINSH_USING_MSH */