gagent_mqtt.c 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. /*
  2. * File : gagent_mqtt.c
  3. * This file is part of RT-Thread RTOS
  4. * COPYRIGHT (C) 2018, RT-Thread Development Team
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License along
  17. * with this program; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  19. *
  20. * Change Logs:
  21. * Date Author Notes
  22. * 2018-01-03 flyingcys first version
  23. */
  24. #include "gagent_def.h"
  25. #include "gagent_cloud.h"
  26. static MQTTClient gagent_mqtt;
  27. #ifdef PKG_GAGENT_CLOUD_DEBUG
  28. #define MQTT_RECV_DEBUG
  29. #define MQTT_SEND_DEBUG
  30. #endif
  31. static int mqtt_push_ota(cloud_st *cloud, char *packet)
  32. {
  33. return RT_EOK;
  34. }
  35. static int mqtt_trans_data(cloud_st *cloud, char *packet)
  36. {
  37. rt_uint16_t len, cmd;
  38. char *index, *kv;
  39. rt_uint16_t kv_len;
  40. rt_uint8_t length_len, action;
  41. action = 0;
  42. kv = 0;
  43. kv_len = 0;
  44. index = packet;
  45. len = gagent_parse_rem_len((const uint8_t *)index + 4);
  46. length_len = gagent_num_rem_len_bytes((const uint8_t *)index + 4);
  47. index += (HEAD_LEN + length_len);
  48. rt_memcpy(&cmd, index, 2);
  49. index += 2;
  50. cmd = ntohs(cmd);
  51. gagent_dbg("cmd:%x\n", cmd);
  52. if(cmd == 0x90)
  53. {
  54. action = *index ++;
  55. kv = index;
  56. kv_len = len - 4;
  57. cloud->sn = -1;
  58. }
  59. else if(cmd == 0x93)
  60. {
  61. memcpy(&cloud->sn, index, 4);
  62. index += 4;
  63. gagent_dbg("mqtt_sn:%d\n", cloud->sn);
  64. //
  65. action = *index ++;
  66. kv = index;
  67. kv_len = len - 8;
  68. }
  69. else
  70. return -RT_ERROR;
  71. return gagent_cloud_recv_packet(CMD_FROM_MQTT, action, (rt_uint8_t *)kv, kv_len);
  72. }
  73. static void gagent_mqtt_callback(MQTTClient *c, MessageData *msg_data)
  74. {
  75. extern cloud_st *cloud;
  76. char *one_packet;
  77. rt_uint32_t data_len;
  78. rt_uint32_t total_len;
  79. rt_uint16_t cmd;
  80. rt_uint8_t len_num;
  81. int rc = RT_EOK;
  82. #ifdef MQTT_RECV_DEBUG
  83. {
  84. size_t len;
  85. char *data = (char *)msg_data->message->payload;
  86. gagent_dbg("mqtt_callback topic_name: %d %s\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);
  87. rt_kprintf("mqtt recv_len:%d\n", msg_data->message->payloadlen);
  88. for(len = 0; len < msg_data->message->payloadlen; len ++)
  89. rt_kprintf("%02x ", data[len]);
  90. rt_kprintf("\r\n");
  91. }
  92. #endif
  93. one_packet = (char *)msg_data->message->payload;
  94. total_len = msg_data->message->payloadlen;
  95. while(gagent_get_one_packet(one_packet, (int *)&data_len, &len_num, total_len) == RT_EOK)
  96. {
  97. memcpy(&cmd, one_packet + 6, 2);
  98. cmd = ntohs(cmd);
  99. gagent_dbg("mqtt_cmd:%x\n", cmd);
  100. switch(cmd)
  101. {
  102. case 0x10: //log on/off
  103. break;
  104. case 0x90: //trans data
  105. case 0x93:
  106. rc = mqtt_trans_data(cloud, one_packet);
  107. break;
  108. case 0x210: //app number change
  109. break;
  110. case 0x211: //push ota
  111. rc = mqtt_push_ota(cloud, one_packet);
  112. break;
  113. default:
  114. break;
  115. }
  116. one_packet += (data_len + len_num + HEAD_LEN);
  117. total_len -= (data_len + len_num + HEAD_LEN);
  118. if(rc < RT_EOK)
  119. break;
  120. }
  121. }
  122. static void gagent_mqtt_connect_callback(MQTTClient *c)
  123. {
  124. gagent_dbg("%s\n", __FUNCTION__);
  125. }
  126. static void gagent_mqtt_online_callback(MQTTClient *c)
  127. {
  128. gagent_dbg("%s\n", __FUNCTION__);
  129. }
  130. static void gagent_mqtt_offline_callback(MQTTClient *c)
  131. {
  132. gagent_dbg("%s\n", __FUNCTION__);
  133. }
  134. static int gagent_mqtt_client_publish(const char *topic, const char *msg, size_t msg_len)
  135. {
  136. int rc = RT_EOK;
  137. MQTTMessage message;
  138. message.qos = QOS0;
  139. message.retained = 0;
  140. message.payload = (void *)msg;
  141. message.payloadlen = msg_len;
  142. rc = MQTTPublish(&gagent_mqtt, topic, &message);
  143. return rc;
  144. }
  145. int gagent_mqtt_send_packet(cloud_st *cloud, rt_uint8_t action, rt_uint8_t *buf, rt_uint16_t buf_len)
  146. {
  147. int rc = RT_EOK;
  148. char topic[128];
  149. memset(topic, 0, sizeof(topic));
  150. rt_snprintf(topic, sizeof(topic), "dev2app/%s", cloud->con->did);
  151. gagent_dbg("pub_topic:%s\n", topic);
  152. memset(cloud->send_buf, 0, sizeof(cloud->send_buf));
  153. cloud->send_len = 0;
  154. cloud->send_len = gagent_set_one_packet(cloud->send_buf, action, buf, buf_len);
  155. #ifdef MQTT_SEND_DEBUG
  156. {
  157. uint32_t i;
  158. rt_kprintf("mqtt send_len:%d\n", cloud->send_len);
  159. for(i = 0; i < cloud->send_len; i ++)
  160. {
  161. rt_kprintf("%02x ", cloud->send_buf[i]);
  162. }
  163. rt_kprintf("\r\n");
  164. }
  165. #endif
  166. rc = gagent_mqtt_client_publish(topic, cloud->send_buf, cloud->send_len);
  167. return rc;
  168. }
  169. int gagent_mqtt_init(cloud_st *cloud)
  170. {
  171. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  172. RT_ASSERT(cloud != RT_NULL);
  173. memset(&gagent_mqtt, 0, sizeof(gagent_mqtt));
  174. gagent_mqtt.host = cloud->mqtt_server;
  175. gagent_mqtt.port = cloud->mqtt_port;
  176. memcpy(&gagent_mqtt.condata, &data, sizeof(MQTTPacket_connectData));
  177. gagent_mqtt.condata.keepAliveInterval = 30;
  178. gagent_mqtt.condata.MQTTVersion = 3;
  179. gagent_mqtt.condata.cleansession = 1;
  180. gagent_mqtt.condata.clientID.cstring = cloud->con->did;
  181. gagent_mqtt.condata.username.cstring = cloud->con->did;
  182. gagent_mqtt.condata.password.cstring = cloud->con->passcode;
  183. gagent_mqtt.buf_size = gagent_mqtt.readbuf_size = 1024;
  184. gagent_mqtt.buf = malloc(gagent_mqtt.buf_size);
  185. gagent_mqtt.readbuf = malloc(gagent_mqtt.readbuf_size);
  186. if(!(gagent_mqtt.readbuf && gagent_mqtt.readbuf))
  187. {
  188. gagent_err("mqtt malloc failed!\n");
  189. return -RT_ENOMEM;
  190. }
  191. gagent_mqtt.connect_callback = gagent_mqtt_connect_callback;
  192. gagent_mqtt.online_callback = gagent_mqtt_online_callback;
  193. gagent_mqtt.offline_callback = gagent_mqtt_offline_callback;
  194. memset(cloud->sub_topic[0], 0, sizeof(cloud->sub_topic[0]));
  195. snprintf(cloud->sub_topic[0], 128, "ser2cli_res/%s", cloud->con->did);
  196. gagent_mqtt.messageHandlers[0].topicFilter = cloud->sub_topic[0];
  197. gagent_mqtt.messageHandlers[0].callback = gagent_mqtt_callback;
  198. memset(cloud->sub_topic[1], 0,sizeof(cloud->sub_topic[1]));
  199. snprintf(cloud->sub_topic[1], 128, "app2dev/%s/#", cloud->con->did);
  200. gagent_mqtt.messageHandlers[1].topicFilter = cloud->sub_topic[1];
  201. gagent_mqtt.messageHandlers[1].callback = gagent_mqtt_callback;
  202. gagent_mqtt.defaultMessageHandler = gagent_mqtt_callback;
  203. gagent_dbg("host:%s port:%d\n", gagent_mqtt.host, gagent_mqtt.port);
  204. gagent_dbg("clientID:%s username:%s password:%s\n",
  205. gagent_mqtt.condata.clientID.cstring,
  206. gagent_mqtt.condata.username.cstring,
  207. gagent_mqtt.condata.password.cstring);
  208. gagent_dbg("topic:%s\n", gagent_mqtt.messageHandlers[0].topicFilter);
  209. return paho_mqtt_start(&gagent_mqtt);
  210. }