gagent_mqtt.c 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. /*
  2. * File : gagent_mqtt.c
  3. * This file is part of RT-Thread RTOS
  4. * COPYRIGHT (C) 2018, RT-Thread Development Team
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License along
  17. * with this program; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  19. *
  20. * Change Logs:
  21. * Date Author Notes
  22. * 2018-01-03 flyingcys first version
  23. */
  24. #include "gagent_def.h"
  25. #include "gagent_cloud.h"
  /* The single MQTT client instance used by every function in this file. */
  26. static MQTTClient gagent_mqtt;
  /*
   * When PKG_GAGENT_CLOUD_DEBUG is 1, define the two switches that enable the
   * hex dumps of received (MQTT_RECV_DEBUG) and sent (MQTT_SEND_DEBUG) payloads.
   */
  27. #if (PKG_GAGENT_CLOUD_DEBUG == 1)
  28. #define MQTT_RECV_DEBUG
  29. #define MQTT_SEND_DEBUG
  30. #endif
  31. static int mqtt_push_ota(cloud_st *cloud, char *packet)
  32. {
  33. return RT_EOK;
  34. }
  35. static int mqtt_trans_data(cloud_st *cloud, char *packet)
  36. {
  37. uint16_t len, cmd;
  38. char *index, *kv;
  39. uint16_t kv_len;
  40. uint8_t length_len, action;
  41. action = 0;
  42. kv = 0;
  43. kv_len = 0;
  44. index = packet;
  45. len = gagent_parse_rem_len((const uint8_t *)index + 4);
  46. length_len = gagent_num_rem_len_bytes((const uint8_t *)index + 4);
  47. index += (HEAD_LEN + length_len);
  48. rt_memcpy(&cmd, index, 2);
  49. index += 2;
  50. cmd = ntohs(cmd);
  51. gagent_dbg("cmd:%x\n", cmd);
  52. if(cmd == 0x90)
  53. {
  54. action = *index ++;
  55. kv = index;
  56. kv_len = len - 4;
  57. cloud->sn = -1;
  58. }
  59. else if(cmd == 0x93)
  60. {
  61. memcpy(&cloud->sn, index, 4);
  62. index += 4;
  63. gagent_dbg("mqtt_sn:%d\n", cloud->sn);
  64. //
  65. action = *index ++;
  66. kv = index;
  67. kv_len = len - 8;
  68. }
  69. else
  70. return -RT_ERROR;
  71. return gagent_cloud_recv_packet(CMD_FROM_MQTT, action, (rt_uint8_t *)kv, kv_len);
  72. }
  73. static void gagent_mqtt_callback(MQTTClient *c, MessageData *msg_data)
  74. {
  75. extern cloud_st *cloud;
  76. char *one_packet;
  77. rt_uint32_t data_len;
  78. rt_uint32_t total_len;
  79. rt_uint16_t cmd;
  80. rt_uint8_t len_num;
  81. int rc = RT_EOK;
  82. #ifdef MQTT_RECV_DEBUG
  83. {
  84. size_t len;
  85. char *data = (char *)msg_data->message->payload;
  86. gagent_dbg("mqtt_callback topic_name: %d %s\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);
  87. rt_kprintf("mqtt recv_len:%d\n", msg_data->message->payloadlen);
  88. for(len = 0; len < msg_data->message->payloadlen; len ++)
  89. {
  90. rt_kprintf("%02x ", data[len]);
  91. }
  92. rt_kprintf("\r\n");
  93. }
  94. #endif
  95. one_packet = (char *)msg_data->message->payload;
  96. total_len = msg_data->message->payloadlen;
  97. while(gagent_get_one_packet(one_packet, (int *)&data_len, &len_num, total_len) == RT_EOK)
  98. {
  99. memcpy(&cmd, one_packet + 6, 2);
  100. cmd = ntohs(cmd);
  101. gagent_dbg("mqtt_cmd:%x\n", cmd);
  102. switch(cmd)
  103. {
  104. case 0x10: //log on/off
  105. break;
  106. case 0x90: //trans data
  107. case 0x93:
  108. rc = mqtt_trans_data(cloud, one_packet);
  109. break;
  110. case 0x210: //app number change
  111. break;
  112. case 0x211: //push ota
  113. rc = mqtt_push_ota(cloud, one_packet);
  114. break;
  115. default:
  116. break;
  117. }
  118. one_packet += (data_len + len_num + HEAD_LEN);
  119. total_len -= (data_len + len_num + HEAD_LEN);
  120. if(rc < RT_EOK)
  121. break;
  122. }
  123. }
  124. static void gagent_mqtt_connect_callback(MQTTClient *c)
  125. {
  126. gagent_dbg("%s\n", __FUNCTION__);
  127. }
  128. static void gagent_mqtt_online_callback(MQTTClient *c)
  129. {
  130. gagent_dbg("%s\n", __FUNCTION__);
  131. }
  132. static void gagent_mqtt_offline_callback(MQTTClient *c)
  133. {
  134. gagent_dbg("%s\n", __FUNCTION__);
  135. }
  136. static int gagent_mqtt_client_publish(const char *topic, const char *msg, size_t msg_len)
  137. {
  138. int rc = RT_EOK;
  139. MQTTMessage message;
  140. message.qos = QOS0;
  141. message.retained = 0;
  142. message.payload = (void *)msg;
  143. message.payloadlen = msg_len;
  144. rc = MQTTPublish(&gagent_mqtt, topic, &message);
  145. return rc;
  146. }
  147. int gagent_mqtt_send_packet(cloud_st *cloud, rt_uint8_t action, rt_uint8_t *buf, rt_uint16_t buf_len)
  148. {
  149. int rc = RT_EOK;
  150. char topic[128];
  151. memset(topic, 0, sizeof(topic));
  152. rt_snprintf(topic, sizeof(topic), "dev2app/%s", cloud->con->did);
  153. gagent_dbg("pub_topic:%s\n", topic);
  154. memset(cloud->send_buf, 0, sizeof(cloud->send_buf));
  155. cloud->send_len = 0;
  156. cloud->send_len = gagent_set_one_packet(cloud->send_buf, action, buf, buf_len);
  157. #ifdef MQTT_SEND_DEBUG
  158. {
  159. uint32_t i;
  160. rt_kprintf("mqtt send_len:%d\n", cloud->send_len);
  161. for(i = 0; i < cloud->send_len; i ++)
  162. {
  163. rt_kprintf("%02x ", cloud->send_buf[i]);
  164. }
  165. rt_kprintf("\r\n");
  166. }
  167. #endif
  168. rc = gagent_mqtt_client_publish(topic, cloud->send_buf, cloud->send_len);
  169. return rc;
  170. }
  171. int gagent_mqtt_init(cloud_st *cloud)
  172. {
  173. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  174. RT_ASSERT(cloud != RT_NULL);
  175. memset(&gagent_mqtt, 0, sizeof(gagent_mqtt));
  176. gagent_mqtt.host = cloud->mqtt_server;
  177. gagent_mqtt.port = cloud->mqtt_port;
  178. memcpy(&gagent_mqtt.condata, &data, sizeof(MQTTPacket_connectData));
  179. gagent_mqtt.condata.keepAliveInterval = 30;
  180. gagent_mqtt.condata.MQTTVersion = 3;
  181. gagent_mqtt.condata.cleansession = 1;
  182. gagent_mqtt.condata.clientID.cstring = cloud->con->did;
  183. gagent_mqtt.condata.username.cstring = cloud->con->did;
  184. gagent_mqtt.condata.password.cstring = cloud->con->passcode;
  185. gagent_mqtt.buf_size = gagent_mqtt.readbuf_size = 1024;
  186. gagent_mqtt.buf = malloc(gagent_mqtt.buf_size);
  187. gagent_mqtt.readbuf = malloc(gagent_mqtt.readbuf_size);
  188. if(!(gagent_mqtt.readbuf && gagent_mqtt.readbuf))
  189. {
  190. gagent_err("mqtt malloc failed!\n");
  191. return -RT_ENOMEM;
  192. }
  193. gagent_mqtt.connect_callback = gagent_mqtt_connect_callback;
  194. gagent_mqtt.online_callback = gagent_mqtt_online_callback;
  195. gagent_mqtt.offline_callback = gagent_mqtt_offline_callback;
  196. memset(cloud->sub_topic[0], 0, sizeof(cloud->sub_topic[0]));
  197. snprintf(cloud->sub_topic[0], 128, "ser2cli_res/%s", cloud->con->did);
  198. gagent_mqtt.messageHandlers[0].topicFilter = cloud->sub_topic[0];
  199. gagent_mqtt.messageHandlers[0].callback = gagent_mqtt_callback;
  200. memset(cloud->sub_topic[1], 0,sizeof(cloud->sub_topic[1]));
  201. snprintf(cloud->sub_topic[1], 128, "app2dev/%s/#", cloud->con->did);
  202. gagent_mqtt.messageHandlers[1].topicFilter = cloud->sub_topic[1];
  203. gagent_mqtt.messageHandlers[1].callback = gagent_mqtt_callback;
  204. gagent_mqtt.defaultMessageHandler = gagent_mqtt_callback;
  205. gagent_dbg("host:%s port:%d\n", gagent_mqtt.host, gagent_mqtt.port);
  206. gagent_dbg("clientID:%s username:%s password:%s\n",
  207. gagent_mqtt.condata.clientID.cstring,
  208. gagent_mqtt.condata.username.cstring,
  209. gagent_mqtt.condata.password.cstring);
  210. gagent_dbg("topic:%s\n", gagent_mqtt.messageHandlers[0].topicFilter);
  211. return paho_mqtt_start(&gagent_mqtt);
  212. }