iotsharp_client.c 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. #include <stdlib.h>
  2. #include <string.h>
  3. #include <stdint.h>
  4. #include <rtthread.h>
  5. #define DBG_ENABLE
  6. #define DBG_SECTION_NAME "iotsharp"
  7. #define DBG_LEVEL DBG_LOG
  8. #define DBG_COLOR
  9. #include <rtdbg.h>
  10. #include "paho_mqtt.h"
  11. #define MQTT_URI PKG_USING_IOTSHARP_DEVICE_SERVER
  12. #define MQTT_USERNAME PKG_USING_IOTSHARP_DEVICE_NAME
  13. #define MQTT_PASSWORD PKG_USING_IOTSHARP_DEVICE_SECRET
  14. #define MQTT_WILLMSG "Goodbye!"
  15. /* define MQTT client context */
  16. static MQTTClient client;
  17. static int is_started = 0;
  18. void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
  19. {
  20. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  21. LOG_D("mqtt sub callback: %.*s %.*s",
  22. msg_data->topicName->lenstring.len,
  23. msg_data->topicName->lenstring.data,
  24. msg_data->message->payloadlen,
  25. (char *)msg_data->message->payload);
  26. }
  27. void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
  28. {
  29. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  30. LOG_D("mqtt sub default callback: %.*s %.*s",
  31. msg_data->topicName->lenstring.len,
  32. msg_data->topicName->lenstring.data,
  33. msg_data->message->payloadlen,
  34. (char *)msg_data->message->payload);
  35. }
  36. static void mqtt_connect_callback(MQTTClient *c)
  37. {
  38. LOG_D("inter mqtt_connect_callback!");
  39. }
  40. static void mqtt_new_sub_callback(MQTTClient *client, MessageData *msg_data)
  41. {
  42. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  43. LOG_D("mqtt new subscribe callback: %.*s %.*s",
  44. msg_data->topicName->lenstring.len,
  45. msg_data->topicName->lenstring.data,
  46. msg_data->message->payloadlen,
  47. (char *)msg_data->message->payload);
  48. }
  49. static void mqtt_online_callback(MQTTClient *c)
  50. {
  51. LOG_D("inter mqtt_online_callback!");
  52. char _rpc_topic[200] = { 0 };
  53. char _attup_topic[200] = { 0 };
  54. sprintf(_rpc_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/rpc/request/+/+");
  55. sprintf(_attup_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/attributes/update/");
  56. paho_mqtt_subscribe(&client, QOS1, _rpc_topic, mqtt_new_sub_callback);
  57. paho_mqtt_subscribe(&client, QOS1, _rpc_topic, mqtt_new_sub_callback);
  58. }
  59. static void mqtt_offline_callback(MQTTClient *c)
  60. {
  61. LOG_D("inter mqtt_offline_callback!");
  62. char _rpc_topic[200] = { 0 };
  63. char _attup_topic[200] = { 0 };
  64. sprintf(_rpc_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/rpc/request/+/+");
  65. sprintf(_attup_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/attributes/update/");
  66. paho_mqtt_unsubscribe(&client, _rpc_topic);
  67. paho_mqtt_unsubscribe(&client, _rpc_topic);
  68. }
  69. int iotsharp_start(void)
  70. {
  71. /* init condata param by using MQTTPacket_connectData_initializer */
  72. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  73. static char cid[20] = { 0 };
  74. if (is_started)
  75. {
  76. LOG_E("mqtt client is already connected.");
  77. return -1;
  78. }
  79. /* config MQTT context param */
  80. {
  81. client.isconnected = 0;
  82. client.uri = MQTT_URI;
  83. /* generate the random client ID */
  84. rt_snprintf(cid, sizeof(cid), "iotsharp%d", rt_tick_get());
  85. /* config connect param */
  86. memcpy(&client.condata, &condata, sizeof(condata));
  87. client.condata.clientID.cstring = cid;
  88. client.condata.keepAliveInterval = 30;
  89. client.condata.cleansession = 1;
  90. client.condata.username.cstring = MQTT_USERNAME;
  91. client.condata.password.cstring = MQTT_PASSWORD;
  92. /* config MQTT will param. */
  93. client.condata.willFlag = 1;
  94. client.condata.will.qos = 1;
  95. client.condata.will.retained = 0;
  96. client.condata.will.topicName.cstring = "/device/me/disconnect";
  97. client.condata.will.message.cstring = MQTT_WILLMSG;
  98. /* malloc buffer. */
  99. client.buf_size = client.readbuf_size = 1024;
  100. client.buf = rt_calloc(1, client.buf_size);
  101. client.readbuf = rt_calloc(1, client.readbuf_size);
  102. if (!(client.buf && client.readbuf))
  103. {
  104. LOG_E("no memory for MQTT client buffer!");
  105. return -1;
  106. }
  107. /* set event callback function */
  108. client.connect_callback = mqtt_connect_callback;
  109. client.online_callback = mqtt_online_callback;
  110. client.offline_callback = mqtt_offline_callback;
  111. /* set subscribe table and event callback */
  112. // client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
  113. //client.messageHandlers[0].callback = mqtt_sub_callback;
  114. // client.messageHandlers[0].qos = QOS1;
  115. /* set default subscribe event callback */
  116. client.defaultMessageHandler = mqtt_sub_default_callback;
  117. }
  118. /* run mqtt client */
  119. paho_mqtt_start(&client);
  120. is_started = 1;
  121. return 0;
  122. }
  123. int iotsahrp_stop(void)
  124. {
  125. is_started = 0;
  126. return paho_mqtt_stop(&client);
  127. }
  128. static int mqtt_publish_for_gateway(char* subdevicename, int datatype, char *playload)
  129. {
  130. char _telemetry_topic[200] = { 0 };
  131. char attributes_topic[200] = { 0 };
  132. sprintf(_telemetry_topic, "devices/%s/telemetry",subdevicename);
  133. sprintf(attributes_topic, "devices/%s/attributes",subdevicename);
  134. if (is_started == 0)
  135. {
  136. LOG_E("mqtt client is not connected.");
  137. return -1;
  138. }
  139. if (datatype == 1)
  140. {
  141. paho_mqtt_publish(&client, QOS1, _telemetry_topic, playload);
  142. }
  143. else if (datatype== 2)
  144. {
  145. paho_mqtt_publish(&client, QOS1, attributes_topic, playload);
  146. }
  147. else
  148. {
  149. rt_kprintf("publish message to specified datatype.\n");
  150. return -1;
  151. }
  152. return 0;
  153. }
  154. int iotsharp_upload_telemetry_for_gateway(char* _devname,char* playload)
  155. {
  156. return mqtt_publish_for_gateway(_devname,1,playload);
  157. }
  158. int iotsharp_upload_telemetry_to_device(char* playload)
  159. {
  160. return mqtt_publish_for_gateway("me",1,playload);
  161. }
  162. int iotsharp_upload_attribute_for_gateway(char* _devname,char* playload)
  163. {
  164. return mqtt_publish_for_gateway(_devname,2,playload);
  165. }
  166. int iotsharp_upload_attribute_to_device(char* playload)
  167. {
  168. return mqtt_publish_for_gateway("me",2,playload);
  169. }
  170. #ifdef FINSH_USING_MSH
  171. #endif /* FINSH_USING_MSH */