/* RyanMqttThread.c - MQTT client worker thread: keepalive, incoming packet dispatch, ack list scanning and connection state machine. */
  1. #define DBG_ENABLE
  2. #define DBG_SECTION_NAME RyanMqttTag
  3. #define DBG_LEVEL LOG_LVL_WARNING
  4. #define DBG_COLOR
  5. #include "RyanMqttPublic.h"
  6. #include "RyanMqttUtile.h"
  7. #include "RyanMqttThread.h"
  8. /**
  9. * @brief mqtt心跳保活
  10. *
  11. * @param client
  12. * @return int32_t
  13. */
  14. RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
  15. {
  16. int32_t connectState = RyanMqttConnectAccepted;
  17. int32_t packetLen = 0;
  18. RyanMqttAssert(NULL != client);
  19. // 如果没有连接则不需要心跳保活
  20. RyanMqttCheck(mqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError);
  21. // 服务器在心跳时间的1.5倍内没有收到消息则会断开连接
  22. // 在心跳的一半发送keepalive
  23. if (platformTimerRemain(&client->keepaliveTimer) != 0)
  24. return RyanMqttSuccessError;
  25. // 心跳超时,断开连接
  26. connectState = RyanMqttKeepaliveTimeout;
  27. RyanMqttCheckCode(2 > client->keepaliveTimeoutCount, RyanMqttKeepaliveTimeout,
  28. { RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState); });
  29. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  30. packetLen = MQTTSerialize_pingreq(client->config->sendBuffer, client->config->sendBufferSize);
  31. if (packetLen > 0)
  32. RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  33. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  34. client->keepaliveTimeoutCount++;
  35. platformTimerCutdown(&client->keepaliveTimer, client->config->keepaliveTimeoutS * 1000 / 2); // 启动心跳定时器
  36. return RyanMqttSuccessError;
  37. }
  38. /**
  39. * @brief
  40. *
  41. * @param client
  42. * @return int32_t
  43. */
  44. static RyanMqttError_e RyanMqttGetPayloadLen(RyanMqttClient_t *client, uint32_t *payloadLen)
  45. {
  46. RyanMqttError_e result = RyanMqttSuccessError;
  47. uint8_t len = 0;
  48. uint8_t encodedByte = 0;
  49. int32_t multiplier = 1;
  50. RyanMqttAssert(NULL != client);
  51. // RyanMqttAssert(NULL != payloadLen); payloadLen == 0时会误判
  52. do
  53. {
  54. RyanMqttCheck(++len < 4, RyanMqttFailedError);
  55. result = RyanMqttRecvPacket(client, &encodedByte, 1);
  56. RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttFailedError);
  57. *payloadLen += (encodedByte & 127) * multiplier; // 根据 MQTT 协议解码数据长度
  58. multiplier *= 128;
  59. } while ((encodedByte & 128) != 0);
  60. RyanMqttCheck(*payloadLen <= RyanMqttMaxPayloadLen, RyanMqttFailedError);
  61. return RyanMqttSuccessError;
  62. }
  63. /**
  64. * @brief qos1或者qos2接收消息成功
  65. *
  66. * @param client
  67. * @return RyanMqttError_e
  68. */
  69. static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *client)
  70. {
  71. RyanMqttError_e result = RyanMqttSuccessError;
  72. uint8_t dup = 0;
  73. uint8_t packetType = 0;
  74. uint16_t packetId = 0;
  75. RyanMqttAckHandler_t *ackHandler = NULL;
  76. RyanMqttAssert(NULL != client);
  77. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, client->config->recvBuffer, client->config->recvBufferSize);
  78. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError);
  79. // 可能会多次收到puback / pubcomp,仅在首次收到时触发发布成功回调函数
  80. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
  81. RyanMqttCheckCode(RyanMqttSuccessError == result, result, { LOG_I("packetType: %d, packetId: %d", packetType, packetId); });
  82. RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
  83. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  84. return result;
  85. }
  86. /**
  87. * @brief 发布释放处理函数
  88. *
  89. * @param client
  90. * @return RyanMqttError_e
  91. */
  92. static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client)
  93. {
  94. RyanMqttError_e result = RyanMqttFailedError;
  95. uint8_t dup = 0;
  96. uint8_t packetType = 0;
  97. uint16_t packetId = 0;
  98. int32_t packetLen = 0;
  99. RyanMqttAckHandler_t *ackHandler = NULL;
  100. RyanMqttAssert(NULL != client);
  101. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, client->config->recvBuffer, client->config->recvBufferSize);
  102. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError);
  103. // 制作确认数据包并发送
  104. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  105. packetLen = MQTTSerialize_ack(client->config->sendBuffer, client->config->sendBufferSize, PUBCOMP, 0, packetId);
  106. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  107. // 每次收到PUBREL都返回消息
  108. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  109. RyanMqttCheckCode(RyanMqttSuccessError == result, result, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  110. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  111. // 删除pubrel记录
  112. result = RyanMqttAckListNodeFind(client, PUBREL, packetId, &ackHandler);
  113. if (RyanMqttSuccessError == result)
  114. RyanMqttAckHandlerDestroy(client, ackHandler);
  115. }
  116. /**
  117. * @brief 发布收到处理函数
  118. *
  119. * @param client
  120. * @return RyanMqttError_e
  121. */
  122. static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
  123. {
  124. RyanMqttError_e result = RyanMqttFailedError;
  125. uint8_t dup = 0;
  126. RyanBool_e fastFlag = RyanFalse;
  127. uint8_t packetType = 0;
  128. uint16_t packetId = 0;
  129. int32_t packetLen = 0;
  130. RyanMqttMsgHandler_t *msgHandler = NULL;
  131. RyanMqttAckHandler_t *ackHandler = NULL;
  132. RyanMqttAckHandler_t *ackHandlerPubrec = NULL;
  133. RyanMqttAssert(NULL != client);
  134. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, client->config->recvBuffer, client->config->recvBufferSize);
  135. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError);
  136. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  137. result = RyanMqttAckListNodeFind(client, PUBREC, packetId, &ackHandlerPubrec);
  138. if (RyanMqttSuccessError == result)
  139. {
  140. // 查找ack链表是否存在pubcomp报文,不存在表示首次接收到
  141. result = RyanMqttAckListNodeFind(client, PUBCOMP, packetId, &ackHandler);
  142. if (RyanMqttSuccessError != result)
  143. fastFlag = RyanTrue;
  144. // 出现pubrec和pubcomp同时存在的情况,清除pubrec理论上不会出现
  145. else
  146. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  147. }
  148. // 制作确认数据包并发送
  149. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  150. // 序列化发布释放报文
  151. packetLen = MQTTSerialize_ack(client->config->sendBuffer, client->config->sendBufferSize, PUBREL, 0, packetId);
  152. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  153. // 每次收到PUBREC都返回ack
  154. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  155. RyanMqttCheckCode(RyanMqttSuccessError == result, result, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  156. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  157. if (RyanTrue == fastFlag)
  158. {
  159. result = RyanMqttMsgHandlerCreate(ackHandlerPubrec->msgHandler->topic,
  160. strlen(ackHandlerPubrec->msgHandler->topic),
  161. ackHandlerPubrec->msgHandler->qos, &msgHandler);
  162. RyanMqttCheckCode(RyanMqttSuccessError == result, result,
  163. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  164. // 创建一个 ACK 处理程序节点
  165. result = RyanMqttAckHandlerCreate(client, PUBCOMP, packetId, packetLen, msgHandler, &ackHandler);
  166. RyanMqttCheckCode(
  167. RyanMqttSuccessError == result, result,
  168. { platformMemoryFree(msgHandler);
  169. platformMutexUnLock(client->config->userData, client->sendBufLock); });
  170. }
  171. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  172. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  173. if (RyanTrue == fastFlag)
  174. {
  175. result = RyanMqttAckListAdd(client, ackHandler);
  176. // 保证等待PUBCOMP记录成功后再清除PUBREC记录
  177. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  178. }
  179. return result;
  180. }
  181. /**
  182. * @brief 收到服务器发布消息处理函数
  183. *
  184. * @param client
  185. * @return RyanMqttError_e
  186. */
  187. static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
  188. {
  189. RyanMqttError_e result = RyanMqttSuccessError;
  190. int32_t packetLen = 0;
  191. RyanBool_e deliverMsgFlag = RyanFalse;
  192. MQTTString topicName = MQTTString_initializer;
  193. RyanMqttMsgData_t msgData = {0};
  194. RyanMqttMsgHandler_t *msgHandler = NULL;
  195. RyanMqttAckHandler_t *ackHandler = NULL;
  196. RyanMqttAssert(NULL != client);
  197. result = MQTTDeserialize_publish(&msgData.dup, &msgData.qos, &msgData.retained, &msgData.packetId, &topicName,
  198. &msgData.payload, (int *)&msgData.payloadLen, client->config->recvBuffer, client->config->recvBufferSize);
  199. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError);
  200. // 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
  201. result = RyanMqttMsgHandlerFind(client, topicName.lenstring.data, topicName.lenstring.len, RyanTrue, &msgHandler);
  202. RyanMqttCheck(RyanMqttSuccessError == result, result);
  203. switch (msgData.qos)
  204. {
  205. case QOS0:
  206. deliverMsgFlag = RyanTrue;
  207. break;
  208. case QOS1:
  209. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  210. packetLen = MQTTSerialize_ack(client->config->sendBuffer, client->config->sendBufferSize, PUBACK, 0, msgData.packetId);
  211. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError,
  212. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  213. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  214. RyanMqttCheckCode(RyanMqttSuccessError == result, result,
  215. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  216. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  217. deliverMsgFlag = RyanTrue;
  218. break;
  219. case QOS2:
  220. {
  221. RyanBool_e fastFlag = RyanFalse;
  222. // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish不进行qos2消息处理
  223. result = RyanMqttAckListNodeFind(client, PUBREL, msgData.packetId, &ackHandler);
  224. if (RyanMqttSuccessError != result)
  225. fastFlag = RyanTrue;
  226. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  227. packetLen = MQTTSerialize_ack(client->config->sendBuffer, client->config->sendBufferSize, PUBREC, 0, msgData.packetId);
  228. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError,
  229. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  230. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  231. RyanMqttCheckCode(RyanMqttSuccessError == result, result,
  232. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  233. if (RyanTrue == fastFlag)
  234. {
  235. result = RyanMqttMsgHandlerCreate(topicName.lenstring.data, topicName.lenstring.len, msgData.qos, &msgHandler);
  236. RyanMqttCheckCode(RyanMqttSuccessError == result, result, {
  237. platformMutexUnLock(client->config->userData, client->sendBufLock);
  238. });
  239. result = RyanMqttAckHandlerCreate(client, PUBREL, msgData.packetId, packetLen, msgHandler, &ackHandler);
  240. RyanMqttCheckCode(RyanMqttSuccessError == result, result, {
  241. platformMemoryFree(msgHandler);
  242. platformMutexUnLock(client->config->userData, client->sendBufLock);
  243. });
  244. }
  245. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  246. if (RyanTrue == fastFlag)
  247. {
  248. result = RyanMqttAckListAdd(client, ackHandler);
  249. deliverMsgFlag = RyanTrue;
  250. }
  251. }
  252. break;
  253. default:
  254. break;
  255. }
  256. if (RyanTrue == deliverMsgFlag)
  257. {
  258. // 复制主题名字
  259. result = RyanMqttStringCopy(&msgData.topic, topicName.lenstring.data, topicName.lenstring.len);
  260. RyanMqttCheck(RyanMqttSuccessError == result, result);
  261. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  262. platformMemoryFree(msgData.topic);
  263. }
  264. return result;
  265. }
  266. /**
  267. * @brief 订阅确认处理函数
  268. *
  269. * @param client
  270. * @return RyanMqttError_e
  271. */
  272. static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
  273. {
  274. RyanMqttError_e result = RyanMqttSuccessError;
  275. int32_t count = 0;
  276. int32_t grantedQoS = 0;
  277. uint16_t packetId = 0;
  278. RyanMqttMsgHandler_t *msgHandler = NULL;
  279. RyanMqttAckHandler_t *ackHandler = NULL;
  280. RyanMqttAssert(NULL != client);
  281. result = MQTTDeserialize_suback(&packetId, 1, (int *)&count, (int *)&grantedQoS, client->config->recvBuffer, client->config->recvBufferSize);
  282. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError);
  283. // ack链表不存在当前订阅确认节点就直接退出
  284. result = RyanMqttAckListNodeFind(client, SUBACK, packetId, &ackHandler);
  285. RyanMqttCheck(RyanMqttSuccessError == result, result);
  286. // 订阅失败
  287. if (subFail == grantedQoS)
  288. {
  289. // mqtt事件回调
  290. RyanMqttEventMachine(client, RyanMqttEventSubscribedFaile, (void *)ackHandler->msgHandler);
  291. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  292. return RyanMqttSuccessError;
  293. }
  294. // 订阅成功
  295. // 查找是否有同名订阅,如果有就销毁之前的
  296. result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, strlen(ackHandler->msgHandler->topic), RyanFalse, &msgHandler);
  297. if (RyanMqttSuccessError == result)
  298. RyanMqttMsgHandlerDestory(msgHandler);
  299. // 服务端可以授予比订阅者要求的低一些的 QoS 等级。
  300. result = RyanMqttMsgHandlerCreate(ackHandler->msgHandler->topic,
  301. strlen(ackHandler->msgHandler->topic),
  302. grantedQoS, &msgHandler);
  303. RyanMqttCheck(RyanMqttSuccessError == result, result); // 这里创建失败了不触发回调,等待ack超时触发失败回调函数
  304. RyanMqttMsgHandlerAdd(client, msgHandler); // 将msg信息添加到订阅链表上
  305. RyanMqttEventMachine(client, RyanMqttEventSubscribed, (void *)msgHandler); // mqtt回调函数
  306. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  307. return result;
  308. }
  309. /**
  310. * @brief 取消订阅确认处理函数
  311. *
  312. * @param client
  313. * @return RyanMqttError_e
  314. */
  315. static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client)
  316. {
  317. RyanMqttError_e result = RyanMqttFailedError;
  318. RyanMqttAckHandler_t *ackHandler = NULL;
  319. uint16_t packetId = 0;
  320. RyanMqttAssert(NULL != client);
  321. result = MQTTDeserialize_unsuback(&packetId, client->config->recvBuffer, client->config->recvBufferSize);
  322. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError);
  323. // ack链表不存在当前取消订阅确认节点就直接退出
  324. result = RyanMqttAckListNodeFind(client, UNSUBACK, packetId, &ackHandler);
  325. RyanMqttCheck(RyanMqttSuccessError == result, result);
  326. // mqtt事件回调
  327. RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
  328. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  329. return result;
  330. }
  331. /**
  332. * @brief mqtt数据包处理函数
  333. *
  334. * @param client
  335. * @param packetType
  336. * @return RyanMqttError_e
  337. */
  338. RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8_t *packetType)
  339. {
  340. RyanMqttError_e result = RyanMqttSuccessError;
  341. int32_t fixedHeaderLen = 1;
  342. uint32_t payloadLen = 0;
  343. MQTTHeader header = {0};
  344. RyanMqttAssert(NULL != client);
  345. // RyanMqttAssert(NULL != packetType); packetType == 0时会误判
  346. // 1.读取标头字节。 其中包含数据包类型
  347. result = RyanMqttRecvPacket(client, client->config->recvBuffer, fixedHeaderLen);
  348. if (RyanMqttRecvPacketTimeOutError == result)
  349. return RyanMqttRecvPacketTimeOutError;
  350. RyanMqttCheck(RyanMqttSuccessError == result, result);
  351. // 填充联合体标头信息
  352. header.byte = client->config->recvBuffer[0];
  353. LOG_I("packetType: %d", header.bits.type);
  354. RyanMqttCheck(CONNECT <= header.bits.type && DISCONNECT >= header.bits.type, result);
  355. // 2.读取mqtt报文剩余长度。 这本身是可变的
  356. result = RyanMqttGetPayloadLen(client, &payloadLen);
  357. RyanMqttCheck(RyanMqttSuccessError == result, result);
  358. // 将剩余长度编码成mqtt报文,并放入接收缓冲区,如果消息长度超过缓冲区长度则抛弃此次数据
  359. fixedHeaderLen += MQTTPacket_encode(client->config->recvBuffer + fixedHeaderLen, payloadLen);
  360. RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config->recvBufferSize, RyanMqttRecvBufToShortError,
  361. { RyanMqttRecvPacket(client, client->config->recvBuffer, payloadLen); });
  362. // 3.读取mqtt载荷数据并放到读取缓冲区
  363. if (payloadLen > 0)
  364. {
  365. result = RyanMqttRecvPacket(client, client->config->recvBuffer + fixedHeaderLen, payloadLen);
  366. RyanMqttCheck(RyanMqttSuccessError == result, result);
  367. }
  368. // 控制报文类型
  369. switch (header.bits.type)
  370. {
  371. case CONNACK: // 连接报文确认
  372. break;
  373. case PUBACK: // QoS 1消息发布收到确认
  374. case PUBCOMP: // 发布完成(qos2 第三步)
  375. result = RyanMqttPubackAndPubcompPacketHandler(client);
  376. break;
  377. case PUBREC: // 发布收到(qos2 第一步)
  378. result = RyanMqttPubrecPacketHandler(client);
  379. break;
  380. case PUBREL: // 发布释放(qos2 第二步)
  381. result = RyanMqttPubrelPacketHandler(client);
  382. break;
  383. case SUBACK: // 订阅确认
  384. result = RyanMqttSubackHandler(client);
  385. break;
  386. case UNSUBACK: // 取消订阅确认
  387. result = RyanMqttUnSubackHandler(client);
  388. break;
  389. case PUBLISH: // 接收到订阅消息
  390. result = RyanMqttPublishPacketHandler(client);
  391. break;
  392. case PINGRESP: // 心跳响应
  393. client->keepaliveTimeoutCount = 0;
  394. break;
  395. default:
  396. break;
  397. }
  398. *packetType = header.bits.type;
  399. return result;
  400. }
  401. /**
  402. * @brief 遍历ack链表,进行相应的处理
  403. *
  404. * @param client
  405. * @param WaitFlag
  406. * WaitFlag : RyanFalse 表示不需要等待超时立即处理这些数据包。通常在重新连接后立即进行处理
  407. * WaitFlag : RyanTrue 表示需要等待超时再处理这些消息,一般是稳定连接下的超时处理
  408. */
  409. void RyanMqttAckListScan(RyanMqttClient_t *client, RyanBool_e WaitFlag)
  410. {
  411. RyanList_t *curr = NULL,
  412. *next = NULL;
  413. RyanMqttAckHandler_t *ackHandler = NULL;
  414. RyanMqttAssert(NULL != client);
  415. // 如果链表为空或者mqtt没有连接就退出
  416. if ((RyanListIsEmpty(&client->ackHandlerList)) || (mqttConnectState != RyanMqttGetClientState(client)))
  417. return;
  418. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  419. {
  420. // 获取此节点的结构体
  421. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  422. // ack响应没有超时就不进行处理
  423. if (0 != platformTimerRemain(&ackHandler->timer) && RyanTrue == WaitFlag)
  424. continue;
  425. switch (ackHandler->packetType)
  426. {
  427. // 发送qos1 / qos2消息, 服务器ack响应超时。需要重新发送它们。
  428. case PUBACK:
  429. case PUBREC:
  430. case PUBREL:
  431. case PUBCOMP:
  432. {
  433. if (mqttConnectState != RyanMqttGetClientState(client))
  434. continue;
  435. // 重发数据事件回调
  436. RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
  437. RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
  438. // 重置ack超时时间
  439. platformTimerCutdown(&ackHandler->timer, client->config->ackTimeout);
  440. ackHandler->repeatCount++;
  441. // 重发次数超过警告值回调
  442. if (ackHandler->repeatCount >= client->config->ackHandlerRepeatCountWarning)
  443. RyanMqttEventMachine(client, RyanMqttEventAckRepeatCountWarning, (void *)ackHandler);
  444. break;
  445. }
  446. // 订阅 / 取消订阅超时就认为失败
  447. case SUBACK:
  448. case UNSUBACK:
  449. {
  450. RyanMqttEventMachine(client, (SUBACK == ackHandler->packetType) ? RyanMqttEventSubscribedFaile : RyanMqttEventUnSubscribedFaile,
  451. (void *)ackHandler->msgHandler);
  452. // 清除句柄
  453. RyanMqttAckHandlerDestroy(client, ackHandler);
  454. break;
  455. }
  456. default:
  457. RyanMqttAssert(NULL); // 不应该为别的值
  458. break;
  459. }
  460. }
  461. }
  462. /**
  463. * @brief mqtt连接函数
  464. *
  465. * @param client
  466. * @return RyanMqttError_e
  467. */
  468. RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
  469. {
  470. RyanMqttError_e result = RyanMqttSuccessError;
  471. uint8_t sessionPresent = 0; // 会话存在标志
  472. uint8_t packetType = 0; // 接收到的报文类型
  473. int32_t packetLen = 0;
  474. int32_t connackRc = 0;
  475. MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
  476. RyanMqttAssert(NULL != client);
  477. RyanMqttAssert(NULL != client->network);
  478. RyanMqttAssert(NULL != client->config);
  479. RyanMqttCheck(mqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectAccepted);
  480. // 连接标志位
  481. connectData.clientID.cstring = client->config->clientId;
  482. connectData.username.cstring = client->config->userName;
  483. connectData.password.cstring = client->config->password;
  484. connectData.keepAliveInterval = client->config->keepaliveTimeoutS;
  485. connectData.cleansession = client->config->cleanSessionFlag;
  486. connectData.MQTTVersion = client->config->mqttVersion;
  487. if (RyanTrue == client->lwtFlag)
  488. {
  489. connectData.willFlag = 1;
  490. connectData.will.qos = client->lwtOptions->qos;
  491. connectData.will.retained = client->lwtOptions->retain;
  492. connectData.will.message.lenstring.data = client->lwtOptions->payload;
  493. connectData.will.message.lenstring.len = client->lwtOptions->payloadLen;
  494. connectData.will.topicName.cstring = client->lwtOptions->topic;
  495. }
  496. // 调用底层的连接函数连接上服务器
  497. result = platformNetworkConnect(client->config->userData, client->network, client->config->host, client->config->port);
  498. RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttConnectNetWorkFail);
  499. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  500. // 序列化mqtt的CONNECT报文
  501. packetLen = MQTTSerialize_connect(client->config->sendBuffer, client->config->sendBufferSize, &connectData);
  502. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  503. // 发送序列化mqtt的CONNECT报文
  504. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  505. RyanMqttCheckCode(RyanMqttSuccessError == result, result, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  506. // 等待报文
  507. // mqtt规范 服务端接收到connect报文后,服务端发送给客户端的第一个报文必须是 CONNACK
  508. result = RyanMqttReadPacketHandler(client, &packetType);
  509. RyanMqttCheckCode(RyanMqttSuccessError == result, result, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  510. RyanMqttCheckCode(CONNACK == packetType, RyanMqttConnectDisconnected, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  511. // 解析CONNACK报文
  512. result = MQTTDeserialize_connack(&sessionPresent, &connackRc, client->config->recvBuffer, client->config->recvBufferSize);
  513. RyanMqttCheckCode(1 == result, RyanMqttDeserializePacketError, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  514. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  515. LOG_I("result: %d, packetLen: %d, packetType: %d connackRc: %d", result, packetLen, packetType, connackRc);
  516. return connackRc;
  517. }
  518. /**
  519. * @brief mqtt事件处理函数
  520. *
  521. * @param client
  522. * @param eventId
  523. * @param eventData
  524. */
  525. void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
  526. {
  527. RyanMqttAssert(NULL != client);
  528. RyanMqttAssert(NULL != client->network);
  529. RyanMqttAssert(NULL != client->config);
  530. switch (eventId)
  531. {
  532. case RyanMqttEventConnected: // 连接成功
  533. client->keepaliveTimeoutCount = 0; // 重置心跳超时计数器
  534. platformTimerCutdown(&client->keepaliveTimer, (client->config->keepaliveTimeoutS * 1000 / 2)); // 启动心跳定时器
  535. RyanMqttAckListScan(client, RyanFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
  536. RyanMqttSetClientState(client, mqttConnectState);
  537. break;
  538. case RyanMqttEventDisconnected: // 断开连接事件
  539. platformNetworkClose(client->config->userData, client->network);
  540. if (RyanTrue == client->config->cleanSessionFlag)
  541. RyanMqttCleanSession(client);
  542. RyanMqttSetClientState(client, mqttDisconnectState); // 将客户端状态设置为断开连接
  543. break;
  544. case RyanMqttEventReconnectBefore: // 重连前回调
  545. RyanMqttSetClientState(client, mqttReconnectState);
  546. break;
  547. default:
  548. break;
  549. }
  550. if (client->config->mqttEventHandle == NULL)
  551. return;
  552. if (client->eventFlag & eventId)
  553. client->config->mqttEventHandle(client, eventId, eventData);
  554. }
  555. /**
  556. * @brief mqtt运行线程
  557. *
  558. * @param argument
  559. */
  560. void RyanMqttThread(void *argument)
  561. {
  562. int32_t result = 0;
  563. RyanMqttClient_t *client = (RyanMqttClient_t *)argument;
  564. RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
  565. RyanMqttAssert(NULL != client->network); // RyanMqttStart前没有调用RyanMqttInit
  566. RyanMqttAssert(NULL != client->config); // RyanMqttStart前没有调用RyanMqttSetConfig
  567. while (1)
  568. {
  569. switch (client->clientState)
  570. {
  571. case mqttStartState: // 开始状态状态
  572. LOG_I("初始化状态,开始连接");
  573. result = RyanMqttConnect(client);
  574. RyanMqttEventMachine(client, RyanMqttConnectAccepted == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  575. (void *)&result);
  576. break;
  577. case mqttConnectState: // 连接状态
  578. LOG_I("连接状态");
  579. result = RyanMqttReadPacketHandler(client, NULL);
  580. RyanMqttAckListScan(client, 1);
  581. RyanMqttKeepalive(client);
  582. break;
  583. case mqttDisconnectState: // 断开连接状态
  584. LOG_I("断开连接状态");
  585. if (RyanTrue != client->config->autoReconnectFlag) // 没有使能自动连接就休眠线程
  586. platformThreadStop(client->config->userData, client->mqttThread);
  587. LOG_I("触发自动连接,%dms后开始连接\r\n", client->config->reconnectTimeout);
  588. platformDelay(client->config->reconnectTimeout);
  589. RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL); // 给上层触发重新连接前事件
  590. break;
  591. case mqttReconnectState:
  592. result = RyanMqttConnect(client);
  593. RyanMqttEventMachine(client, RyanMqttConnectAccepted == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  594. (void *)&result);
  595. break;
  596. default:
  597. RyanMqttAssert(NULL);
  598. break;
  599. }
  600. }
  601. }