RyanMqttThread.c 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922
  1. #define rlogLevel (rlogLvlDebug) // 日志打印等级
  2. #include "RyanMqttThread.h"
  3. /**
  4. * @brief Resets the MQTT client's keepalive timer to 1.4 times the configured keepalive interval.
  5. *
  6. * Ensures the keepalive timer is refreshed in a thread-safe manner, allowing the client to detect connection loss if no heartbeat is received within this period.
  7. */
  8. void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client)
  9. {
  10. // 服务器在心跳时间的1.5倍内没有收到keeplive消息则会断开连接
  11. // 这里算 1.4 b倍时间内没有收到心跳就断开连接
  12. platformCriticalEnter(client->config.userData, &client->criticalLock);
  13. platformTimerCutdown(&client->keepaliveTimer, (uint32_t)(1000 * client->config.keepaliveTimeoutS * 1.4)); // 启动心跳定时器
  14. platformCriticalExit(client->config.userData, &client->criticalLock);
  15. }
  16. /**
  17. * @brief Manages MQTT keepalive by sending heartbeat (PINGREQ) packets and handling keepalive timeouts.
  18. *
  19. * Checks if the client is connected and monitors the keepalive timer. If the timer expires (no heartbeat within 1.4 times the keepalive period), triggers a disconnect event. Sends a PINGREQ packet when appropriate, based on remaining time and throttle conditions.
  20. *
  21. * @return RyanMqttError_e Returns RyanMqttSuccessError on success, RyanMqttNotConnectError if not connected, or an error code on failure.
  22. */
  23. static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
  24. {
  25. RyanMqttError_e result = RyanMqttFailedError;
  26. uint32_t timeRemain = 0;
  27. RyanMqttAssert(NULL != client);
  28. // mqtt没有连接就退出
  29. if (RyanMqttConnectState != RyanMqttGetClientState(client))
  30. return RyanMqttNotConnectError;
  31. timeRemain = platformTimerRemain(&client->keepaliveTimer);
  32. // 超过设置的 1.4 倍心跳周期,主动通知用户断开连接
  33. if (0 == timeRemain)
  34. {
  35. RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
  36. RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
  37. rlog_d("ErrorCode: %d, strError: %s", RyanMqttKeepaliveTimeout, RyanMqttStrError(RyanMqttKeepaliveTimeout));
  38. return RyanMqttFailedError;
  39. }
  40. // 当剩余时间小于 recvtimeout 时强制发送心跳包
  41. if (timeRemain > client->config.recvTimeout)
  42. {
  43. // 当到达 0.9 倍时间时发送心跳包
  44. if (timeRemain < 1000 * 0.9 * client->config.keepaliveTimeoutS)
  45. return RyanMqttSuccessError;
  46. // 节流时间内不发送心跳报文
  47. if (platformTimerRemain(&client->keepaliveThrottleTimer))
  48. return RyanMqttSuccessError;
  49. }
  50. // 发送mqtt心跳包
  51. {
  52. // MQTT_PACKET_PINGREQ_SIZE
  53. MQTTStatus_t status = MQTTSuccess;
  54. uint8_t buffer[2] = {0};
  55. MQTTFixedBuffer_t fixedBuffer = {
  56. .pBuffer = buffer,
  57. .size = 2};
  58. // 序列化数据包
  59. status = MQTT_SerializePingreq(&fixedBuffer);
  60. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  61. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  62. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  63. platformTimerCutdown(&client->keepaliveThrottleTimer, client->config.recvTimeout + 1500); // 启动心跳检查节流定时器
  64. }
  65. return RyanMqttSuccessError;
  66. }
  67. /**
  68. * @brief Handles incoming PUBACK and PUBCOMP packets for QoS 1 and QoS 2 acknowledgments.
  69. *
  70. * Processes acknowledgment packets by deserializing the packet ID, locating the corresponding acknowledgment handler,
  71. * removing it from the acknowledgment list, triggering the published event callback, and destroying the handler.
  72. *
  73. * @param client Pointer to the MQTT client instance.
  74. * @param pIncomingPacket Pointer to the incoming MQTT packet information.
  75. * @return RyanMqttError_e Result code indicating success or the type of error encountered.
  76. */
  77. static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  78. {
  79. RyanMqttError_e result = RyanMqttSuccessError;
  80. uint16_t packetId = 0;
  81. RyanMqttAckHandler_t *ackHandler = NULL;
  82. MQTTStatus_t status = MQTTSuccess;
  83. RyanMqttAssert(NULL != client);
  84. // 反序列化ack包
  85. status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  86. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  87. // 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
  88. result = RyanMqttAckListNodeFind(client, pIncomingPacket->type & 0xF0U, packetId, &ackHandler);
  89. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { rlog_i("packetType: %02x, packetId: %d", pIncomingPacket->type & 0xF0U, packetId); });
  90. RyanMqttAckListRemoveToAckList(client, ackHandler);
  91. RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
  92. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  93. return result;
  94. }
  95. /**
  96. * @brief Handles incoming PUBREL packets as part of the MQTT QoS 2 message flow.
  97. *
  98. * Deserializes the PUBREL packet to extract the packet ID, removes the corresponding acknowledgment handler, and sends a PUBCOMP acknowledgment packet in response.
  99. *
  100. * @return RyanMqttError_e Result of the operation.
  101. */
  102. static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  103. {
  104. RyanMqttError_e result = RyanMqttFailedError;
  105. uint16_t packetId = 0;
  106. RyanMqttAckHandler_t *ackHandler = NULL;
  107. MQTTStatus_t status = MQTTSuccess;
  108. RyanMqttAssert(NULL != client);
  109. // 反序列化ack包
  110. status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  111. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  112. // 删除pubrel记录
  113. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, packetId, &ackHandler);
  114. if (RyanMqttSuccessError == result)
  115. {
  116. RyanMqttAckListRemoveToAckList(client, ackHandler);
  117. RyanMqttAckHandlerDestroy(client, ackHandler);
  118. }
  119. // 制作确认数据包并发送
  120. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  121. MQTTFixedBuffer_t fixedBuffer = {
  122. .pBuffer = buffer,
  123. .size = MQTT_PUBLISH_ACK_PACKET_SIZE};
  124. // 序列化ack数据包
  125. status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBCOMP, packetId);
  126. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  127. // 每次收到PUBREL都返回消息
  128. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  129. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  130. return RyanMqttSuccessError;
  131. }
  132. /**
  133. * @brief Handles incoming PUBREC packets as part of MQTT QoS 2 message flow.
  134. *
  135. * Deserializes the PUBREC packet to extract the packet ID, serializes and sends a PUBREL acknowledgment in response, and manages acknowledgment handlers. On first receipt of a PUBREC for a given packet ID (i.e., when no PUBCOMP handler exists), creates a message handler and a PUBCOMP acknowledgment handler, adds it to the acknowledgment list, and removes the PUBREC handler. If both PUBREC and PUBCOMP handlers exist, removes the redundant PUBREC handler. Ensures correct progression of the QoS 2 handshake.
  136. *
  137. * @return RyanMqttError_e Result of the operation.
  138. */
  139. static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  140. {
  141. RyanMqttError_e result = RyanMqttFailedError;
  142. uint16_t packetId = 0;
  143. RyanMqttMsgHandler_t *msgHandler = NULL;
  144. RyanMqttAckHandler_t *ackHandler = NULL;
  145. RyanMqttAckHandler_t *ackHandlerPubrec = NULL;
  146. MQTTStatus_t status = MQTTSuccess;
  147. RyanMqttAssert(NULL != client);
  148. // 反序列化ack包
  149. status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  150. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  151. // 每次收到PUBREC都返回ack,确保服务器可以认为数据包被发送了
  152. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  153. MQTTFixedBuffer_t fixedBuffer = {
  154. .pBuffer = buffer,
  155. .size = MQTT_PUBLISH_ACK_PACKET_SIZE};
  156. // 序列化ack数据包
  157. status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREL, packetId);
  158. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  159. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  160. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec);
  161. if (RyanMqttSuccessError == result)
  162. {
  163. // 查找ack链表是否存在pubcomp报文,不存在表示首次接收到
  164. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandler);
  165. if (RyanMqttSuccessError != result)
  166. {
  167. // 首次收到消息
  168. result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
  169. ackHandlerPubrec->msgHandler->topicLen,
  170. ackHandlerPubrec->msgHandler->qos, &msgHandler);
  171. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  172. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler, &ackHandler, RyanMqttFalse);
  173. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { RyanMqttMsgHandlerDestory(client->config.userData, msgHandler); });
  174. RyanMqttAckListAddToAckList(client, ackHandler);
  175. RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
  176. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  177. }
  178. // 出现pubrec和pubcomp同时存在的情况,清除pubrec。理论上不会出现(冗余措施)
  179. else
  180. {
  181. RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
  182. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  183. }
  184. }
  185. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  186. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  187. return result;
  188. }
  189. /**
  190. * @brief Handles incoming MQTT PUBLISH packets from the server.
  191. *
  192. * Deserializes the PUBLISH packet, matches the topic against the subscription list (with wildcard support), and dispatches the message to the user event handler if subscribed. Responds with the appropriate acknowledgment packet based on the QoS level:
  193. * - QoS 0: Delivers the message without acknowledgment.
  194. * - QoS 1: Delivers the message and sends a PUBACK.
  195. * - QoS 2: Implements QoS 2 method B, sending PUBREC and managing acknowledgment handlers.
  196. *
  197. * @param client Pointer to the MQTT client instance.
  198. * @param pIncomingPacket Pointer to the incoming MQTT packet information.
  199. * @return RyanMqttError_e Result code indicating success or the type of error encountered.
  200. */
  201. static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  202. {
  203. RyanMqttError_e result = RyanMqttSuccessError;
  204. uint16_t packetId = 0;
  205. MQTTStatus_t status = MQTTSuccess;
  206. RyanMqttMsgData_t msgData = {0};
  207. RyanMqttMsgHandler_t *msgHandler = NULL;
  208. RyanMqttAssert(NULL != client);
  209. {
  210. // 反系列化 publish 消息
  211. MQTTPublishInfo_t publishInfo = {0};
  212. status = MQTT_DeserializePublish(pIncomingPacket, &packetId, &publishInfo);
  213. RyanMqttCheck(MQTTSuccess == status, RyanMqttDeserializePacketError, rlog_d);
  214. msgData.topic = (char *)publishInfo.pTopicName;
  215. msgData.topicLen = publishInfo.topicNameLength;
  216. msgData.packetId = packetId;
  217. msgData.payload = (char *)publishInfo.pPayload;
  218. msgData.payloadLen = publishInfo.payloadLength;
  219. msgData.qos = (RyanMqttQos_e)publishInfo.qos;
  220. msgData.retained = publishInfo.retain;
  221. msgData.dup = publishInfo.dup;
  222. }
  223. // 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
  224. result = RyanMqttMsgHandlerFind(client, msgData.topic, msgData.topicLen, RyanMqttTrue, &msgHandler);
  225. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  226. switch (msgData.qos)
  227. {
  228. case RyanMqttQos0:
  229. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  230. break;
  231. case RyanMqttQos1:
  232. {
  233. // 先分发消息,再回答ack
  234. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  235. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  236. MQTTFixedBuffer_t fixedBuffer = {
  237. .pBuffer = buffer,
  238. .size = MQTT_PUBLISH_ACK_PACKET_SIZE};
  239. // 序列化ack数据包
  240. status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBACK, packetId);
  241. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  242. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  243. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  244. }
  245. break;
  246. case RyanMqttQos2: // qos2采用方法B
  247. {
  248. RyanMqttAckHandler_t *ackHandler = NULL;
  249. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  250. MQTTFixedBuffer_t fixedBuffer = {
  251. .pBuffer = buffer,
  252. .size = MQTT_PUBLISH_ACK_PACKET_SIZE};
  253. // !序列化ack数据包,必须先执行,因为创建ack需要用到这个报文
  254. status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREC, packetId);
  255. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  256. // 上面代码不太可能出错,出错后就让服务器重新发送吧
  257. // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish, 不进行qos2 PUBREC消息处理
  258. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler);
  259. if (RyanMqttSuccessError != result)
  260. {
  261. result = RyanMqttMsgHandlerCreate(client, msgData.topic, msgData.topicLen, msgData.qos, &msgHandler);
  262. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  263. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler, &ackHandler, RyanMqttFalse);
  264. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { RyanMqttMsgHandlerDestory(client->config.userData, msgHandler); });
  265. RyanMqttAckListAddToAckList(client, ackHandler);
  266. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  267. }
  268. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  269. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  270. }
  271. break;
  272. default:
  273. break;
  274. }
  275. return result;
  276. }
  277. /**
  278. * @brief Handles incoming SUBACK packets to process subscription acknowledgments.
  279. *
  280. * Processes a SUBACK packet by matching it to the corresponding acknowledgment handler. If the subscription is successful, creates and registers a message handler for the subscribed topic and triggers the subscription success event. If the subscription is rejected by the server, triggers the subscription failure event and cleans up the acknowledgment handler.
  281. *
  282. * @param client Pointer to the MQTT client instance.
  283. * @param pIncomingPacket Pointer to the incoming SUBACK packet information.
  284. * @return RyanMqttError_e Result of the SUBACK handling operation.
  285. */
  286. static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  287. {
  288. // todo 服务端可以授予比订阅者要求的低一些的 QoS 等级。
  289. RyanMqttError_e result = RyanMqttSuccessError;
  290. uint16_t packetId = 0;
  291. RyanMqttMsgHandler_t *msgHandler = NULL;
  292. RyanMqttAckHandler_t *ackHandler = NULL;
  293. MQTTStatus_t status = MQTTSuccess;
  294. RyanMqttAssert(NULL != client);
  295. // 反序列化ack包
  296. status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  297. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  298. // 需要判断服务器拒绝
  299. // RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  300. // ack链表不存在当前订阅确认节点就直接退出
  301. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_SUBACK, packetId, &ackHandler);
  302. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  303. // 订阅失败,服务器拒绝
  304. if (MQTTSuccess != status)
  305. {
  306. RyanMqttAckListRemoveToAckList(client, ackHandler);
  307. // mqtt事件回调
  308. RyanMqttEventMachine(client, RyanMqttEventSubscribedFaile, (void *)ackHandler->msgHandler);
  309. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  310. return RyanMqttSuccessError;
  311. }
  312. // 订阅成功
  313. // 查找是否有同名订阅,如果有就销毁之前的
  314. // result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, ackHandler->msgHandler->topicLen, RyanMqttFalse, &msgHandler);
  315. // if (RyanMqttSuccessError == result)
  316. // {
  317. // RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  318. // RyanMqttMsgHandlerDestory(client, msgHandler);
  319. // }
  320. rlog_w("mqtt11111111 topic: %s, qos: %d", ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  321. result = RyanMqttMsgHandlerCreate(client, ackHandler->msgHandler->topic,
  322. ackHandler->msgHandler->topicLen,
  323. ackHandler->msgHandler->qos, &msgHandler);
  324. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d); // 这里创建失败了不触发回调,等待ack超时触发失败回调函数
  325. rlog_w("mqtt222222222222222 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  326. RyanMqttEventMachine(client, RyanMqttEventSubscribed, (void *)msgHandler); // mqtt回调函数
  327. RyanMqttMsgHandlerAddToMsgList(client, msgHandler); // 将msg信息添加到订阅链表上
  328. RyanMqttAckListRemoveToAckList(client, ackHandler);
  329. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  330. return result;
  331. }
  332. /**
  333. * @brief Handles incoming UNSUBACK packets to confirm unsubscription.
  334. *
  335. * Processes an UNSUBACK packet by locating the corresponding acknowledgment handler, removing the associated subscription handler if present, and triggering the unsubscription event callback. Cleans up acknowledgment and message handlers as needed.
  336. *
  337. * @param pIncomingPacket Pointer to the received UNSUBACK packet information.
  338. * @return RyanMqttError_e Result of the unsubscription acknowledgment handling.
  339. */
  340. static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  341. {
  342. RyanMqttError_e result = RyanMqttFailedError;
  343. RyanMqttMsgHandler_t *subMsgHandler = NULL;
  344. RyanMqttAckHandler_t *ackHandler = NULL;
  345. uint16_t packetId = 0;
  346. MQTTStatus_t status = MQTTSuccess;
  347. RyanMqttAssert(NULL != client);
  348. // 反序列化ack包
  349. status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  350. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d);
  351. // ack链表不存在当前取消订阅确认节点就直接退出
  352. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_UNSUBACK, packetId, &ackHandler);
  353. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  354. // 查找当前主题是否已经订阅,进行取消订阅
  355. result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, ackHandler->msgHandler->topicLen, RyanMqttFalse, &subMsgHandler);
  356. if (RyanMqttSuccessError == result)
  357. {
  358. RyanMqttMsgHandlerRemoveToMsgList(client, subMsgHandler);
  359. RyanMqttMsgHandlerDestory(client, subMsgHandler);
  360. }
  361. RyanMqttAckListRemoveToAckList(client, ackHandler);
  362. // mqtt事件回调
  363. RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
  364. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  365. return result;
  366. }
  367. /**
  368. * @brief Transfers acknowledgment handlers from the user acknowledgment list to the MQTT thread's acknowledgment list.
  369. *
  370. * Ensures thread safety by locking the user acknowledgment list during the transfer.
  371. */
  372. static void RyanMqttSyncUserAckHandle(RyanMqttClient_t *client)
  373. {
  374. RyanMqttAckHandler_t *userAckHandler = NULL;
  375. RyanList_t *curr = NULL,
  376. *next = NULL;
  377. platformMutexLock(client->config.userData, &client->userAckHandleLock);
  378. RyanListForEachSafe(curr, next, &client->userAckHandlerList)
  379. {
  380. // 获取此节点的结构体
  381. userAckHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  382. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  383. RyanMqttAckListAddToAckList(client, userAckHandler);
  384. }
  385. platformMutexUnLock(client->config.userData, &client->userAckHandleLock);
  386. }
  387. /**
  388. * @brief Reads and processes an incoming MQTT packet for the client.
  389. *
  390. * Retrieves the next MQTT packet from the network, synchronizes user acknowledgment handlers, and dispatches the packet to the appropriate handler based on its type (e.g., PUBLISH, PUBACK, SUBACK, etc.). Allocates and frees memory for packet payloads as needed. Returns a status code indicating the result of packet processing.
  391. *
  392. * @return RyanMqttError_e Result of the packet handling operation.
  393. */
  394. static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client)
  395. {
  396. RyanMqttError_e result = RyanMqttSuccessError;
  397. MQTTPacketInfo_t pIncomingPacket = {0};
  398. RyanMqttAssert(NULL != client);
  399. NetworkContext_t pNetworkContext = {.client = client};
  400. MQTTStatus_t status = MQTT_GetIncomingPacketTypeAndLength(coreMqttTransportRecv, &pNetworkContext, &pIncomingPacket);
  401. // 先同步用户接口的ack链表
  402. RyanMqttSyncUserAckHandle(client);
  403. if (MQTTSuccess == status)
  404. {
  405. // 申请断开连接数据包的空间
  406. if (pIncomingPacket.remainingLength > 0)
  407. {
  408. pIncomingPacket.pRemainingData = platformMemoryMalloc(pIncomingPacket.remainingLength);
  409. RyanMqttCheck(NULL != pIncomingPacket.pRemainingData, RyanMqttNoRescourceError, rlog_d);
  410. }
  411. }
  412. else if (MQTTNoDataAvailable == status)
  413. {
  414. return RyanMqttRecvPacketTimeOutError;
  415. }
  416. else
  417. {
  418. rlog_e("获取包长度失败");
  419. return RyanMqttFailedError;
  420. }
  421. // 3.读取mqtt载荷数据并放到读取缓冲区
  422. if (pIncomingPacket.remainingLength > 0)
  423. {
  424. result = RyanMqttRecvPacket(client, pIncomingPacket.pRemainingData, pIncomingPacket.remainingLength);
  425. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  426. }
  427. rlog_d("pIncomingPacket.type: %x ", pIncomingPacket.type & 0xF0U);
  428. // 控制报文类型
  429. // 发送者QoS2动作 发布PUBLISH报文 -> 等待PUBREC报文 -> 发送PUBREL报文 -> 等待PUBCOMP报文
  430. // 接收者QoS2动作 等待PUBLISH报文 -> 发送PUBREC报文 -> 等待PUBREL报文 -> 发送PUBCOMP报文
  431. switch (pIncomingPacket.type & 0xF0U)
  432. {
  433. case MQTT_PACKET_TYPE_PUBLISH: // 接收到订阅消息
  434. result = RyanMqttPublishPacketHandler(client, &pIncomingPacket);
  435. break;
  436. case MQTT_PACKET_TYPE_CONNACK: // 连接报文确认
  437. {
  438. MQTTStatus_t status = MQTTSuccess;
  439. uint16_t packetId;
  440. bool sessionPresent; // 会话位
  441. // 反序列化ack包
  442. status = MQTT_DeserializeAck(&pIncomingPacket, &packetId, &sessionPresent);
  443. if (MQTTSuccess != status)
  444. result = RyanMqttFailedError;
  445. }
  446. break;
  447. case MQTT_PACKET_TYPE_PUBACK: // 客户端发送QoS 1消息,服务端发布收到确认
  448. case MQTT_PACKET_TYPE_PUBCOMP: // 发送QOS2 发布完成
  449. result = RyanMqttPubackAndPubcompPacketHandler(client, &pIncomingPacket);
  450. break;
  451. case MQTT_PACKET_TYPE_PUBREC: // 客户端发送QOS2,服务端发布PUBREC,需要客户端继续发送PUBREL
  452. result = RyanMqttPubrecPacketHandler(client, &pIncomingPacket);
  453. break;
  454. case (MQTT_PACKET_TYPE_PUBREL & 0xF0U): // 客户端接收QOS2 已经发布PUBREC,等待服务器发布释放
  455. if (pIncomingPacket.type & 0x02U) // PUBREL 控制报文固定报头的第 3,2,1,0 位必须被设置为 0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接
  456. result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
  457. break;
  458. case MQTT_PACKET_TYPE_SUBACK: // 订阅确认
  459. result = RyanMqttSubackHandler(client, &pIncomingPacket);
  460. break;
  461. case MQTT_PACKET_TYPE_UNSUBACK: // 取消订阅确认
  462. result = RyanMqttUnSubackHandler(client, &pIncomingPacket);
  463. break;
  464. case MQTT_PACKET_TYPE_PINGRESP: // 心跳响应
  465. RyanMqttRefreshKeepaliveTime(client);
  466. result = RyanMqttSuccessError;
  467. break;
  468. default:
  469. break;
  470. }
  471. if (pIncomingPacket.remainingLength > 0)
  472. platformMemoryFree(pIncomingPacket.pRemainingData);
  473. return result;
  474. }
  475. // 也可以考虑有ack链表的时候recvTime可以短一些,有坑点
  476. /**
  477. * @brief Scans the MQTT acknowledgment handler list to process timeouts and retransmissions.
  478. *
  479. * Iterates through the client's acknowledgment handler list to handle message retransmissions or failures based on packet type and timeout status. For QoS 1 and 2 publish acknowledgments (PUBACK, PUBREC, PUBREL, PUBCOMP), retransmits packets if the acknowledgment has timed out, triggers warning events if the retry count exceeds a threshold, and resets timers. For subscription acknowledgments (SUBACK, UNSUBACK), removes handlers and triggers failure events if timed out. Processing is limited by a maximum scan duration and is skipped if the client is disconnected or a throttle timer is active.
  480. *
  481. * @param client Pointer to the MQTT client instance.
  482. * @param WaitFlag If RyanMqttTrue, only processes handlers whose timers have expired (normal operation). If RyanMqttFalse, processes all handlers immediately (typically after reconnect).
  483. */
  484. static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFlag)
  485. {
  486. RyanList_t *curr = NULL,
  487. *next = NULL;
  488. RyanMqttAckHandler_t *ackHandler = NULL;
  489. platformTimer_t ackScanRemainTimer = {0};
  490. RyanMqttAssert(NULL != client);
  491. // mqtt没有连接就退出
  492. if (RyanMqttConnectState != RyanMqttGetClientState(client))
  493. return;
  494. // 节流时间内不检查ack链表
  495. if (platformTimerRemain(&client->ackScanThrottleTimer))
  496. return;
  497. // 设置scan最大处理时间定时器
  498. platformTimerInit(&ackScanRemainTimer);
  499. platformTimerCutdown(&ackScanRemainTimer, client->config.recvTimeout - 100);
  500. platformMutexLock(client->config.userData, &client->ackHandleLock);
  501. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  502. {
  503. // 需要再判断一次
  504. if (RyanMqttConnectState != RyanMqttGetClientState(client))
  505. continue;
  506. // 超过最大处理时间,直接跳出处理函数
  507. if (0 == platformTimerRemain(&ackScanRemainTimer))
  508. break;
  509. // 获取此节点的结构体
  510. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  511. // ack响应没有超时就不进行处理
  512. if (RyanMqttTrue == WaitFlag && 0 != platformTimerRemain(&ackHandler->timer))
  513. continue;
  514. switch (ackHandler->packetType)
  515. {
  516. // 发送qos1 / qos2消息, 服务器ack响应超时。需要重新发送它们。
  517. case MQTT_PACKET_TYPE_PUBACK: // qos1 publish后没有收到puback
  518. case MQTT_PACKET_TYPE_PUBREC: // qos2 publish后没有收到pubrec
  519. case MQTT_PACKET_TYPE_PUBREL: // qos2 收到pubrec,发送pubrel后没有收到pubcomp
  520. case MQTT_PACKET_TYPE_PUBCOMP: // 理论不会出现,冗余措施
  521. {
  522. // 避免 implicit-fallthrough 警告
  523. if (MQTT_PACKET_TYPE_PUBREC == ackHandler->packetType || MQTT_PACKET_TYPE_PUBACK == ackHandler->packetType)
  524. MQTT_UpdateDuplicatePublishFlag(ackHandler->packet, true); // 设置重发标志位
  525. // 重发次数超过警告值回调
  526. if (ackHandler->repeatCount >= client->config.ackHandlerRepeatCountWarning)
  527. {
  528. RyanMqttEventMachine(client, RyanMqttEventAckRepeatCountWarning, (void *)ackHandler);
  529. continue;
  530. }
  531. // 重发数据事件回调
  532. RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
  533. //? 发送失败也是重试,所以这里不进行错误判断
  534. RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
  535. // 重置ack超时时间
  536. platformTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
  537. ackHandler->repeatCount++;
  538. break;
  539. }
  540. // 订阅 / 取消订阅超时就认为失败
  541. case MQTT_PACKET_TYPE_SUBACK:
  542. case MQTT_PACKET_TYPE_UNSUBACK:
  543. {
  544. RyanMqttAckListRemoveToAckList(client, ackHandler);
  545. RyanMqttEventMachine(client, (MQTT_PACKET_TYPE_SUBACK == ackHandler->packetType) ? RyanMqttEventSubscribedFaile : RyanMqttEventUnSubscribedFaile,
  546. (void *)ackHandler->msgHandler);
  547. // 清除句柄
  548. RyanMqttAckHandlerDestroy(client, ackHandler);
  549. break;
  550. }
  551. default:
  552. {
  553. rlog_e("不应该出现的值: %d", ackHandler->packetType);
  554. RyanMqttAssert(NULL); // 不应该为别的值
  555. break;
  556. }
  557. }
  558. }
  559. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  560. // 扫描链表没有超时时,才设置scan节流定时器
  561. if (platformTimerRemain(&ackScanRemainTimer))
  562. platformTimerCutdown(&client->ackScanThrottleTimer, 1000); // 启动ack scan节流定时器
  563. }
  564. /**
  565. * @brief Establishes a connection to the MQTT server and completes the MQTT CONNECT handshake.
  566. *
  567. * Prepares and serializes the MQTT CONNECT packet using the client's configuration, including optional Last Will and Testament (LWT) information. Establishes a network connection to the server, sends the CONNECT packet, and waits for the CONNACK response to confirm the connection. Handles memory allocation and cleanup for the packet buffer.
  568. *
  569. * @return RyanMqttError_e Result code indicating success or the type of failure encountered during connection or handshake.
  570. */
  571. static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
  572. {
  573. RyanMqttError_e result = RyanMqttSuccessError;
  574. MQTTStatus_t status = MQTTSuccess;
  575. MQTTConnectInfo_t connectInfo = {0};
  576. MQTTPublishInfo_t willInfo = {0};
  577. MQTTFixedBuffer_t fixedBuffer = {0};
  578. size_t remainingLength = {0};
  579. RyanMqttAssert(NULL != client);
  580. RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectError, rlog_d);
  581. // connect 信息
  582. connectInfo.pClientIdentifier = client->config.clientId;
  583. connectInfo.clientIdentifierLength = strlen(client->config.clientId);
  584. connectInfo.pUserName = client->config.userName;
  585. if (connectInfo.pUserName)
  586. connectInfo.userNameLength = strlen(client->config.userName);
  587. connectInfo.pPassword = client->config.password;
  588. if (connectInfo.pPassword)
  589. connectInfo.passwordLength = strlen(client->config.password);
  590. connectInfo.keepAliveSeconds = client->config.keepaliveTimeoutS;
  591. connectInfo.cleanSession = client->config.cleanSessionFlag;
  592. if (RyanMqttTrue == client->lwtFlag)
  593. {
  594. willInfo.qos = (MQTTQoS_t)client->lwtOptions.qos;
  595. willInfo.retain = client->lwtOptions.retain;
  596. willInfo.pPayload = client->lwtOptions.payload;
  597. willInfo.payloadLength = client->lwtOptions.payloadLen;
  598. willInfo.pTopicName = client->lwtOptions.topic;
  599. willInfo.topicNameLength = strlen(client->lwtOptions.topic);
  600. }
  601. // 获取数据包大小
  602. status = MQTT_GetConnectPacketSize(&connectInfo, RyanMqttTrue == client->lwtFlag ? &willInfo : NULL, &remainingLength, &fixedBuffer.size);
  603. RyanMqttAssert(MQTTSuccess == status);
  604. // 申请数据包的空间
  605. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  606. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, rlog_d);
  607. // 序列化数据包
  608. status = MQTT_SerializeConnect(&connectInfo, RyanMqttTrue == client->lwtFlag ? &willInfo : NULL, remainingLength, &fixedBuffer);
  609. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  610. // 调用底层的连接函数连接上服务器
  611. result = platformNetworkConnect(client->config.userData, &client->network, client->config.host, client->config.port);
  612. RyanMqttCheckCode(RyanMqttSuccessError == result, RyanSocketFailedError, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  613. // 发送序列化mqtt的CONNECT报文
  614. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  615. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  616. platformNetworkClose(client->config.userData, &client->network);
  617. platformMemoryFree(fixedBuffer.pBuffer);
  618. });
  619. // 等待报文
  620. // mqtt规范 服务端接收到connect报文后,服务端发送给客户端的第一个报文必须是 CONNACK
  621. result = RyanMqttReadPacketHandler(client);
  622. RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_d, {
  623. platformNetworkClose(client->config.userData, &client->network);
  624. platformMemoryFree(fixedBuffer.pBuffer);
  625. });
  626. platformMemoryFree(fixedBuffer.pBuffer);
  627. return RyanMqttSuccessError;
  628. }
  629. /**
  630. * @brief Handles MQTT client events and dispatches user event callbacks.
  631. *
  632. * Processes core MQTT client events such as connection, disconnection, and reconnection, updating client state and managing resources accordingly. If a user event handler is configured and the event is enabled, invokes the user callback with the provided event data.
  633. *
  634. * @param eventId The MQTT event identifier to process.
  635. * @param eventData Optional data associated with the event.
  636. */
  637. void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
  638. {
  639. RyanMqttAssert(NULL != client);
  640. switch (eventId)
  641. {
  642. case RyanMqttEventConnected: // 第一次连接成功
  643. RyanMqttRefreshKeepaliveTime(client);
  644. RyanMqttAckListScan(client, RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
  645. RyanMqttSetClientState(client, RyanMqttConnectState);
  646. break;
  647. case RyanMqttEventDisconnected: // 断开连接事件
  648. RyanMqttSetClientState(client, RyanMqttDisconnectState); // 先将客户端状态设置为断开连接,避免close网络资源时用户依然在使用
  649. platformNetworkClose(client->config.userData, &client->network);
  650. if (RyanMqttTrue == client->config.cleanSessionFlag)
  651. RyanMqttCleanSession(client);
  652. break;
  653. case RyanMqttEventReconnectBefore: // 重连前回调
  654. RyanMqttSetClientState(client, RyanMqttReconnectState);
  655. break;
  656. default:
  657. break;
  658. }
  659. if (client->config.mqttEventHandle == NULL)
  660. return;
  661. platformCriticalEnter(client->config.userData, &client->criticalLock);
  662. RyanMqttEventId_e eventFlag = client->eventFlag;
  663. platformCriticalExit(client->config.userData, &client->criticalLock);
  664. if (eventFlag & eventId)
  665. client->config.mqttEventHandle(client, eventId, eventData);
  666. }
  667. // todo 考虑将发送操作独立出去,异步发送
  668. /**
  669. * @brief Main MQTT client thread handling connection, packet processing, and state transitions.
  670. *
  671. * Runs the MQTT client state machine in an infinite loop, managing connection establishment, packet reading, keepalive, reconnection logic, and resource cleanup upon destruction. Handles both automatic and manual reconnection scenarios and ensures proper cleanup of all resources when the client is destroyed.
  672. *
  673. * @param argument Pointer to the initialized MQTT client structure.
  674. */
  675. void RyanMqttThread(void *argument)
  676. {
  677. int32_t result = 0;
  678. RyanMqttClient_t *client = (RyanMqttClient_t *)argument;
  679. RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
  680. while (1)
  681. {
  682. // 销毁客户端
  683. if (RyanMqttTrue == client->destoryFlag)
  684. {
  685. RyanMqttEventMachine(client, RyanMqttEventDestoryBefore, (void *)NULL);
  686. // 关闭网络组件
  687. platformNetworkClose(client->config.userData, &client->network);
  688. // 销毁网络组件
  689. platformNetworkDestroy(client->config.userData, &client->network);
  690. // 清除config信息
  691. if (NULL != client->config.clientId)
  692. platformMemoryFree(client->config.clientId);
  693. if (NULL != client->config.userName)
  694. platformMemoryFree(client->config.userName);
  695. if (NULL != client->config.password)
  696. platformMemoryFree(client->config.password);
  697. if (NULL != client->config.host)
  698. platformMemoryFree(client->config.host);
  699. if (NULL != client->config.taskName)
  700. platformMemoryFree(client->config.taskName);
  701. // 清除遗嘱相关配置
  702. if (NULL != client->lwtOptions.payload)
  703. platformMemoryFree(client->lwtOptions.payload);
  704. if (NULL != client->lwtOptions.topic)
  705. platformMemoryFree(client->lwtOptions.topic);
  706. // 清除session ack链表和msg链表
  707. RyanMqttCleanSession(client);
  708. // 清除互斥锁
  709. platformMutexDestroy(client->config.userData, &client->sendBufLock);
  710. platformMutexDestroy(client->config.userData, &client->msgHandleLock);
  711. platformMutexDestroy(client->config.userData, &client->ackHandleLock);
  712. platformMutexDestroy(client->config.userData, &client->userAckHandleLock);
  713. // 清除临界区
  714. platformCriticalDestroy(client->config.userData, &client->criticalLock);
  715. // 清除掉线程动态资源
  716. platformThread_t mqttThread;
  717. memcpy(&mqttThread, &client->mqttThread, sizeof(platformThread_t));
  718. void *userData = client->config.userData;
  719. platformMemoryFree(client);
  720. client = NULL;
  721. // 销毁自身线程
  722. platformThreadDestroy(userData, &mqttThread);
  723. }
  724. // 客户端状态变更状态机
  725. switch (client->clientState)
  726. {
  727. case RyanMqttStartState: // 开始状态状态
  728. rlog_d("初始化状态,开始连接");
  729. result = RyanMqttConnect(client);
  730. RyanMqttEventMachine(client, RyanMqttSuccessError == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  731. (void *)&result);
  732. break;
  733. case RyanMqttConnectState: // 连接状态
  734. rlog_d("连接状态");
  735. result = RyanMqttReadPacketHandler(client);
  736. RyanMqttAckListScan(client, RyanMqttTrue);
  737. RyanMqttKeepalive(client);
  738. break;
  739. case RyanMqttDisconnectState: // 断开连接状态
  740. rlog_d("断开连接状态");
  741. if (RyanMqttTrue != client->config.autoReconnectFlag) // 没有使能自动连接就休眠线程
  742. {
  743. platformThreadStop(client->config.userData, &client->mqttThread);
  744. // 断连的时候会暂停线程,线程重新启动就是用户手动连接了
  745. rlog_d("手动重新连接\r\n");
  746. RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL);
  747. }
  748. else
  749. {
  750. rlog_d("触发自动连接,%dms后开始连接\r\n", client->config.reconnectTimeout);
  751. platformDelay(client->config.reconnectTimeout);
  752. RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL); // 给上层触发重新连接前事件
  753. }
  754. break;
  755. case RyanMqttReconnectState:
  756. result = RyanMqttConnect(client);
  757. RyanMqttEventMachine(client, RyanMqttSuccessError == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  758. (void *)&result);
  759. break;
  760. default:
  761. RyanMqttAssert(NULL);
  762. break;
  763. }
  764. }
  765. }