RyanMqttThread.c 32 KB


  1. #define rlogEnable 1 // 是否使能日志
  2. #define rlogColorEnable 1 // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlWarning) // 日志打印等级
  4. #define rlogTag "RyanMqttThread" // 日志tag
  5. #include "RyanMqttLog.h"
  6. #include "MQTTPacket.h"
  7. #include "RyanMqttClient.h"
  8. #include "RyanMqttUtile.h"
  9. #include "RyanMqttThread.h"
  10. // void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData);
  11. /**
  12. * @brief mqtt心跳保活
  13. *
  14. * @param client
  15. * @return int32_t
  16. */
  17. static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
  18. {
  19. int32_t connectState = RyanMqttConnectAccepted;
  20. int32_t packetLen = 0;
  21. RyanMqttAssert(NULL != client);
  22. // 如果没有连接则不需要心跳保活
  23. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  24. // 服务器在心跳时间的1.5倍内没有收到消息则会断开连接
  25. // 在心跳的一半发送keepalive
  26. if (platformTimerRemain(&client->keepaliveTimer) != 0)
  27. return RyanMqttSuccessError;
  28. // 心跳超时,断开连接
  29. connectState = RyanMqttKeepaliveTimeout;
  30. RyanMqttCheckCode(2 > client->keepaliveTimeoutCount, RyanMqttKeepaliveTimeout, rlog_d,
  31. { RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState); });
  32. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  33. packetLen = MQTTSerialize_pingreq((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize);
  34. if (packetLen > 0)
  35. RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  36. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  37. client->keepaliveTimeoutCount++;
  38. platformTimerCutdown(&client->keepaliveTimer, client->config->keepaliveTimeoutS * 1000 / 2); // 启动心跳定时器
  39. return RyanMqttSuccessError;
  40. }
  41. /**
  42. * @brief
  43. *
  44. * @param client
  45. * @return int32_t
  46. */
  47. static RyanMqttError_e RyanMqttGetPayloadLen(RyanMqttClient_t *client, uint32_t *payloadLen)
  48. {
  49. RyanMqttError_e result = RyanMqttSuccessError;
  50. uint8_t len = 0;
  51. uint8_t encodedByte = 0;
  52. int32_t multiplier = 1;
  53. RyanMqttAssert(NULL != client);
  54. // RyanMqttAssert(NULL != payloadLen); payloadLen == 0时会误判
  55. do
  56. {
  57. RyanMqttCheck(++len < 4, RyanMqttFailedError, rlog_d);
  58. result = RyanMqttRecvPacket(client, (char *)&encodedByte, 1);
  59. RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_d);
  60. *payloadLen += (encodedByte & 127) * multiplier; // 根据 MQTT 协议解码数据长度
  61. multiplier *= 128;
  62. } while ((encodedByte & 128) != 0);
  63. RyanMqttCheck(*payloadLen <= RyanMqttMaxPayloadLen, RyanMqttFailedError, rlog_d);
  64. return RyanMqttSuccessError;
  65. }
  66. /**
  67. * @brief qos1或者qos2接收消息成功
  68. *
  69. * @param client
  70. * @return RyanMqttError_e
  71. */
  72. static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *client)
  73. {
  74. RyanMqttError_e result = RyanMqttSuccessError;
  75. uint8_t dup = 0;
  76. uint8_t packetType = 0;
  77. uint16_t packetId = 0;
  78. RyanMqttAckHandler_t *ackHandler = NULL;
  79. RyanMqttAssert(NULL != client);
  80. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
  81. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  82. // 可能会多次收到puback / pubcomp,仅在首次收到时触发发布成功回调函数
  83. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
  84. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { rlog_i("packetType: %d, packetId: %d", packetType, packetId); });
  85. RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
  86. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  87. return result;
  88. }
  89. /**
  90. * @brief 发布释放处理函数
  91. *
  92. * @param client
  93. * @return RyanMqttError_e
  94. */
  95. static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client)
  96. {
  97. RyanMqttError_e result = RyanMqttFailedError;
  98. uint8_t dup = 0;
  99. uint8_t packetType = 0;
  100. uint16_t packetId = 0;
  101. int32_t packetLen = 0;
  102. RyanMqttAckHandler_t *ackHandler = NULL;
  103. RyanMqttAssert(NULL != client);
  104. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
  105. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  106. // 制作确认数据包并发送
  107. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  108. packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBCOMP, 0, packetId);
  109. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  110. // 每次收到PUBREL都返回消息
  111. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  112. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  113. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  114. // 删除pubrel记录
  115. result = RyanMqttAckListNodeFind(client, PUBREL, packetId, &ackHandler);
  116. if (RyanMqttSuccessError == result)
  117. RyanMqttAckHandlerDestroy(client, ackHandler);
  118. return RyanMqttSuccessError;
  119. }
  120. /**
  121. * @brief 发布收到处理函数
  122. *
  123. * @param client
  124. * @return RyanMqttError_e
  125. */
  126. static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
  127. {
  128. RyanMqttError_e result = RyanMqttFailedError;
  129. uint8_t dup = 0;
  130. RyanMqttBool_e fastFlag = RyanMqttFalse;
  131. uint8_t packetType = 0;
  132. uint16_t packetId = 0;
  133. int32_t packetLen = 0;
  134. RyanMqttMsgHandler_t *msgHandler = NULL;
  135. RyanMqttAckHandler_t *ackHandler = NULL;
  136. RyanMqttAckHandler_t *ackHandlerPubrec = NULL;
  137. RyanMqttAssert(NULL != client);
  138. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
  139. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  140. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  141. result = RyanMqttAckListNodeFind(client, PUBREC, packetId, &ackHandlerPubrec);
  142. if (RyanMqttSuccessError == result)
  143. {
  144. // 查找ack链表是否存在pubcomp报文,不存在表示首次接收到
  145. result = RyanMqttAckListNodeFind(client, PUBCOMP, packetId, &ackHandler);
  146. if (RyanMqttSuccessError != result)
  147. fastFlag = RyanMqttTrue;
  148. // 出现pubrec和pubcomp同时存在的情况,清除pubrec理论上不会出现
  149. else
  150. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  151. }
  152. // 制作确认数据包并发送
  153. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  154. // 序列化发布释放报文
  155. packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREL, 0, packetId);
  156. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  157. // 每次收到PUBREC都返回ack
  158. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  159. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  160. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  161. if (RyanMqttTrue == fastFlag)
  162. {
  163. result = RyanMqttMsgHandlerCreate(ackHandlerPubrec->msgHandler->topic,
  164. strlen(ackHandlerPubrec->msgHandler->topic),
  165. ackHandlerPubrec->msgHandler->qos, &msgHandler);
  166. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
  167. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  168. // 创建一个 ACK 处理程序节点
  169. result = RyanMqttAckHandlerCreate(client, PUBCOMP, packetId, packetLen, msgHandler, &ackHandler);
  170. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
  171. { platformMemoryFree(msgHandler);
  172. platformMutexUnLock(client->config->userData, client->sendBufLock); });
  173. }
  174. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  175. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  176. if (RyanMqttTrue == fastFlag)
  177. {
  178. result = RyanMqttAckListAdd(client, ackHandler);
  179. // 保证等待PUBCOMP记录成功后再清除PUBREC记录
  180. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  181. }
  182. return result;
  183. }
  184. /**
  185. * @brief 收到服务器发布消息处理函数
  186. *
  187. * @param client
  188. * @return RyanMqttError_e
  189. */
  190. static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
  191. {
  192. RyanMqttError_e result = RyanMqttSuccessError;
  193. int32_t packetLen = 0;
  194. RyanMqttBool_e deliverMsgFlag = RyanMqttFalse;
  195. MQTTString topicName = MQTTString_initializer;
  196. RyanMqttMsgData_t msgData = {0};
  197. RyanMqttMsgHandler_t *msgHandler = NULL;
  198. RyanMqttAckHandler_t *ackHandler = NULL;
  199. RyanMqttAssert(NULL != client);
  200. result = MQTTDeserialize_publish(&msgData.dup, (int *)&msgData.qos, &msgData.retained, &msgData.packetId, &topicName,
  201. (uint8_t **)&msgData.payload, (int *)&msgData.payloadLen, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
  202. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  203. // 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
  204. result = RyanMqttMsgHandlerFind(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttTrue, &msgHandler);
  205. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  206. switch (msgData.qos)
  207. {
  208. case RyanMqttQos0:
  209. deliverMsgFlag = RyanMqttTrue;
  210. break;
  211. case RyanMqttQos1:
  212. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  213. packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBACK, 0, msgData.packetId);
  214. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
  215. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  216. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  217. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
  218. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  219. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  220. deliverMsgFlag = RyanMqttTrue;
  221. break;
  222. case RyanMqttQos2:
  223. {
  224. RyanMqttBool_e fastFlag = RyanMqttFalse;
  225. // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish不进行qos2消息处理
  226. result = RyanMqttAckListNodeFind(client, PUBREL, msgData.packetId, &ackHandler);
  227. if (RyanMqttSuccessError != result)
  228. fastFlag = RyanMqttTrue;
  229. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  230. packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREC, 0, msgData.packetId);
  231. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
  232. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  233. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  234. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
  235. { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  236. if (RyanMqttTrue == fastFlag)
  237. {
  238. result = RyanMqttMsgHandlerCreate(topicName.lenstring.data, topicName.lenstring.len, msgData.qos, &msgHandler);
  239. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  240. platformMutexUnLock(client->config->userData, client->sendBufLock);
  241. });
  242. result = RyanMqttAckHandlerCreate(client, PUBREL, msgData.packetId, packetLen, msgHandler, &ackHandler);
  243. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  244. platformMemoryFree(msgHandler);
  245. platformMutexUnLock(client->config->userData, client->sendBufLock);
  246. });
  247. }
  248. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  249. if (RyanMqttTrue == fastFlag)
  250. {
  251. result = RyanMqttAckListAdd(client, ackHandler);
  252. deliverMsgFlag = RyanMqttTrue;
  253. }
  254. }
  255. break;
  256. default:
  257. break;
  258. }
  259. if (RyanMqttTrue == deliverMsgFlag)
  260. {
  261. // 复制主题名字
  262. result = RyanMqttStringCopy(&msgData.topic, topicName.lenstring.data, topicName.lenstring.len);
  263. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  264. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  265. platformMemoryFree(msgData.topic);
  266. }
  267. return result;
  268. }
  269. /**
  270. * @brief 订阅确认处理函数
  271. *
  272. * @param client
  273. * @return RyanMqttError_e
  274. */
  275. static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
  276. {
  277. RyanMqttError_e result = RyanMqttSuccessError;
  278. int32_t count = 0;
  279. int32_t grantedQoS = 0;
  280. uint16_t packetId = 0;
  281. RyanMqttMsgHandler_t *msgHandler = NULL;
  282. RyanMqttAckHandler_t *ackHandler = NULL;
  283. RyanMqttAssert(NULL != client);
  284. result = MQTTDeserialize_suback(&packetId, 1, (int *)&count, (int *)&grantedQoS, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
  285. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  286. // ack链表不存在当前订阅确认节点就直接退出
  287. result = RyanMqttAckListNodeFind(client, SUBACK, packetId, &ackHandler);
  288. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  289. // 订阅失败
  290. if (RyanMqttSubFail == grantedQoS)
  291. {
  292. // mqtt事件回调
  293. RyanMqttEventMachine(client, RyanMqttEventSubscribedFaile, (void *)ackHandler->msgHandler);
  294. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  295. return RyanMqttSuccessError;
  296. }
  297. // 订阅成功
  298. // 查找是否有同名订阅,如果有就销毁之前的
  299. result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, strlen(ackHandler->msgHandler->topic), RyanMqttFalse, &msgHandler);
  300. if (RyanMqttSuccessError == result)
  301. RyanMqttMsgHandlerDestory(msgHandler);
  302. // 服务端可以授予比订阅者要求的低一些的 QoS 等级。
  303. result = RyanMqttMsgHandlerCreate(ackHandler->msgHandler->topic,
  304. strlen(ackHandler->msgHandler->topic),
  305. grantedQoS, &msgHandler);
  306. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d); // 这里创建失败了不触发回调,等待ack超时触发失败回调函数
  307. RyanMqttMsgHandlerAdd(client, msgHandler); // 将msg信息添加到订阅链表上
  308. RyanMqttEventMachine(client, RyanMqttEventSubscribed, (void *)msgHandler); // mqtt回调函数
  309. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  310. return result;
  311. }
  312. /**
  313. * @brief 取消订阅确认处理函数
  314. *
  315. * @param client
  316. * @return RyanMqttError_e
  317. */
  318. static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client)
  319. {
  320. RyanMqttError_e result = RyanMqttFailedError;
  321. RyanMqttAckHandler_t *ackHandler = NULL;
  322. uint16_t packetId = 0;
  323. RyanMqttAssert(NULL != client);
  324. result = MQTTDeserialize_unsuback(&packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
  325. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  326. // ack链表不存在当前取消订阅确认节点就直接退出
  327. result = RyanMqttAckListNodeFind(client, UNSUBACK, packetId, &ackHandler);
  328. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  329. // mqtt事件回调
  330. RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
  331. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  332. return result;
  333. }
  334. /**
  335. * @brief mqtt数据包处理函数
  336. *
  337. * @param client
  338. * @param packetType
  339. * @return RyanMqttError_e
  340. */
  341. static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8_t *packetType)
  342. {
  343. RyanMqttError_e result = RyanMqttSuccessError;
  344. int32_t fixedHeaderLen = 1;
  345. uint32_t payloadLen = 0;
  346. MQTTHeader header = {0};
  347. RyanMqttAssert(NULL != client);
  348. // RyanMqttAssert(NULL != packetType); packetType == 0时会误判
  349. // 1.读取标头字节。 其中包含数据包类型
  350. result = RyanMqttRecvPacket(client, client->config->recvBuffer, fixedHeaderLen);
  351. if (RyanMqttRecvPacketTimeOutError == result)
  352. return RyanMqttRecvPacketTimeOutError;
  353. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  354. // 填充联合体标头信息
  355. header.byte = client->config->recvBuffer[0];
  356. rlog_d("packetType: %d", header.bits.type);
  357. RyanMqttCheck(CONNECT <= header.bits.type && DISCONNECT >= header.bits.type, result, rlog_d);
  358. // 2.读取mqtt报文剩余长度。 这本身是可变的
  359. result = RyanMqttGetPayloadLen(client, &payloadLen);
  360. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  361. // 将剩余长度编码成mqtt报文,并放入接收缓冲区,如果消息长度超过缓冲区长度则抛弃此次数据
  362. fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config->recvBuffer + fixedHeaderLen, payloadLen);
  363. RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config->recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
  364. { RyanMqttRecvPacket(client, client->config->recvBuffer, payloadLen); });
  365. // 3.读取mqtt载荷数据并放到读取缓冲区
  366. if (payloadLen > 0)
  367. {
  368. result = RyanMqttRecvPacket(client, client->config->recvBuffer + fixedHeaderLen, payloadLen);
  369. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  370. }
  371. // 控制报文类型
  372. switch (header.bits.type)
  373. {
  374. case CONNACK: // 连接报文确认
  375. break;
  376. case PUBACK: // QoS 1消息发布收到确认
  377. case PUBCOMP: // 发布完成(qos2 第三步)
  378. result = RyanMqttPubackAndPubcompPacketHandler(client);
  379. break;
  380. case PUBREC: // 发布收到(qos2 第一步)
  381. result = RyanMqttPubrecPacketHandler(client);
  382. break;
  383. case PUBREL: // 发布释放(qos2 第二步)
  384. result = RyanMqttPubrelPacketHandler(client);
  385. break;
  386. case SUBACK: // 订阅确认
  387. result = RyanMqttSubackHandler(client);
  388. break;
  389. case UNSUBACK: // 取消订阅确认
  390. result = RyanMqttUnSubackHandler(client);
  391. break;
  392. case PUBLISH: // 接收到订阅消息
  393. result = RyanMqttPublishPacketHandler(client);
  394. break;
  395. case PINGRESP: // 心跳响应
  396. client->keepaliveTimeoutCount = 0;
  397. break;
  398. default:
  399. break;
  400. }
  401. if (packetType)
  402. *packetType = header.bits.type;
  403. return result;
  404. }
  405. /**
  406. * @brief 遍历ack链表,进行相应的处理
  407. *
  408. * @param client
  409. * @param WaitFlag
  410. * WaitFlag : RyanMqttFalse 表示不需要等待超时立即处理这些数据包。通常在重新连接后立即进行处理
  411. * WaitFlag : RyanMqttTrue 表示需要等待超时再处理这些消息,一般是稳定连接下的超时处理
  412. */
  413. static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFlag)
  414. {
  415. RyanList_t *curr = NULL,
  416. *next = NULL;
  417. RyanMqttAckHandler_t *ackHandler = NULL;
  418. RyanMqttAssert(NULL != client);
  419. // 如果链表为空或者mqtt没有连接就退出
  420. if ((RyanListIsEmpty(&client->ackHandlerList)) || (RyanMqttConnectState != RyanMqttGetClientState(client)))
  421. return;
  422. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  423. {
  424. // 获取此节点的结构体
  425. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  426. // ack响应没有超时就不进行处理
  427. if (0 != platformTimerRemain(&ackHandler->timer) && RyanMqttTrue == WaitFlag)
  428. continue;
  429. switch (ackHandler->packetType)
  430. {
  431. // 发送qos1 / qos2消息, 服务器ack响应超时。需要重新发送它们。
  432. case PUBACK:
  433. case PUBREC:
  434. case PUBREL:
  435. case PUBCOMP:
  436. {
  437. if (RyanMqttConnectState != RyanMqttGetClientState(client))
  438. continue;
  439. // 重发数据事件回调
  440. RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
  441. RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
  442. // 重置ack超时时间
  443. platformTimerCutdown(&ackHandler->timer, client->config->ackTimeout);
  444. ackHandler->repeatCount++;
  445. // 重发次数超过警告值回调
  446. if (ackHandler->repeatCount >= client->config->ackHandlerRepeatCountWarning)
  447. RyanMqttEventMachine(client, RyanMqttEventAckRepeatCountWarning, (void *)ackHandler);
  448. break;
  449. }
  450. // 订阅 / 取消订阅超时就认为失败
  451. case SUBACK:
  452. case UNSUBACK:
  453. {
  454. RyanMqttEventMachine(client, (SUBACK == ackHandler->packetType) ? RyanMqttEventSubscribedFaile : RyanMqttEventUnSubscribedFaile,
  455. (void *)ackHandler->msgHandler);
  456. // 清除句柄
  457. RyanMqttAckHandlerDestroy(client, ackHandler);
  458. break;
  459. }
  460. default:
  461. {
  462. rlog_e("不应该出现的值: %d", ackHandler->packetType);
  463. RyanMqttAssert(NULL); // 不应该为别的值
  464. }
  465. break;
  466. }
  467. }
  468. }
  469. /**
  470. * @brief mqtt连接函数
  471. *
  472. * @param client
  473. * @return RyanMqttError_e
  474. */
  475. static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
  476. {
  477. RyanMqttError_e result = RyanMqttSuccessError;
  478. uint8_t sessionPresent = 0; // 会话存在标志
  479. uint8_t packetType = 0; // 接收到的报文类型
  480. int32_t packetLen = 0;
  481. int32_t connackRc = 0;
  482. MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
  483. RyanMqttAssert(NULL != client);
  484. RyanMqttAssert(NULL != client->network);
  485. RyanMqttAssert(NULL != client->config);
  486. RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectAccepted, rlog_d);
  487. // 连接标志位
  488. connectData.clientID.cstring = client->config->clientId;
  489. connectData.username.cstring = client->config->userName;
  490. connectData.password.cstring = client->config->password;
  491. connectData.keepAliveInterval = client->config->keepaliveTimeoutS;
  492. connectData.cleansession = client->config->cleanSessionFlag;
  493. connectData.MQTTVersion = client->config->mqttVersion;
  494. if (RyanMqttTrue == client->lwtFlag)
  495. {
  496. connectData.willFlag = 1;
  497. connectData.will.qos = client->lwtOptions->qos;
  498. connectData.will.retained = client->lwtOptions->retain;
  499. connectData.will.message.lenstring.data = client->lwtOptions->payload;
  500. connectData.will.message.lenstring.len = client->lwtOptions->payloadLen;
  501. connectData.will.topicName.cstring = client->lwtOptions->topic;
  502. }
  503. // 调用底层的连接函数连接上服务器
  504. result = platformNetworkConnect(client->config->userData, client->network, client->config->host, client->config->port);
  505. RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttConnectNetWorkFail, rlog_d);
  506. platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
  507. // 序列化mqtt的CONNECT报文
  508. packetLen = MQTTSerialize_connect((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, &connectData);
  509. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  510. // 发送序列化mqtt的CONNECT报文
  511. result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
  512. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  513. // 等待报文
  514. // mqtt规范 服务端接收到connect报文后,服务端发送给客户端的第一个报文必须是 CONNACK
  515. result = RyanMqttReadPacketHandler(client, &packetType);
  516. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  517. RyanMqttCheckCode(CONNACK == packetType, RyanMqttConnectDisconnected, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  518. // 解析CONNACK报文
  519. result = MQTTDeserialize_connack(&sessionPresent, (uint8_t *)&connackRc, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
  520. RyanMqttCheckCode(1 == result, RyanMqttDeserializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
  521. platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
  522. rlog_i("result: %d, packetLen: %d, packetType: %d connackRc: %d", result, packetLen, packetType, connackRc);
  523. return connackRc;
  524. }
  525. /**
  526. * @brief mqtt事件处理函数
  527. *
  528. * @param client
  529. * @param eventId
  530. * @param eventData
  531. */
  532. void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
  533. {
  534. RyanMqttAssert(NULL != client);
  535. RyanMqttAssert(NULL != client->network);
  536. RyanMqttAssert(NULL != client->config);
  537. switch (eventId)
  538. {
  539. case RyanMqttEventConnected: // 连接成功
  540. client->keepaliveTimeoutCount = 0; // 重置心跳超时计数器
  541. platformTimerCutdown(&client->keepaliveTimer, (client->config->keepaliveTimeoutS * 1000 / 2)); // 启动心跳定时器
  542. RyanMqttAckListScan(client, RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
  543. RyanMqttSetClientState(client, RyanMqttConnectState);
  544. break;
  545. case RyanMqttEventDisconnected: // 断开连接事件
  546. platformNetworkClose(client->config->userData, client->network);
  547. if (RyanMqttTrue == client->config->cleanSessionFlag)
  548. RyanMqttCleanSession(client);
  549. RyanMqttSetClientState(client, RyanMqttDisconnectState); // 将客户端状态设置为断开连接
  550. break;
  551. case RyanMqttEventReconnectBefore: // 重连前回调
  552. RyanMqttSetClientState(client, RyanMqttReconnectState);
  553. break;
  554. default:
  555. break;
  556. }
  557. if (client->config->mqttEventHandle == NULL)
  558. return;
  559. if (client->eventFlag & eventId)
  560. client->config->mqttEventHandle(client, eventId, eventData);
  561. }
  562. /**
  563. * @brief mqtt运行线程
  564. *
  565. * @param argument
  566. */
  567. void RyanMqttThread(void *argument)
  568. {
  569. int32_t result = 0;
  570. RyanMqttClient_t *client = (RyanMqttClient_t *)argument;
  571. RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
  572. RyanMqttAssert(NULL != client->network); // RyanMqttStart前没有调用RyanMqttInit
  573. RyanMqttAssert(NULL != client->config); // RyanMqttStart前没有调用RyanMqttSetConfig
  574. while (1)
  575. {
  576. // 销毁客户端
  577. if (RyanMqttTrue == client->destoryFlag)
  578. {
  579. RyanMqttEventMachine(client, RyanMqttEventDestoryBefore, (void *)NULL);
  580. // 清除网络组件
  581. if (NULL != client->network)
  582. {
  583. platformNetworkClose(client->config->userData, client->network);
  584. platformMemoryFree(client->network);
  585. client->network = NULL;
  586. }
  587. // 清除互斥锁
  588. if (NULL != client->sendBufLock)
  589. {
  590. platformMutexDestroy(client->config->userData, client->sendBufLock);
  591. platformMemoryFree(client->sendBufLock);
  592. client->sendBufLock = NULL;
  593. }
  594. // 清除config信息
  595. if (NULL != client->config)
  596. {
  597. if (NULL != client->config->clientId)
  598. platformMemoryFree(client->config->clientId);
  599. if (NULL != client->config->host)
  600. platformMemoryFree(client->config->host);
  601. if (NULL != client->config->port)
  602. platformMemoryFree(client->config->port);
  603. if (NULL != client->config->userName)
  604. platformMemoryFree(client->config->userName);
  605. if (NULL != client->config->password)
  606. platformMemoryFree(client->config->password);
  607. if (NULL != client->config->taskName)
  608. platformMemoryFree(client->config->taskName);
  609. if (NULL != client->config)
  610. platformMemoryFree(client->config);
  611. }
  612. // 清除遗嘱相关配置
  613. if (RyanMqttTrue == client->lwtFlag && NULL != client->lwtOptions)
  614. {
  615. if (NULL != client->lwtOptions->topic)
  616. platformMemoryFree(client->lwtOptions->topic);
  617. platformMemoryFree(client->lwtOptions);
  618. }
  619. // 清除session ack链表和msg链表
  620. RyanMqttCleanSession(client);
  621. // 清除掉线程动态资源
  622. platformMemoryFree(client->mqttThread);
  623. client->mqttThread = NULL;
  624. platformMemoryFree(client);
  625. client = NULL;
  626. // 销毁自身线程
  627. platformThreadDestroy(client->config->userData, client->mqttThread);
  628. }
  629. // 客户端状态变更状态机
  630. switch (client->clientState)
  631. {
  632. case RyanMqttStartState: // 开始状态状态
  633. rlog_d("初始化状态,开始连接");
  634. result = RyanMqttConnect(client);
  635. RyanMqttEventMachine(client, RyanMqttConnectAccepted == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  636. (void *)&result);
  637. break;
  638. case RyanMqttConnectState: // 连接状态
  639. rlog_d("连接状态");
  640. result = RyanMqttReadPacketHandler(client, NULL);
  641. RyanMqttAckListScan(client, RyanMqttTrue);
  642. RyanMqttKeepalive(client);
  643. break;
  644. case RyanMqttDisconnectState: // 断开连接状态
  645. rlog_d("断开连接状态");
  646. if (RyanMqttTrue != client->config->autoReconnectFlag) // 没有使能自动连接就休眠线程
  647. platformThreadStop(client->config->userData, client->mqttThread);
  648. rlog_d("触发自动连接,%dms后开始连接\r\n", client->config->reconnectTimeout);
  649. platformDelay(client->config->reconnectTimeout);
  650. RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL); // 给上层触发重新连接前事件
  651. break;
  652. case RyanMqttReconnectState:
  653. result = RyanMqttConnect(client);
  654. RyanMqttEventMachine(client, RyanMqttConnectAccepted == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  655. (void *)&result);
  656. break;
  657. default:
  658. RyanMqttAssert(NULL);
  659. break;
  660. }
  661. }
  662. }