iotsharp_client.c 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. #include <stdlib.h>
  2. #include <string.h>
  3. #include <stdint.h>
  4. #include <rtthread.h>
  5. #define DBG_ENABLE
  6. #define DBG_SECTION_NAME "iotsharp"
  7. #define DBG_LEVEL DBG_LOG
  8. #define DBG_COLOR
  9. #include <rtdbg.h>
  10. #include "paho_mqtt.h"
  11. /**
  12. * MQTT URI farmat:
  13. * domain mode
  14. * tcp://iot.eclipse.org:1883
  15. *
  16. * ipv4 mode
  17. * tcp://192.168.10.1:1883
  18. * ssl://192.168.10.1:1884
  19. *
  20. * ipv6 mode
  21. * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
  22. * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
  23. */
  24. #define MQTT_URI "tcp://iot.eclipse.org:1883"
  25. #define MQTT_USERNAME "admin"
  26. #define MQTT_PASSWORD "admin"
  27. #define MQTT_SUBTOPIC "/mqtt/test"
  28. #define MQTT_PUBTOPIC "/mqtt/test"
  29. #define MQTT_WILLMSG "Goodbye!"
  30. /* define MQTT client context */
  31. static MQTTClient client;
  32. static int is_started = 0;
  33. static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
  34. {
  35. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  36. LOG_D("mqtt sub callback: %.*s %.*s",
  37. msg_data->topicName->lenstring.len,
  38. msg_data->topicName->lenstring.data,
  39. msg_data->message->payloadlen,
  40. (char *)msg_data->message->payload);
  41. }
  42. static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
  43. {
  44. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  45. LOG_D("mqtt sub default callback: %.*s %.*s",
  46. msg_data->topicName->lenstring.len,
  47. msg_data->topicName->lenstring.data,
  48. msg_data->message->payloadlen,
  49. (char *)msg_data->message->payload);
  50. }
  51. static void mqtt_connect_callback(MQTTClient *c)
  52. {
  53. LOG_D("inter mqtt_connect_callback!");
  54. }
  55. static void mqtt_online_callback(MQTTClient *c)
  56. {
  57. LOG_D("inter mqtt_online_callback!");
  58. }
  59. static void mqtt_offline_callback(MQTTClient *c)
  60. {
  61. LOG_D("inter mqtt_offline_callback!");
  62. }
  63. static int mqtt_start(int argc, char **argv)
  64. {
  65. /* init condata param by using MQTTPacket_connectData_initializer */
  66. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  67. static char cid[20] = { 0 };
  68. if (argc != 1)
  69. {
  70. rt_kprintf("mqtt_start --start a mqtt worker thread.\n");
  71. return -1;
  72. }
  73. if (is_started)
  74. {
  75. LOG_E("mqtt client is already connected.");
  76. return -1;
  77. }
  78. /* config MQTT context param */
  79. {
  80. client.isconnected = 0;
  81. client.uri = MQTT_URI;
  82. /* generate the random client ID */
  83. rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
  84. /* config connect param */
  85. memcpy(&client.condata, &condata, sizeof(condata));
  86. client.condata.clientID.cstring = cid;
  87. client.condata.keepAliveInterval = 30;
  88. client.condata.cleansession = 1;
  89. client.condata.username.cstring = MQTT_USERNAME;
  90. client.condata.password.cstring = MQTT_PASSWORD;
  91. /* config MQTT will param. */
  92. client.condata.willFlag = 1;
  93. client.condata.will.qos = 1;
  94. client.condata.will.retained = 0;
  95. client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
  96. client.condata.will.message.cstring = MQTT_WILLMSG;
  97. /* malloc buffer. */
  98. client.buf_size = client.readbuf_size = 1024;
  99. client.buf = rt_calloc(1, client.buf_size);
  100. client.readbuf = rt_calloc(1, client.readbuf_size);
  101. if (!(client.buf && client.readbuf))
  102. {
  103. LOG_E("no memory for MQTT client buffer!");
  104. return -1;
  105. }
  106. /* set event callback function */
  107. client.connect_callback = mqtt_connect_callback;
  108. client.online_callback = mqtt_online_callback;
  109. client.offline_callback = mqtt_offline_callback;
  110. /* set subscribe table and event callback */
  111. client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
  112. client.messageHandlers[0].callback = mqtt_sub_callback;
  113. client.messageHandlers[0].qos = QOS1;
  114. /* set default subscribe event callback */
  115. client.defaultMessageHandler = mqtt_sub_default_callback;
  116. }
  117. /* run mqtt client */
  118. paho_mqtt_start(&client);
  119. is_started = 1;
  120. return 0;
  121. }
  122. static int mqtt_stop(int argc, char **argv)
  123. {
  124. if (argc != 1)
  125. {
  126. rt_kprintf("mqtt_stop --stop mqtt worker thread and free mqtt client object.\n");
  127. }
  128. is_started = 0;
  129. return paho_mqtt_stop(&client);
  130. }
  131. static int mqtt_publish(int argc, char **argv)
  132. {
  133. if (is_started == 0)
  134. {
  135. LOG_E("mqtt client is not connected.");
  136. return -1;
  137. }
  138. if (argc == 2)
  139. {
  140. paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, argv[1]);
  141. }
  142. else if (argc == 3)
  143. {
  144. paho_mqtt_publish(&client, QOS1, argv[1], argv[2]);
  145. }
  146. else
  147. {
  148. rt_kprintf("mqtt_publish <topic> [message] --mqtt publish message to specified topic.\n");
  149. return -1;
  150. }
  151. return 0;
  152. }
  153. static void mqtt_new_sub_callback(MQTTClient *client, MessageData *msg_data)
  154. {
  155. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  156. LOG_D("mqtt new subscribe callback: %.*s %.*s",
  157. msg_data->topicName->lenstring.len,
  158. msg_data->topicName->lenstring.data,
  159. msg_data->message->payloadlen,
  160. (char *)msg_data->message->payload);
  161. }
  162. static int mqtt_subscribe(int argc, char **argv)
  163. {
  164. if (argc != 2)
  165. {
  166. rt_kprintf("mqtt_subscribe [topic] --send an mqtt subscribe packet and wait for suback before returning.\n");
  167. return -1;
  168. }
  169. if (is_started == 0)
  170. {
  171. LOG_E("mqtt client is not connected.");
  172. return -1;
  173. }
  174. return paho_mqtt_subscribe(&client, QOS1, argv[1], mqtt_new_sub_callback);
  175. }
  176. static int mqtt_unsubscribe(int argc, char **argv)
  177. {
  178. if (argc != 2)
  179. {
  180. rt_kprintf("mqtt_unsubscribe [topic] --send an mqtt unsubscribe packet and wait for suback before returning.\n");
  181. return -1;
  182. }
  183. if (is_started == 0)
  184. {
  185. LOG_E("mqtt client is not connected.");
  186. return -1;
  187. }
  188. return paho_mqtt_unsubscribe(&client, argv[1]);
  189. }
  190. #ifdef FINSH_USING_MSH
  191. MSH_CMD_EXPORT(mqtt_start, startup mqtt client);
  192. MSH_CMD_EXPORT(mqtt_stop, stop mqtt client);
  193. MSH_CMD_EXPORT(mqtt_publish, mqtt publish message to specified topic);
  194. MSH_CMD_EXPORT(mqtt_subscribe, mqtt subscribe topic);
  195. MSH_CMD_EXPORT(mqtt_unsubscribe, mqtt unsubscribe topic);
  196. #endif /* FINSH_USING_MSH */