// RyanMqttThreadProcessPacket.c

  1. #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
  2. // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  3. #include "RyanMqttThread.h"
  4. #include "RyanMqttLog.h"
  5. #include "RyanMqttUtil.h"
  6. /**
  7. * @brief qos1或者qos2接收消息成功确认处理
  8. *
  9. * @param client
  10. * @return RyanMqttError_e
  11. */
  12. static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *client,
  13. MQTTPacketInfo_t *pIncomingPacket)
  14. {
  15. RyanMqttError_e result = RyanMqttSuccessError;
  16. uint16_t packetId;
  17. RyanMqttAckHandler_t *ackHandler;
  18. RyanMqttAssert(NULL != client);
  19. // 反序列化ack包
  20. MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  21. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  22. // 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
  23. result = RyanMqttAckListNodeFind(client, pIncomingPacket->type & 0xF0U, packetId, &ackHandler);
  24. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  25. RyanMqttLog_i("packetType: %02x, packetId: %d", pIncomingPacket->type & 0xF0U, packetId);
  26. });
  27. RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
  28. RyanMqttAckListRemoveToAckList(client, ackHandler);
  29. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  30. return result;
  31. }
  32. /**
  33. * @brief 发布释放处理函数
  34. *
  35. * @param client
  36. * @return RyanMqttError_e
  37. */
  38. static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  39. {
  40. RyanMqttError_e result = RyanMqttSuccessError;
  41. uint16_t packetId;
  42. RyanMqttAckHandler_t *ackHandler;
  43. RyanMqttAssert(NULL != client);
  44. // 反序列化ack包
  45. MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  46. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  47. // 删除pubrel记录
  48. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, packetId, &ackHandler);
  49. if (RyanMqttSuccessError == result)
  50. {
  51. RyanMqttAckListRemoveToAckList(client, ackHandler);
  52. RyanMqttAckHandlerDestroy(client, ackHandler);
  53. }
  54. // 制作确认数据包并发送
  55. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  56. MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
  57. // 序列化ack数据包
  58. status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBCOMP, packetId);
  59. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  60. // 每次收到PUBREL都返回消息
  61. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  62. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  63. return RyanMqttSuccessError;
  64. }
  65. /**
  66. * @brief 发布收到处理函数
  67. *
  68. * @param client
  69. * @return RyanMqttError_e
  70. */
  71. static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  72. {
  73. RyanMqttError_e result = RyanMqttSuccessError;
  74. uint16_t packetId;
  75. RyanMqttAckHandler_t *ackHandlerPubrec;
  76. RyanMqttAssert(NULL != client);
  77. // 反序列化ack包
  78. MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  79. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  80. // 每次收到PUBREC都返回ack,确保服务器可以认为数据包被发送了
  81. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  82. MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
  83. // 序列化ack数据包
  84. status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREL, packetId);
  85. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  86. // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
  87. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec);
  88. if (RyanMqttSuccessError == result)
  89. {
  90. RyanMqttMsgHandler_t *msgHandler;
  91. RyanMqttAckHandler_t *ackHandler;
  92. // 查找ack链表是否存在pubcomp报文,不存在表示首次接收到
  93. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandler);
  94. if (RyanMqttSuccessError != result)
  95. {
  96. // 首次收到消息
  97. result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
  98. ackHandlerPubrec->msgHandler->topicLen,
  99. RyanMqttMsgInvalidPacketId, ackHandlerPubrec->msgHandler->qos,
  100. ackHandlerPubrec->msgHandler->userData, &msgHandler);
  101. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  102. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBCOMP, packetId,
  103. MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
  104. &ackHandler, RyanMqttFalse);
  105. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  106. { RyanMqttMsgHandlerDestroy(client, msgHandler); });
  107. RyanMqttAckListAddToAckList(client, ackHandler);
  108. RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
  109. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  110. }
  111. // 出现pubrec和pubcomp同时存在的情况,清除pubrec。理论上不会出现(冗余措施)
  112. else
  113. {
  114. RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
  115. RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
  116. }
  117. }
  118. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  119. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  120. return result;
  121. }
  122. /**
  123. * @brief 收到服务器发布消息处理函数
  124. *
  125. * @param client
  126. * @return RyanMqttError_e
  127. */
  128. static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  129. {
  130. RyanMqttError_e result = RyanMqttSuccessError;
  131. uint16_t packetId;
  132. RyanMqttMsgData_t msgData;
  133. RyanMqttMsgHandler_t *msgHandler;
  134. RyanMqttAssert(NULL != client);
  135. {
  136. // 反系列化 publish 消息
  137. MQTTPublishInfo_t publishInfo;
  138. MQTTStatus_t status = MQTT_DeserializePublish(pIncomingPacket, &packetId, &publishInfo);
  139. RyanMqttCheck(MQTTSuccess == status, RyanMqttDeserializePacketError, RyanMqttLog_d);
  140. msgData.topic = (char *)publishInfo.pTopicName;
  141. msgData.topicLen = publishInfo.topicNameLength;
  142. msgData.packetId = packetId;
  143. msgData.payload = (char *)publishInfo.pPayload;
  144. msgData.payloadLen = publishInfo.payloadLength;
  145. msgData.qos = (RyanMqttQos_e)publishInfo.qos;
  146. msgData.retained = publishInfo.retain;
  147. msgData.dup = publishInfo.dup;
  148. // 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
  149. RyanMqttMsgHandler_t tempMsgHandler = {.topic = msgData.topic, .topicLen = msgData.topicLen};
  150. result = RyanMqttMsgHandlerFind(client, &tempMsgHandler, RyanMqttTrue, &msgHandler);
  151. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  152. { RyanMqttLog_w("主题不匹配: %.*s", msgData.topicLen, msgData.topic); });
  153. }
  154. switch (msgData.qos)
  155. {
  156. case RyanMqttQos0: RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData); break;
  157. case RyanMqttQos1: {
  158. // 先分发消息,再回答ack
  159. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  160. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  161. MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
  162. // 序列化ack数据包
  163. MQTTStatus_t status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBACK, packetId);
  164. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  165. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  166. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  167. }
  168. break;
  169. case RyanMqttQos2: // qos2采用方法B
  170. {
  171. RyanMqttAckHandler_t *ackHandler;
  172. uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
  173. MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
  174. // !序列化ack数据包,必须先执行,因为创建ack需要用到这个报文
  175. MQTTStatus_t status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREC, packetId);
  176. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  177. // 上面代码不太可能出错,出错后就让服务器重新发送吧
  178. // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish,
  179. // 不进行qos2 PUBREC消息处理
  180. result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler);
  181. if (RyanMqttSuccessError != result)
  182. {
  183. result = RyanMqttMsgHandlerCreate(client, msgData.topic, msgData.topicLen,
  184. RyanMqttMsgInvalidPacketId, msgData.qos, NULL, &msgHandler);
  185. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  186. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId,
  187. MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
  188. &ackHandler, RyanMqttFalse);
  189. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  190. { RyanMqttMsgHandlerDestroy(client, msgHandler); });
  191. RyanMqttAckListAddToAckList(client, ackHandler);
  192. RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
  193. }
  194. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
  195. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  196. }
  197. break;
  198. default: RyanMqttLog_w("Unhandled QoS level: %d", msgData.qos); break;
  199. }
  200. return result;
  201. }
  202. /**
  203. * @brief 订阅确认处理函数
  204. *
  205. * @param client
  206. * @return RyanMqttError_e
  207. */
  208. static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  209. {
  210. uint16_t packetId;
  211. RyanMqttMsgHandler_t *msgHandler;
  212. RyanMqttAckHandler_t *ackHandler;
  213. RyanMqttList_t *curr, *next;
  214. RyanMqttAssert(NULL != client);
  215. // 反序列化ack包,MQTTSuccess和MQTTServerRefused都是成功的
  216. MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  217. RyanMqttCheck(MQTTSuccess == status || MQTTServerRefused == status, RyanMqttDeserializePacketError,
  218. RyanMqttLog_d);
  219. // 检查ack的msgCount和返回消息的msgCount是否一致
  220. {
  221. // MQTT_DeserializeAck会保证 pIncomingPacket->remainingLength >= 3
  222. uint32_t statusCount = pIncomingPacket->remainingLength - sizeof(uint16_t);
  223. uint32_t ackMsgCount = 0;
  224. // ?使用ack或msg遍历都行,使用msg更容易测试出问题,遍历性能也会更好一些
  225. platformMutexLock(client->config.userData, &client->msgHandleLock);
  226. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  227. {
  228. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  229. if (packetId == msgHandler->packetId)
  230. {
  231. ackMsgCount++;
  232. }
  233. }
  234. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  235. // 服务区回复的ack数和记录的ack数不一致就清除所有ack
  236. RyanMqttCheckCode(ackMsgCount == statusCount, RyanMqttNoRescourceError, RyanMqttLog_d, {
  237. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
  238. platformMutexLock(client->config.userData, &client->msgHandleLock);
  239. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  240. {
  241. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  242. if (packetId == msgHandler->packetId)
  243. {
  244. RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  245. RyanMqttMsgHandlerDestroy(client, msgHandler);
  246. }
  247. }
  248. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  249. });
  250. }
  251. RyanMqttQos_e subscriptionQos;
  252. uint32_t ackMsgIndex = 0;
  253. const uint8_t *pStatusStart = &pIncomingPacket->pRemainingData[sizeof(uint16_t)];
  254. // todo 这里效率非常低,订阅属于用的少的功能,暂时可以接受
  255. // 查找ack句柄
  256. platformMutexLock(client->config.userData, &client->ackHandleLock);
  257. RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
  258. {
  259. ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
  260. if (packetId != ackHandler->packetId || MQTT_PACKET_TYPE_SUBACK != ackHandler->packetType)
  261. {
  262. continue;
  263. }
  264. // 查找同名订阅并删除,保证订阅主题列表只有一个最新的
  265. RyanMqttMsgHandlerFindAndDestroyByPackId(client, ackHandler->msgHandler, RyanMqttTrue);
  266. // 到这里就可以保证没有同名订阅了
  267. // 查找之前记录的topic句柄,根据服务器授权Qos进行更新
  268. RyanMqttError_e result =
  269. RyanMqttMsgHandlerFind(client, ackHandler->msgHandler, RyanMqttFalse, &msgHandler);
  270. // 几乎不可能,可以查找到 ackHandler 就一定有 msgHandler
  271. // 没有的话可以打印一条信息吧
  272. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { continue; });
  273. // 解析服务端授权 QoS(0,1,2)或失败(0x80)
  274. subscriptionQos = pStatusStart[ackMsgIndex];
  275. ackMsgIndex++;
  276. switch (subscriptionQos)
  277. {
  278. case RyanMqttQos0:
  279. case RyanMqttQos1:
  280. case RyanMqttQos2:
  281. // 到这里说明订阅成功,更新 QoS 并清除临时 packetId
  282. platformMutexLock(client->config.userData, &client->msgHandleLock);
  283. msgHandler->qos = subscriptionQos;
  284. msgHandler->packetId = RyanMqttMsgInvalidPacketId;
  285. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  286. RyanMqttEventMachine(client, RyanMqttEventSubscribed, (void *)msgHandler); // mqtt回调函数
  287. break;
  288. case RyanMqttSubFail:
  289. default:
  290. // 订阅失败,服务器拒绝;删除并通知失败
  291. RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  292. RyanMqttMsgHandlerDestroy(client, msgHandler);
  293. // mqtt事件回调
  294. RyanMqttEventMachine(client, RyanMqttEventSubscribedFailed, (void *)ackHandler->msgHandler);
  295. break;
  296. }
  297. RyanMqttAckListRemoveToAckList(client, ackHandler);
  298. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  299. }
  300. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  301. return RyanMqttSuccessError;
  302. }
  303. /**
  304. * @brief 取消订阅确认处理函数
  305. *
  306. * @param client
  307. * @return RyanMqttError_e
  308. */
  309. static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  310. {
  311. RyanMqttError_e result = RyanMqttSuccessError;
  312. RyanMqttMsgHandler_t *subMsgHandler;
  313. RyanMqttAckHandler_t *ackHandler;
  314. RyanMqttList_t *curr, *next;
  315. uint16_t packetId;
  316. RyanMqttAssert(NULL != client);
  317. // 反序列化ack包
  318. MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
  319. RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
  320. // todo 这里效率非常低,订阅属于用的少的功能,暂时可以接受
  321. platformMutexLock(client->config.userData, &client->ackHandleLock);
  322. RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
  323. {
  324. ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
  325. if ((packetId != ackHandler->packetId) || (MQTT_PACKET_TYPE_UNSUBACK != ackHandler->packetType))
  326. {
  327. continue;
  328. }
  329. // 查找当前主题是否已经订阅,进行取消订阅
  330. result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler, RyanMqttFalse, &subMsgHandler);
  331. if (RyanMqttSuccessError == result)
  332. {
  333. ackHandler->msgHandler->qos = subMsgHandler->qos;
  334. RyanMqttMsgHandlerRemoveToMsgList(client, subMsgHandler);
  335. RyanMqttMsgHandlerDestroy(client, subMsgHandler);
  336. }
  337. // mqtt事件回调
  338. RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
  339. RyanMqttAckListRemoveToAckList(client, ackHandler);
  340. RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
  341. }
  342. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  343. return RyanMqttSuccessError;
  344. }
  345. /**
  346. * @brief 将用户空间的ack链表搬到mqtt线程空间
  347. *
  348. * @param client
  349. */
  350. static void RyanMqttSyncUserAckHandle(RyanMqttClient_t *client)
  351. {
  352. RyanMqttAckHandler_t *userAckHandler;
  353. RyanMqttList_t *curr, *next;
  354. platformMutexLock(client->config.userData, &client->userSessionLock);
  355. RyanMqttListForEachSafe(curr, next, &client->userAckHandlerList)
  356. {
  357. // 获取此节点的结构体
  358. userAckHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
  359. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  360. RyanMqttAckListAddToAckList(client, userAckHandler);
  361. }
  362. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  363. }
  364. RyanMqttError_e RyanMqttGetPacketInfo(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
  365. {
  366. RyanMqttError_e result = RyanMqttSuccessError;
  367. RyanMqttAssert(NULL != client);
  368. NetworkContext_t pNetworkContext = {.client = client};
  369. // todo 可以考虑增加包大小限制,目前不准备加
  370. MQTTStatus_t status =
  371. MQTT_GetIncomingPacketTypeAndLength(coreMqttTransportRecv, &pNetworkContext, pIncomingPacket);
  372. // 先同步用户接口的ack链表
  373. RyanMqttSyncUserAckHandle(client);
  374. if (MQTTSuccess == status)
  375. {
  376. // 申请断开连接数据包的空间
  377. if (pIncomingPacket->remainingLength > 0)
  378. {
  379. pIncomingPacket->pRemainingData = platformMemoryMalloc(pIncomingPacket->remainingLength);
  380. RyanMqttCheck(NULL != pIncomingPacket->pRemainingData, RyanMqttNoRescourceError, RyanMqttLog_d);
  381. }
  382. }
  383. else if (MQTTNoDataAvailable == status)
  384. {
  385. return RyanMqttRecvPacketTimeOutError;
  386. }
  387. else
  388. {
  389. RyanMqttLog_e("获取包长度失败");
  390. return RyanMqttFailedError;
  391. }
  392. // 3.读取mqtt载荷数据并放到读取缓冲区
  393. if (pIncomingPacket->remainingLength > 0)
  394. {
  395. result = RyanMqttRecvPacket(client, pIncomingPacket->pRemainingData, pIncomingPacket->remainingLength);
  396. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  397. }
  398. return result;
  399. __exit:
  400. if (NULL != pIncomingPacket->pRemainingData)
  401. {
  402. platformMemoryFree(pIncomingPacket->pRemainingData);
  403. pIncomingPacket->pRemainingData = NULL;
  404. }
  405. return result;
  406. }
  407. /**
  408. * @brief mqtt数据包处理函数
  409. *
  410. * @param client
  411. * @return RyanMqttError_e
  412. */
  413. RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
  414. {
  415. RyanMqttError_e result = RyanMqttSuccessError;
  416. MQTTPacketInfo_t pIncomingPacket = {0}; // 下面有非空判断
  417. RyanMqttAssert(NULL != client);
  418. result = RyanMqttGetPacketInfo(client, &pIncomingPacket);
  419. if (RyanMqttRecvPacketTimeOutError == result)
  420. {
  421. RyanMqttLog_d("没有待处理的数据包");
  422. goto __exit;
  423. }
  424. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttSerializePacketError, RyanMqttLog_d,
  425. { goto __exit; });
  426. RyanMqttLog_d("pIncomingPacket.type: %x ", pIncomingPacket.type & 0xF0U);
  427. // 控制报文类型
  428. // 发送者QoS2动作 发布PUBLISH报文 -> 等待PUBREC报文 -> 发送PUBREL报文 -> 等待PUBCOMP报文
  429. // 接收者QoS2动作 等待PUBLISH报文 -> 发送PUBREC报文 -> 等待PUBREL报文 -> 发送PUBCOMP报文
  430. switch (pIncomingPacket.type & 0xF0U)
  431. {
  432. case MQTT_PACKET_TYPE_PUBLISH: // 接收到订阅消息
  433. result = RyanMqttPublishPacketHandler(client, &pIncomingPacket);
  434. break;
  435. case MQTT_PACKET_TYPE_CONNACK: // 连接报文确认
  436. {
  437. // 客户端已处于连接状态时又收到CONNACK报文,应该视为严重错误,断开连接
  438. RyanMqttLog_e("收到 CONNACK 时已连接,正在断开连接");
  439. RyanMqttConnectStatus_e connectState = RyanMqttConnectProtocolError;
  440. RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
  441. result = RyanMqttHaveRescourceError;
  442. }
  443. break;
  444. case MQTT_PACKET_TYPE_PUBACK: // 客户端发送QoS 1消息,服务端发布收到确认
  445. case MQTT_PACKET_TYPE_PUBCOMP: // 发送QOS2 发布完成
  446. result = RyanMqttPubackAndPubcompPacketHandler(client, &pIncomingPacket);
  447. break;
  448. case MQTT_PACKET_TYPE_PUBREC: // 客户端发送QOS2,服务端发布PUBREC,需要客户端继续发送PUBREL
  449. result = RyanMqttPubrecPacketHandler(client, &pIncomingPacket);
  450. break;
  451. case (MQTT_PACKET_TYPE_PUBREL & 0xF0U): // 客户端接收QOS2 已经发布PUBREC,等待服务器发布释放
  452. if (pIncomingPacket.type & 0x02U)
  453. { // PUBREL 控制报文固定报头的第 3,2,1,0 位必须被设置为
  454. // 0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接
  455. result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
  456. }
  457. break;
  458. case MQTT_PACKET_TYPE_SUBACK: // 订阅确认
  459. result = RyanMqttSubackHandler(client, &pIncomingPacket);
  460. break;
  461. case MQTT_PACKET_TYPE_UNSUBACK: // 取消订阅确认
  462. result = RyanMqttUnSubackHandler(client, &pIncomingPacket);
  463. break;
  464. case MQTT_PACKET_TYPE_PINGRESP: // 心跳响应
  465. RyanMqttRefreshKeepaliveTime(client);
  466. result = RyanMqttSuccessError;
  467. break;
  468. default:
  469. RyanMqttLog_w("Unhandled packet type: 0x%02X", pIncomingPacket.type & 0xF0U);
  470. result = RyanMqttDeserializePacketError;
  471. break;
  472. }
  473. __exit:
  474. if (NULL != pIncomingPacket.pRemainingData)
  475. {
  476. platformMemoryFree(pIncomingPacket.pRemainingData);
  477. }
  478. return result;
  479. }