mqtt_client.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. /*
  2. * Copyright (C) 2012-2019 UCloud. All Rights Reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License").
  5. * You may not use this file except in compliance with the License.
  6. * A copy of the License is located at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * or in the "license" file accompanying this file. This file is distributed
  11. * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  12. * express or implied. See the License for the specific language governing
  13. * permissions and limitations under the License.
  14. */
  15. #ifndef C_SDK_MQTT_CLIENT_H_
  16. #define C_SDK_MQTT_CLIENT_H_
  17. #ifdef __cplusplus
  18. extern "C" {
  19. #endif
  20. #include <stddef.h>
  21. #include <stdbool.h>
  22. #include <stdint.h>
  23. #include "uiot_export.h"
  24. #include "uiot_import.h"
  25. #include "uiot_internal.h"
  26. #include "mqtt_client_net.h"
  27. #include "utils_timer.h"
  28. #include "utils_list.h"
  29. /* 报文id最大值 */
  30. #define MAX_PACKET_ID (65535)
  31. /* 成功订阅主题的最大个数 */
  32. #define MAX_SUB_TOPICS (10)
  33. /* 在列表中最大的重新发布数量 */
  34. #define MAX_REPUB_NUM (20)
  35. /* 重连最小等待时间 */
  36. #define MIN_RECONNECT_WAIT_INTERVAL (1000)
  37. /* MQTT报文最小超时时间 */
  38. #define MIN_COMMAND_TIMEOUT (500)
  39. /* MQTT报文最大超时时间 */
  40. #define MAX_COMMAND_TIMEOUT (5000)
  41. /* 云端保留主题的最大长度 */
  42. #define MAX_SIZE_OF_CLOUD_TOPIC (128)
  43. /* 动态注册相关的topic */
  44. #define DYNAMIC_REGISTER_PUB_TEMPLATE "/$system/%s/%s/password"
  45. #define DYNAMIC_REGISTER_SUB_TEMPLATE "/$system/%s/%s/password_reply"
  46. /* 获取设备密钥的消息中的字段 */
  47. #define PASSWORD_REPLY_RETCODE "RetCode"
  48. #define PASSWORD_REPLY_REQUEST_ID "RequestID"
  49. #define PASSWORD_REPLY_PASSWORD "Password"
  50. /**
  51. * @brief MQTT Message Type
  52. */
  53. typedef enum msgTypes {
  54. RESERVED = 0, // Reserved
  55. CONNECT = 1, // Client request to connect to Server
  56. CONNACK = 2, // Connect Acknowledgment
  57. PUBLISH = 3, // Publish message
  58. PUBACK = 4, // Publish Acknowledgment
  59. PUBREC = 5, // Publish Received
  60. PUBREL = 6, // Publish Release
  61. PUBCOMP = 7, // Publish Complete
  62. SUBSCRIBE = 8, // Client Subscribe request
  63. SUBACK = 9, // Subscribe Acknowledgment
  64. UNSUBSCRIBE = 10, // Client Unsubscribe request
  65. UNSUBACK = 11, // Unsubscribe Acknowledgment
  66. PINGREQ = 12, // PING Request
  67. PINGRESP = 13, // PING Response
  68. DISCONNECT = 14 // Client is Disconnecting
  69. } MessageTypes;
  70. typedef enum {
  71. DISCONNECTED = 0,
  72. CONNECTED = 1
  73. } ConnStatus;
  74. typedef enum {
  75. STATIC_AUTH = 1,
  76. DYNAMIC_AUTH = 2,
  77. } AuthStatus;
  78. /**
  79. * MQTT byte 1: fixed header
  80. * bits |7654: Message Type | 3:DUP flag | 21:QoS level | 0:RETAIN |
  81. */
  82. #define MQTT_HEADER_TYPE_SHIFT 0x04
  83. #define MQTT_HEADER_TYPE_MASK 0xF0
  84. #define MQTT_HEADER_DUP_SHIFT 0x03
  85. #define MQTT_HEADER_DUP_MASK 0x08
  86. #define MQTT_HEADER_QOS_SHIFT 0x01
  87. #define MQTT_HEADER_QOS_MASK 0x06
  88. #define MQTT_HEADER_RETAIN_MASK 0x01
  89. /**
  90. * @brief MQTT 遗嘱消息参数结构体定义
  91. *
  92. * 当客户端异常断开连接, 若客户端在连接时有设置遗嘱消息, 服务端会发布遗嘱消息。
  93. */
  94. typedef struct {
  95. char struct_id[4]; // The eyecatcher for this structure. must be MQTW
  96. uint8_t struct_version; // 结构体 0
  97. char *topic_name; // 遗嘱消息主题名
  98. char *message; // 遗嘱消息负载部分数据
  99. uint8_t retained; // 遗嘱消息retain标志位
  100. QoS qos; // 遗嘱消息qos标志位
  101. } WillOptions;
  102. /**
  103. * MQTT遗嘱消息结构体默认值定义
  104. */
  105. #define DEFAULT_WILL_OPTIONS { {'M', 'Q', 'T', 'W'}, 0, NULL, NULL, 0, QOS0 }
  106. /**
  107. * @brief MQTT 连接参数结构体定义
  108. *
  109. */
  110. typedef struct {
  111. char *client_id; // 客户端标识符, 请保持唯一
  112. char *username; // 用户名
  113. char *password; // 密码
  114. char struct_id[4]; // The eyecatcher for this structure. must be MQTC.
  115. uint8_t struct_version; // 结构体版本号, 必须为0
  116. uint8_t mqtt_version; // MQTT版本协议号 4 = 3.1.1
  117. uint16_t keep_alive_interval; // 心跳周期, 单位: s
  118. uint8_t clean_session; // 清理会话标志位, 具体含义请参考MQTT协议说明文档3.1.2.4小结
  119. uint8_t auto_connect_enable; // 是否开启自动重连
  120. } MQTTConnectParams;
  121. /**
  122. * MQTT连接参数结构体默认值定义
  123. */
  124. #define DEFAULT_MQTT_CONNECT_PARAMS { NULL, NULL, NULL, {'M', 'Q', 'T', 'C'}, 0, 4, 240, 1 , 1,}
  125. /**
  126. * @brief 订阅主题对应的消息处理结构体定义
  127. */
  128. typedef struct SubTopicHandle {
  129. const char *topic_filter; // 订阅主题名, 可包含通配符
  130. OnMessageHandler message_handler; // 订阅主题消息回调:wq函数指针
  131. void *message_handler_data; // 用户数据, 通过回调函数返回
  132. QoS qos; // 服务质量等级
  133. } SubTopicHandle;
  134. /**
  135. * @brief MQTT Client结构体定义
  136. */
  137. typedef struct Client {
  138. uint8_t is_connected; // 网络是否连接
  139. uint8_t was_manually_disconnected; // 是否手动断开连接
  140. uint8_t is_ping_outstanding; // 心跳包是否未完成, 即未收到服务器响应
  141. uint16_t next_packet_id; // MQTT报文标识符
  142. uint32_t command_timeout_ms; // MQTT消息超时时间, 单位:ms
  143. uint32_t current_reconnect_wait_interval; // MQTT重连周期, 单位:ms
  144. uint32_t counter_network_disconnected; // 网络断开连接次数
  145. size_t write_buf_size; // 消息发送buffer长度
  146. size_t read_buf_size; // 消息接收buffer长度
  147. unsigned char write_buf[UIOT_MQTT_TX_BUF_LEN]; // MQTT消息发送buffer
  148. unsigned char read_buf[UIOT_MQTT_RX_BUF_LEN]; // MQTT消息接收buffer
  149. void *lock_generic; // client原子锁
  150. void *lock_write_buf; // 输出流的锁
  151. void *lock_list_pub; // 等待发布消息ack列表的锁
  152. void *lock_list_sub; // 等待订阅消息ack列表的锁
  153. List *list_pub_wait_ack; // 等待发布消息ack列表
  154. List *list_sub_wait_ack; // 等待订阅消息ack列表
  155. MQTTEventHandler event_handler; // 事件句柄
  156. MQTTConnectParams options; // 连接相关参数
  157. utils_network_t network_stack; // MQTT底层使用的网络参数
  158. Timer ping_timer; // MQTT心跳包发送定时器
  159. Timer reconnect_delay_timer; // MQTT重连定时器, 判断是否已到重连时间
  160. SubTopicHandle sub_handles[MAX_SUB_TOPICS]; // 订阅主题对应的消息处理结构数组
  161. } UIoT_Client;
  162. /**
  163. * @brief MQTT协议版本
  164. */
  165. typedef enum {
  166. MQTT_3_1_1 = 4
  167. } MQTT_VERSION;
  168. typedef enum MQTT_NODE_STATE {
  169. MQTT_NODE_STATE_NORMAL = 0,
  170. MQTT_NODE_STATE_INVALID,
  171. } MQTTNodeState;
  172. /* 记录已经发布的topic的信息 */
  173. typedef struct REPUBLISH_INFO {
  174. Timer pub_start_time; /* 发布的时间 */
  175. MQTTNodeState node_state; /* 节点状态 */
  176. uint16_t msg_id; /* 发布消息的packet id */
  177. uint32_t len; /* 消息长度 */
  178. unsigned char *buf; /* 消息内容 */
  179. } UIoTPubInfo;
  180. /* 记录已经订阅的topic的信息 */
  181. typedef struct SUBSCRIBE_INFO {
  182. enum msgTypes type; /* 类型, (sub or unsub) */
  183. uint16_t msg_id; /* 订阅/取消订阅的 packet id */
  184. Timer sub_start_time; /* 订阅的开始时间 */
  185. MQTTNodeState node_state; /* 节点状态 */
  186. SubTopicHandle handler; /* handle of topic subscribed(unsubscribed) */
  187. uint16_t len; /* 消息长度 */
  188. unsigned char *buf; /* 消息内容 */
  189. } UIoTSubInfo;
  190. /**
  191. * @brief 对结构体Client进行初始化
  192. *
  193. * @param pClient MQTT客户端结构体
  194. * @param pParams MQTT客户端初始化参数
  195. * @return 返回SUCCESS, 表示成功
  196. */
  197. int uiot_mqtt_init(UIoT_Client *pClient, MQTTInitParams *pParams);
  198. /**
  199. * @brief 建立基于TLS的MQTT连接
  200. *
  201. * 注意: Client ID不能为NULL或空字符串
  202. *
  203. * @param pClient MQTT客户端结构体
  204. * @param pParams MQTT连接相关参数, 可参考MQTT协议说明
  205. * @return 返回SUCCESS, 表示成功
  206. */
  207. int uiot_mqtt_connect(UIoT_Client *pClient, MQTTConnectParams *pParams);
  208. /**
  209. * @brief 与服务器重新建立MQTT连接
  210. *
  211. * 1. 与服务器重新建立MQTT连接
  212. * 2. 连接成功后, 重新订阅之前的订阅过的主题
  213. *
  214. * @param pClient MQTT Client结构体
  215. *
  216. * @return 返回ERR_MQTT_RECONNECTED, 表示重连成功
  217. */
  218. int uiot_mqtt_attempt_reconnect(UIoT_Client *pClient);
  219. /**
  220. * @brief 断开MQTT连接
  221. *
  222. * @param pClient MQTT Client结构体
  223. * @return 返回SUCCESS, 表示成功
  224. */
  225. int uiot_mqtt_disconnect(UIoT_Client *pClient);
  226. /**
  227. * @brief 发布MQTT消息
  228. *
  229. * @param pClient MQTT客户端结构体
  230. * @param topicName 主题名
  231. * @param pParams 发布参数
  232. * @return < 0 : 表示失败
  233. * >= 0 : 返回唯一的packet id
  234. */
  235. int uiot_mqtt_publish(UIoT_Client *pClient, char *topicName, PublishParams *pParams);
  236. /**
  237. * @brief 订阅MQTT主题
  238. *
  239. * @param pClient MQTT客户端结构体
  240. * @param topicFilter 主题过滤器, 可参考MQTT协议说明 4.7
  241. * @param pParams 订阅参数
  242. * @return < 0 : 表示失败
  243. * >= 0 : 返回唯一的packet id
  244. */
  245. int uiot_mqtt_subscribe(UIoT_Client *pClient, char *topicFilter, SubscribeParams *pParams);
  246. /**
  247. * @brief 重新订阅断开连接之前已订阅的主题
  248. *
  249. * @param pClient MQTT客户端结构体
  250. * @return 返回SUCCESS, 表示成功
  251. */
  252. int uiot_mqtt_resubscribe(UIoT_Client *pClient);
  253. /**
  254. * @brief 取消订阅已订阅的MQTT主题
  255. *
  256. * @param pClient MQTT客户端结构体
  257. * @param topicFilter 主题过滤器, 可参考MQTT协议说明 4.7
  258. * @return < 0 : 表示失败
  259. * >= 0 : 返回唯一的packet id
  260. */
  261. int uiot_mqtt_unsubscribe(UIoT_Client *pClient, char *topicFilter);
  262. /**
  263. * @brief 在当前线程为底层MQTT客户端让出一定CPU执行时间
  264. *
  265. * 在这段时间内, MQTT客户端会用用处理消息接收, 以及发送PING报文, 监控网络状态
  266. *
  267. * @param pClient MQTT Client结构体
  268. * @param timeout_ms Yield操作超时时间
  269. * @return 返回SUCCESS, 表示成功, 返回ERR_MQTT_ATTEMPTING_RECONNECT, 表示正在重连
  270. */
  271. int uiot_mqtt_yield(UIoT_Client *pClient, uint32_t timeout_ms);
  272. /**
  273. * @brief 客户端自动重连是否开启
  274. *
  275. * @param pClient MQTT客户端结构体
  276. * @return 返回true, 表示客户端已开启自动重连功能
  277. */
  278. bool uiot_mqtt_is_autoreconnect_enabled(UIoT_Client *pClient);
  279. /**
  280. * @brief 设置客户端是否开启自动重连
  281. *
  282. * @param pClient MQTT客户端结构体
  283. * @param value 是否开启该功能
  284. * @return 返回SUCCESS, 表示设置成功
  285. */
  286. int uiot_mqtt_set_autoreconnect(UIoT_Client *pClient, bool value);
  287. /**
  288. * @brief 获取网络断开的次数
  289. *
  290. * @param pClient MQTT Client结构体
  291. * @return 返回客户端MQTT网络断开的次数
  292. */
  293. int uiot_mqtt_get_network_disconnected_count(UIoT_Client *pClient);
  294. /**
  295. * @brief 重置连接断开次数
  296. *
  297. * @param pClient MQTT Client结构体
  298. * @return 返回SUCCESS, 表示设置成功
  299. */
  300. int uiot_mqtt_reset_network_disconnected_count(UIoT_Client *pClient);
  301. /**
  302. * @brief 获取报文标识符
  303. *
  304. * @param pClient
  305. * @return
  306. */
  307. uint16_t get_next_packet_id(UIoT_Client *pClient);
  308. /**
  309. *
  310. * @param header
  311. * @param message_type
  312. * @param qos
  313. * @param dup
  314. * @param retained
  315. * @return
  316. */
  317. int mqtt_init_packet_header(unsigned char *header, MessageTypes message_type, QoS qos, uint8_t dup, uint8_t retained);
  318. /**
  319. * @brief 接收服务端消息
  320. *
  321. * @param pClient
  322. * @param timer
  323. * @param packet_type
  324. * @param qos
  325. * @return
  326. */
  327. int cycle_for_read(UIoT_Client *pClient, Timer *timer, uint8_t *packet_type, QoS qos);
  328. /**
  329. * @brief 发送报文数据
  330. *
  331. * @param pClient Client结构体
  332. * @param length 报文长度
  333. * @param timer 定时器
  334. * @return
  335. */
  336. int send_mqtt_packet(UIoT_Client *pClient, size_t length, Timer *timer);
  337. /**
  338. * @brief 等待指定类型的MQTT控制报文
  339. *
  340. * only used in single-threaded mode where one command at a time is in process
  341. *
  342. * @param pClient MQTT Client结构体
  343. * @param packet_type 控制报文类型
  344. * @param timer 定时器
  345. * @return
  346. */
  347. int wait_for_read(UIoT_Client *pClient, uint8_t packet_type, Timer *timer, QoS qos);
  348. /**
  349. * @brief 设置MQTT当前连接状态
  350. *
  351. * @param pClient Client结构体
  352. * @param connected 0: 连接断开状态 1: 已连接状态
  353. * @return
  354. */
  355. void set_client_conn_state(UIoT_Client *pClient, uint8_t connected);
  356. /**
  357. * @brief 获取MQTT当前连接状态
  358. *
  359. * @param pClient Client结构体
  360. * @return 0: 连接断开状态 1: 已连接状态
  361. */
  362. uint8_t get_client_conn_state(UIoT_Client *pClient);
  363. /**
  364. * @brief 检查 Publish ACK 等待列表,若有成功接收或者超时,则将对应节点从列表中移除
  365. *
  366. * @param pClient MQTT客户端
  367. * @return 返回SUCCESS, 表示成功
  368. */
  369. int uiot_mqtt_pub_info_proc(UIoT_Client *pClient);
  370. /**
  371. * @brief 检查 Subscribe ACK 等待列表,若有成功接收或者超时,则将对应节点从列表中移除
  372. *
  373. * @param pClient MQTT客户端
  374. * @return 返回SUCCESS, 表示成功
  375. */
  376. int uiot_mqtt_sub_info_proc(UIoT_Client *pClient);
  377. int push_sub_info_to(UIoT_Client *c, int len, unsigned short msgId, MessageTypes type,
  378. SubTopicHandle *handler, ListNode **node);
  379. int serialize_pub_ack_packet(unsigned char *buf, size_t buf_len, MessageTypes packet_type, uint8_t dup,
  380. uint16_t packet_id,
  381. uint32_t *serialized_len);
  382. int serialize_packet_with_zero_payload(unsigned char *buf, size_t buf_len, MessageTypes packetType, uint32_t *serialized_len);
  383. int deserialize_publish_packet(unsigned char *dup, QoS *qos, uint8_t *retained, uint16_t *packet_id, char **topicName,
  384. uint16_t *topicNameLen, unsigned char **payload, size_t *payload_len, unsigned char *buf, size_t buf_len);
  385. int deserialize_suback_packet(uint16_t *packet_id, uint32_t max_count, uint32_t *count,
  386. QoS *grantedQoSs, unsigned char *buf, size_t buf_len);
  387. int deserialize_unsuback_packet(uint16_t *packet_id, unsigned char *buf, size_t buf_len);
  388. int deserialize_ack_packet(uint8_t *packet_type, uint8_t *dup, uint16_t *packet_id, unsigned char *buf, size_t buf_len);
  389. bool parse_mqtt_payload_retcode_type(char *pJsonDoc, uint32_t *pRetCode);
  390. bool parse_mqtt_state_request_id_type(char *pJsonDoc, char **pType);
  391. bool parse_mqtt_state_password_type(char *pJsonDoc, char **pType);
  392. #ifdef MQTT_CHECK_REPEAT_MSG
  393. void reset_repeat_packet_id_buffer(void);
  394. #endif
  395. /**
  396. * @brief 根据剩余长度计算整个MQTT报文的长度
  397. *
  398. * @param rem_len 剩余长度
  399. * @return 整个MQTT报文的长度
  400. */
  401. size_t get_mqtt_packet_len(size_t rem_len);
  402. size_t mqtt_write_packet_rem_len(unsigned char *buf, uint32_t length);
  403. int mqtt_read_packet_rem_len_form_buf(unsigned char *buf, uint32_t *value, uint32_t *readBytesLen);
  404. uint16_t mqtt_read_uint16_t(unsigned char **pptr);
  405. unsigned char mqtt_read_char(unsigned char **pptr);
  406. void mqtt_write_char(unsigned char **pptr, unsigned char c);
  407. void mqtt_write_uint_16(unsigned char **pptr, uint16_t anInt);
  408. void mqtt_write_utf8_string(unsigned char **pptr, const char *string);
  409. #ifdef __cplusplus
  410. }
  411. #endif
  412. #endif //C_SDK_MQTT_CLIENT_H_