RyanMqttThread.c 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848
  1. // #define rlogEnable // 是否使能日志
  2. #define rlogColorEnable // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlError) // 日志打印等级
  4. #define rlogTag "RyanMqttThread" // 日志tag
  5. #include "RyanMqttThread.h"
  6. void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client)
  7. {
  8. // 服务器在心跳时间的1.5倍内没有收到keeplive消息则会断开连接
  9. // 这里算 1.4 b倍时间内没有收到心跳就断开连接
  10. platformCriticalEnter(client->config.userData, &client->criticalLock);
  11. platformTimerCutdown(&client->keepaliveTimer, 1000 * 1.4 * client->config.keepaliveTimeoutS); // 启动心跳定时器
  12. platformCriticalExit(client->config.userData, &client->criticalLock);
  13. }
  14. /**
  15. * @brief mqtt心跳保活
  16. *
  17. * @param client
  18. * @return int32_t
  19. */
  20. static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
  21. {
  22. RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
  23. RyanMqttError_e result = RyanMqttFailedError;
  24. int32_t packetLen = 0;
  25. uint32_t timeRemain = 0;
  26. RyanMqttAssert(NULL != client);
  27. // mqtt没有连接就退出
  28. if (RyanMqttConnectState != RyanMqttGetClientState(client))
  29. return RyanMqttNotConnectError;
  30. timeRemain = platformTimerRemain(&client->keepaliveTimer);
  31. // 超过设置的 1.4 倍心跳周期
  32. if (0 == timeRemain)
  33. {
  34. connectState = RyanMqttKeepaliveTimeout;
  35. RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
  36. rlog_d("ErrorCode: %d, strError: %s", RyanMqttKeepaliveTimeout, RyanMqttStrError(RyanMqttKeepaliveTimeout));
  37. return RyanMqttFailedError;
  38. }
  39. // 剩余时间小于 recvtimeout 或者 当到达 0.9 倍时间时发送心跳包
  40. else if (timeRemain < client->config.recvTimeout ||
  41. timeRemain < 1000 * 0.5 * client->config.keepaliveTimeoutS)
  42. {
  43. // 节流时间内不发送心跳报文
  44. if (platformTimerRemain(&client->keepaliveThrottleTimer))
  45. return RyanMqttSuccessError;
  46. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  47. packetLen = MQTTSerialize_pingreq((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
  48. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  49. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  50. });
  51. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  52. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  53. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  54. });
  55. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  56. platformTimerCutdown(&client->keepaliveThrottleTimer, 1500); // 启动心跳检查节流定时器
  57. }
  58. return RyanMqttSuccessError;
  59. }
  60. /**
  61. * @brief
  62. *
  63. * @param client
  64. * @return int32_t
  65. */
  66. static RyanMqttError_e RyanMqttGetPayloadLen(RyanMqttClient_t *client, uint32_t *payloadLen)
  67. {
  68. RyanMqttError_e result = RyanMqttSuccessError;
  69. uint8_t len = 0;
  70. uint8_t encodedByte = 0;
  71. int32_t multiplier = 1;
  72. RyanMqttAssert(NULL != client);
  73. // RyanMqttAssert(NULL != payloadLen); payloadLen == 0时会误判
  74. do
  75. {
  76. RyanMqttCheck(++len < 4, RyanMqttFailedError, rlog_d);
  77. result = RyanMqttRecvPacket(client, (char *)&encodedByte, 1);
  78. RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_d);
  79. *payloadLen += (encodedByte & 127) * multiplier; // 根据 MQTT 协议解码数据长度
  80. multiplier *= 128;
  81. } while ((encodedByte & 128) != 0);
  82. RyanMqttCheck(*payloadLen <= RyanMqttMaxPayloadLen, RyanMqttFailedError, rlog_d);
  83. return RyanMqttSuccessError;
  84. }
  85. /**
  86. * @brief qos1或者qos2接收消息成功
  87. *
  88. * @param client
  89. * @return RyanMqttError_e
  90. */
  91. static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *client)
  92. {
  93. RyanMqttError_e result = RyanMqttSuccessError;
  94. uint8_t dup = 0;
  95. uint8_t packetType = 0;
  96. uint16_t packetId = 0;
  97. RyanMqttAckHandler_t *ackHandler = NULL;
  98. RyanMqttAssert(NULL != client);
  99. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
  100. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  101. // 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
  102. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
  103. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { rlog_i("packetType: %d, packetId: %d", packetType, packetId); });
  104. RyanMqttAckListRemoveToAckList(client, ackHandler);
  105. RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
  106. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  107. return result;
  108. }
  109. /**
  110. * @brief 发布释放处理函数
  111. *
  112. * @param client
  113. * @return RyanMqttError_e
  114. */
  115. static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client)
  116. {
  117. RyanMqttError_e result = RyanMqttFailedError;
  118. uint8_t dup = 0;
  119. uint8_t packetType = 0;
  120. uint16_t packetId = 0;
  121. int32_t packetLen = 0;
  122. RyanMqttAckHandler_t *ackHandler = NULL;
  123. RyanMqttAssert(NULL != client);
  124. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
  125. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  126. // 删除pubrel记录
  127. result = RyanMqttAckListNodeFind(client, PUBREL, packetId, &ackHandler);
  128. if (RyanMqttSuccessError == result)
  129. {
  130. RyanMqttAckListRemoveToAckList(client, ackHandler);
  131. RyanMqttAckHandlerDestroy(client, ackHandler);
  132. }
  133. // 制作确认数据包并发送
  134. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  135. packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBCOMP, 0, packetId);
  136. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
  137. // 每次收到PUBREL都返回消息
  138. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  139. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  140. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  141. return RyanMqttSuccessError;
  142. }
  143. /**
  144. * @brief 发布收到处理函数
  145. *
  146. * @param client
  147. * @return RyanMqttError_e
  148. */
  149. static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
  150. {
  151. RyanMqttError_e result = RyanMqttFailedError;
  152. uint8_t dup = 0;
  153. uint8_t packetType = 0;
  154. uint16_t packetId = 0;
  155. int32_t packetLen = 0;
  156. RyanMqttMsgHandler_t *msgHandler = NULL;
  157. RyanMqttAckHandler_t *ackHandler = NULL;
  158. RyanMqttAckHandler_t *ackHandlerPubrec = NULL;
  159. RyanMqttAssert(NULL != client);
  160. result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
  161. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  162. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  163. result = RyanMqttAckListNodeFind(client, PUBREC, packetId, &ackHandlerPubrec);
  164. if (RyanMqttSuccessError == result)
  165. {
  166. // 查找ack链表是否存在pubcomp报文,不存在表示首次接收到
  167. result = RyanMqttAckListNodeFind(client, PUBCOMP, packetId, &ackHandler);
  168. if (RyanMqttSuccessError != result)
  169. {
  170. result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
  171. ackHandlerPubrec->msgHandler->topicLen,
  172. ackHandlerPubrec->msgHandler->qos, &msgHandler);
  173. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  174. result = RyanMqttAckHandlerCreate(client, PUBCOMP, packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
  175. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { RyanMqttMsgHandlerDestory(client->config.userData, msgHandler); });
  176. RyanMqttAckListAddToAckList(client, ackHandler);
  177. RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
  178. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  179. }
  180. // 出现pubrec和pubcomp同时存在的情况,清除pubrec。理论上不会出现(冗余措施)
  181. else
  182. {
  183. RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
  184. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  185. }
  186. }
  187. // 每次收到PUBREC都返回ack,制作确认数据包并发送
  188. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  189. // 序列化发布释放报文
  190. packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBREL, 0, packetId);
  191. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  192. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  193. });
  194. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  195. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  196. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  197. return result;
  198. }
  199. /**
  200. * @brief 收到服务器发布消息处理函数
  201. *
  202. * @param client
  203. * @return RyanMqttError_e
  204. */
  205. static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
  206. {
  207. RyanMqttError_e result = RyanMqttSuccessError;
  208. int32_t packetLen = 0;
  209. MQTTString topicName = MQTTString_initializer;
  210. RyanMqttMsgData_t msgData = {0};
  211. RyanMqttMsgHandler_t *msgHandler = NULL;
  212. RyanMqttAckHandler_t *ackHandler = NULL;
  213. RyanMqttAssert(NULL != client);
  214. result = MQTTDeserialize_publish(&msgData.dup, (int *)&msgData.qos, &msgData.retained, &msgData.packetId, &topicName,
  215. (uint8_t **)&msgData.payload, (int *)&msgData.payloadLen, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
  216. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  217. msgData.topic = topicName.lenstring.data;
  218. msgData.topicLen = topicName.lenstring.len;
  219. // 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
  220. result = RyanMqttMsgHandlerFind(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttTrue, &msgHandler);
  221. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  222. switch (msgData.qos)
  223. {
  224. case RyanMqttQos0:
  225. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  226. break;
  227. case RyanMqttQos1:
  228. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  229. packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBACK, 0, msgData.packetId);
  230. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
  231. { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
  232. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  233. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  234. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  235. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  236. break;
  237. case RyanMqttQos2: // qos2采用方法B
  238. // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish, 不进行qos2 PUBREC消息处理
  239. result = RyanMqttAckListNodeFind(client, PUBREL, msgData.packetId, &ackHandler);
  240. if (RyanMqttSuccessError != result)
  241. {
  242. result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, msgData.qos, &msgHandler);
  243. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  244. result = RyanMqttAckHandlerCreate(client, PUBREL, msgData.packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
  245. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { RyanMqttMsgHandlerDestory(client->config.userData, msgHandler); });
  246. RyanMqttAckListAddToAckList(client, ackHandler);
  247. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  248. }
  249. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  250. packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBREC, 0, msgData.packetId);
  251. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  252. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  253. });
  254. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  255. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  256. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  257. break;
  258. default:
  259. break;
  260. }
  261. return result;
  262. }
  263. /**
  264. * @brief 订阅确认处理函数
  265. *
  266. * @param client
  267. * @return RyanMqttError_e
  268. */
  269. static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
  270. {
  271. RyanMqttError_e result = RyanMqttSuccessError;
  272. int32_t count = 0;
  273. int32_t grantedQoS = 0;
  274. uint16_t packetId = 0;
  275. RyanMqttMsgHandler_t *msgHandler = NULL;
  276. RyanMqttAckHandler_t *ackHandler = NULL;
  277. RyanMqttAssert(NULL != client);
  278. result = MQTTDeserialize_suback(&packetId, 1, (int *)&count, (int *)&grantedQoS, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
  279. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  280. // ack链表不存在当前订阅确认节点就直接退出
  281. result = RyanMqttAckListNodeFind(client, SUBACK, packetId, &ackHandler);
  282. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  283. // 订阅失败
  284. if (RyanMqttSubFail == grantedQoS)
  285. {
  286. RyanMqttAckListRemoveToAckList(client, ackHandler);
  287. // mqtt事件回调
  288. RyanMqttEventMachine(client, RyanMqttEventSubscribedFaile, (void *)ackHandler->msgHandler);
  289. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  290. return RyanMqttSuccessError;
  291. }
  292. // 订阅成功
  293. // 查找是否有同名订阅,如果有就销毁之前的
  294. result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, ackHandler->msgHandler->topicLen, RyanMqttFalse, &msgHandler);
  295. if (RyanMqttSuccessError == result)
  296. {
  297. RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  298. RyanMqttMsgHandlerDestory(client, msgHandler);
  299. }
  300. // 服务端可以授予比订阅者要求的低一些的 QoS 等级。
  301. result = RyanMqttMsgHandlerCreate(client, ackHandler->msgHandler->topic,
  302. ackHandler->msgHandler->topicLen,
  303. grantedQoS, &msgHandler);
  304. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d); // 这里创建失败了不触发回调,等待ack超时触发失败回调函数
  305. RyanMqttEventMachine(client, RyanMqttEventSubscribed, (void *)msgHandler); // mqtt回调函数
  306. RyanMqttMsgHandlerAddToMsgList(client, msgHandler); // 将msg信息添加到订阅链表上
  307. RyanMqttAckListRemoveToAckList(client, ackHandler);
  308. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  309. return result;
  310. }
  311. /**
  312. * @brief 取消订阅确认处理函数
  313. *
  314. * @param client
  315. * @return RyanMqttError_e
  316. */
  317. static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client)
  318. {
  319. RyanMqttError_e result = RyanMqttFailedError;
  320. RyanMqttMsgHandler_t *subMsgHandler = NULL;
  321. RyanMqttAckHandler_t *ackHandler = NULL;
  322. uint16_t packetId = 0;
  323. RyanMqttAssert(NULL != client);
  324. result = MQTTDeserialize_unsuback(&packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
  325. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  326. // ack链表不存在当前取消订阅确认节点就直接退出
  327. result = RyanMqttAckListNodeFind(client, UNSUBACK, packetId, &ackHandler);
  328. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  329. // 查找当前主题是否已经订阅,进行取消订阅
  330. result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, ackHandler->msgHandler->topicLen, RyanMqttFalse, &subMsgHandler);
  331. if (RyanMqttSuccessError == result)
  332. {
  333. RyanMqttMsgHandlerRemoveToMsgList(client, subMsgHandler);
  334. RyanMqttMsgHandlerDestory(client, subMsgHandler);
  335. }
  336. RyanMqttAckListRemoveToAckList(client, ackHandler);
  337. // mqtt事件回调
  338. RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
  339. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  340. return result;
  341. }
  342. /**
  343. * @brief mqtt数据包处理函数
  344. *
  345. * @param client
  346. * @param packetType
  347. * @return RyanMqttError_e
  348. */
  349. static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8_t *packetType)
  350. {
  351. RyanMqttError_e result = RyanMqttSuccessError;
  352. uint32_t fixedHeaderLen = 1;
  353. uint32_t payloadLen = 0;
  354. MQTTHeader header = {0};
  355. RyanList_t *curr = NULL,
  356. *next = NULL;
  357. RyanMqttAckHandler_t *userAckHandler = NULL;
  358. RyanMqttAssert(NULL != client);
  359. // RyanMqttAssert(NULL != packetType); packetType == 0时会误判
  360. // 1.读取标头字节。 其中包含数据包类型
  361. result = RyanMqttRecvPacket(client, client->config.recvBuffer, fixedHeaderLen);
  362. // 同步用户接口的ack链表
  363. platformMutexLock(client->config.userData, &client->userAckHandleLock);
  364. RyanListForEachSafe(curr, next, &client->userAckHandlerList)
  365. {
  366. // 获取此节点的结构体
  367. userAckHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  368. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  369. RyanMqttAckListAddToAckList(client, userAckHandler);
  370. }
  371. platformMutexUnLock(client->config.userData, &client->userAckHandleLock);
  372. if (RyanMqttRecvPacketTimeOutError == result)
  373. return RyanMqttRecvPacketTimeOutError;
  374. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  375. // 填充联合体标头信息
  376. header.byte = client->config.recvBuffer[0];
  377. rlog_d("packetType: %d", header.bits.type);
  378. RyanMqttCheck(CONNECT <= header.bits.type && DISCONNECT >= header.bits.type, result, rlog_d);
  379. // 2.读取mqtt报文剩余长度。 这本身是可变的
  380. result = RyanMqttGetPayloadLen(client, &payloadLen);
  381. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  382. // 将剩余长度编码成mqtt报文,并放入接收缓冲区,如果消息长度超过缓冲区长度则抛弃此次数据
  383. fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config.recvBuffer + fixedHeaderLen, payloadLen);
  384. //? 上面不判断是因为recv空间不可能小到 fixedHeaderLen 都无法写入
  385. // 判断recvBufferSize是否可以存的下数据
  386. RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config.recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
  387. {
  388. while (payloadLen > 0)
  389. {
  390. if (payloadLen <= client->config.recvBufferSize)
  391. {
  392. RyanMqttRecvPacket(client, client->config.recvBuffer, payloadLen);
  393. payloadLen = 0;
  394. }
  395. else
  396. {
  397. RyanMqttRecvPacket(client, client->config.recvBuffer, client->config.recvBufferSize);
  398. payloadLen -= client->config.recvBufferSize;
  399. }
  400. }
  401. });
  402. // 3.读取mqtt载荷数据并放到读取缓冲区
  403. if (payloadLen > 0)
  404. {
  405. result = RyanMqttRecvPacket(client, client->config.recvBuffer + fixedHeaderLen, payloadLen);
  406. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  407. }
  408. // uint16_t packLen = fixedHeaderLen + payloadLen;
  409. // 控制报文类型
  410. // 发送者QoS2动作 发布PUBLISH报文 -> 等待PUBREC报文 -> 发送PUBREL报文 -> 等待PUBCOMP报文
  411. // 接收者QoS2动作 等待PUBLISH报文 -> 发送PUBREC报文 -> 等待PUBREL报文 -> 发送PUBCOMP报文
  412. switch (header.bits.type)
  413. {
  414. case PUBLISH: // 接收到订阅消息
  415. result = RyanMqttPublishPacketHandler(client);
  416. break;
  417. case CONNACK: // 连接报文确认
  418. break;
  419. case PUBACK: // QoS 1消息发布收到确认
  420. case PUBCOMP: // 发布完成
  421. result = RyanMqttPubackAndPubcompPacketHandler(client);
  422. break;
  423. case PUBREC: // 发布收到
  424. result = RyanMqttPubrecPacketHandler(client);
  425. break;
  426. case PUBREL: // 发布释放
  427. result = RyanMqttPubrelPacketHandler(client);
  428. break;
  429. case SUBACK: // 订阅确认
  430. result = RyanMqttSubackHandler(client);
  431. break;
  432. case UNSUBACK: // 取消订阅确认
  433. result = RyanMqttUnSubackHandler(client);
  434. break;
  435. case PINGRESP: // 心跳响应
  436. RyanMqttRefreshKeepaliveTime(client);
  437. result = RyanMqttSuccessError;
  438. break;
  439. default:
  440. break;
  441. }
  442. if (packetType)
  443. *packetType = header.bits.type;
  444. return result;
  445. }
  446. /**
  447. * @brief 遍历ack链表,进行相应的处理
  448. *
  449. * @param client
  450. * @param WaitFlag
  451. * WaitFlag : RyanMqttFalse 表示不需要等待超时立即处理这些数据包。通常在重新连接后立即进行处理
  452. * WaitFlag : RyanMqttTrue 表示需要等待超时再处理这些消息,一般是稳定连接下的超时处理
  453. */
  454. static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFlag)
  455. {
  456. RyanList_t *curr = NULL,
  457. *next = NULL;
  458. RyanMqttAckHandler_t *ackHandler = NULL;
  459. RyanMqttAssert(NULL != client);
  460. // mqtt没有连接就退出
  461. if (RyanMqttConnectState != RyanMqttGetClientState(client))
  462. return;
  463. // 节流时间内不检查ack链表
  464. if (platformTimerRemain(&client->ackScanThrottleTimer))
  465. return;
  466. platformMutexLock(client->config.userData, &client->ackHandleLock);
  467. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  468. {
  469. // 获取此节点的结构体
  470. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  471. // ack响应没有超时就不进行处理
  472. if (RyanMqttTrue == WaitFlag && 0 != platformTimerRemain(&ackHandler->timer))
  473. continue;
  474. switch (ackHandler->packetType)
  475. {
  476. // 发送qos1 / qos2消息, 服务器ack响应超时。需要重新发送它们。
  477. case PUBACK: // qos1 publish后没有收到puback
  478. case PUBREC: // qos2 publish后没有收到pubrec
  479. {
  480. RyanMqttSetPublishDup(&ackHandler->packet[0], 1); // 设置重发标志位
  481. }
  482. case PUBREL: // qos2 收到pubrec,发送pubrel后没有收到pubcomp
  483. case PUBCOMP: // 理论不会出现,冗余措施
  484. {
  485. if (RyanMqttConnectState != RyanMqttGetClientState(client))
  486. continue;
  487. // 重发次数超过警告值回调
  488. if (ackHandler->repeatCount >= client->config.ackHandlerRepeatCountWarning)
  489. {
  490. RyanMqttEventMachine(client, RyanMqttEventAckRepeatCountWarning, (void *)ackHandler);
  491. continue;
  492. }
  493. // 重发数据事件回调
  494. RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
  495. //? 发送失败也是重试,所以这里不进行错误判断
  496. RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
  497. // 重置ack超时时间
  498. platformTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
  499. ackHandler->repeatCount++;
  500. break;
  501. }
  502. // 订阅 / 取消订阅超时就认为失败
  503. case SUBACK:
  504. case UNSUBACK:
  505. {
  506. RyanMqttAckListRemoveToAckList(client, ackHandler);
  507. RyanMqttEventMachine(client, (SUBACK == ackHandler->packetType) ? RyanMqttEventSubscribedFaile : RyanMqttEventUnSubscribedFaile,
  508. (void *)ackHandler->msgHandler);
  509. // 清除句柄
  510. RyanMqttAckHandlerDestroy(client, ackHandler);
  511. break;
  512. }
  513. default:
  514. {
  515. rlog_e("不应该出现的值: %d", ackHandler->packetType);
  516. RyanMqttAssert(NULL); // 不应该为别的值
  517. break;
  518. }
  519. }
  520. }
  521. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  522. platformTimerCutdown(&client->ackScanThrottleTimer, 1000); // 启动ack scan节流定时器
  523. }
  524. /**
  525. * @brief mqtt连接函数
  526. *
  527. * @param client
  528. * @return RyanMqttError_e
  529. */
  530. static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
  531. {
  532. RyanMqttError_e result = RyanMqttSuccessError;
  533. uint8_t sessionPresent = 0; // 会话存在标志
  534. uint8_t packetType = 0; // 接收到的报文类型
  535. int32_t packetLen = 0;
  536. int32_t connackRc = 0;
  537. MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
  538. RyanMqttAssert(NULL != client);
  539. RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectError, rlog_d);
  540. // 连接标志位
  541. connectData.clientID.cstring = client->config.clientId;
  542. connectData.username.cstring = client->config.userName;
  543. connectData.password.cstring = client->config.password;
  544. connectData.keepAliveInterval = client->config.keepaliveTimeoutS;
  545. connectData.cleansession = client->config.cleanSessionFlag;
  546. connectData.MQTTVersion = client->config.mqttVersion;
  547. if (RyanMqttTrue == client->lwtFlag)
  548. {
  549. connectData.willFlag = 1;
  550. connectData.will.qos = client->lwtOptions.qos;
  551. connectData.will.retained = client->lwtOptions.retain;
  552. connectData.will.message.lenstring.data = client->lwtOptions.payload;
  553. connectData.will.message.lenstring.len = client->lwtOptions.payloadLen;
  554. connectData.will.topicName.cstring = client->lwtOptions.topic;
  555. }
  556. // 调用底层的连接函数连接上服务器
  557. result = platformNetworkConnect(client->config.userData, &client->network, client->config.host, client->config.port);
  558. RyanMqttCheck(RyanMqttSuccessError == result, RyanSocketFailedError, rlog_d);
  559. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  560. // 序列化mqtt的CONNECT报文
  561. packetLen = MQTTSerialize_connect((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, &connectData);
  562. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
  563. // 发送序列化mqtt的CONNECT报文
  564. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  565. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  566. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  567. // 等待报文
  568. // mqtt规范 服务端接收到connect报文后,服务端发送给客户端的第一个报文必须是 CONNACK
  569. result = RyanMqttReadPacketHandler(client, &packetType);
  570. RyanMqttCheck(CONNACK == packetType, RyanMqttFailedError, rlog_d);
  571. // 解析CONNACK报文
  572. result = MQTTDeserialize_connack(&sessionPresent, (uint8_t *)&connackRc, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
  573. RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
  574. rlog_i("result: %d, packetLen: %d, packetType: %d connackRc: %d", result, packetLen, packetType, connackRc);
  575. return connackRc;
  576. }
  577. /**
  578. * @brief mqtt事件处理函数
  579. *
  580. * @param client
  581. * @param eventId
  582. * @param eventData
  583. */
  584. void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
  585. {
  586. RyanMqttAssert(NULL != client);
  587. switch (eventId)
  588. {
  589. case RyanMqttEventConnected: // 第一次连接成功
  590. RyanMqttRefreshKeepaliveTime(client);
  591. RyanMqttAckListScan(client, RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
  592. RyanMqttSetClientState(client, RyanMqttConnectState);
  593. break;
  594. case RyanMqttEventDisconnected: // 断开连接事件
  595. RyanMqttSetClientState(client, RyanMqttDisconnectState); // 先将客户端状态设置为断开连接,避免close网络资源时用户依然在使用
  596. platformNetworkClose(client->config.userData, &client->network);
  597. if (RyanMqttTrue == client->config.cleanSessionFlag)
  598. RyanMqttCleanSession(client);
  599. break;
  600. case RyanMqttEventReconnectBefore: // 重连前回调
  601. RyanMqttSetClientState(client, RyanMqttReconnectState);
  602. break;
  603. default:
  604. break;
  605. }
  606. if (client->config.mqttEventHandle == NULL)
  607. return;
  608. if (client->eventFlag & eventId)
  609. client->config.mqttEventHandle(client, eventId, eventData);
  610. }
  611. /**
  612. * @brief mqtt运行线程
  613. *
  614. * @param argument
  615. */
  616. void RyanMqttThread(void *argument)
  617. {
  618. int32_t result = 0;
  619. RyanMqttClient_t *client = (RyanMqttClient_t *)argument;
  620. RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
  621. while (1)
  622. {
  623. // 销毁客户端
  624. if (RyanMqttTrue == client->destoryFlag)
  625. {
  626. RyanMqttEventMachine(client, RyanMqttEventDestoryBefore, (void *)NULL);
  627. // 清除网络组件
  628. platformNetworkClose(client->config.userData, &client->network);
  629. // 清除config信息
  630. if (NULL != client->config.clientId)
  631. platformMemoryFree(client->config.clientId);
  632. if (NULL != client->config.userName)
  633. platformMemoryFree(client->config.userName);
  634. if (NULL != client->config.password)
  635. platformMemoryFree(client->config.password);
  636. if (NULL != client->config.host)
  637. platformMemoryFree(client->config.host);
  638. if (NULL != client->config.taskName)
  639. platformMemoryFree(client->config.taskName);
  640. // 清除遗嘱相关配置
  641. if (NULL != client->lwtOptions.payload)
  642. platformMemoryFree(client->lwtOptions.payload);
  643. if (NULL != client->lwtOptions.topic)
  644. platformMemoryFree(client->lwtOptions.topic);
  645. // 清除session ack链表和msg链表
  646. RyanMqttCleanSession(client);
  647. // 清除互斥锁
  648. platformMutexDestroy(client->config.userData, &client->sendBufLock);
  649. platformMutexDestroy(client->config.userData, &client->msgHandleLock);
  650. platformMutexDestroy(client->config.userData, &client->ackHandleLock);
  651. platformMutexDestroy(client->config.userData, &client->userAckHandleLock);
  652. // 清除临界区
  653. platformCriticalDestroy(client->config.userData, &client->criticalLock);
  654. // 清除掉线程动态资源
  655. platformThread_t mqttThread = {0};
  656. memcpy(&mqttThread, &client->mqttThread, sizeof(platformThread_t));
  657. void *userData = client->config.userData;
  658. platformMemoryFree(client);
  659. client = NULL;
  660. // 销毁自身线程
  661. platformThreadDestroy(userData, &mqttThread);
  662. }
  663. // 客户端状态变更状态机
  664. switch (client->clientState)
  665. {
  666. case RyanMqttStartState: // 开始状态状态
  667. rlog_d("初始化状态,开始连接");
  668. result = RyanMqttConnect(client);
  669. RyanMqttEventMachine(client, RyanMqttConnectAccepted == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  670. (void *)&result);
  671. break;
  672. case RyanMqttConnectState: // 连接状态
  673. rlog_d("连接状态");
  674. result = RyanMqttReadPacketHandler(client, NULL);
  675. RyanMqttAckListScan(client, RyanMqttTrue);
  676. RyanMqttKeepalive(client);
  677. break;
  678. case RyanMqttDisconnectState: // 断开连接状态
  679. rlog_d("断开连接状态");
  680. if (RyanMqttTrue != client->config.autoReconnectFlag) // 没有使能自动连接就休眠线程
  681. platformThreadStop(client->config.userData, &client->mqttThread);
  682. rlog_d("触发自动连接,%dms后开始连接\r\n", client->config.reconnectTimeout);
  683. platformDelay(client->config.reconnectTimeout);
  684. RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL); // 给上层触发重新连接前事件
  685. break;
  686. case RyanMqttReconnectState:
  687. result = RyanMqttConnect(client);
  688. RyanMqttEventMachine(client, RyanMqttConnectAccepted == result ? RyanMqttEventConnected : RyanMqttEventDisconnected,
  689. (void *)&result);
  690. break;
  691. default:
  692. RyanMqttAssert(NULL);
  693. break;
  694. }
  695. }
  696. }