/* RyanMqttUtile.c */
  1. #define rlogLevel (rlogLvlInfo) // 日志打印等级
  2. #include "RyanMqttUtile.h"
  3. /**
  4. * @brief Copies a string segment into newly allocated memory and null-terminates it.
  5. *
  6. * Allocates memory for a string of the specified length, copies the content from the source buffer, appends a null terminator, and assigns the result to the destination pointer.
  7. *
  8. * @param dest Pointer to the destination string pointer to receive the allocated and copied string.
  9. * @param rest Source buffer containing the string segment to copy.
  10. * @param strLen Number of bytes to copy from the source buffer.
  11. * @return RyanMqttSuccessError on success, or RyanMqttNotEnoughMemError if memory allocation fails.
  12. */
  13. RyanMqttError_e RyanMqttStringCopy(char **dest, char *rest, uint32_t strLen)
  14. {
  15. char *str2 = NULL;
  16. RyanMqttAssert(NULL != dest);
  17. RyanMqttAssert(NULL != rest);
  18. // RyanMqttCheck(0 != strLen, RyanMqttFailedError, rlog_d);
  19. str2 = (char *)platformMemoryMalloc(strLen + 1);
  20. if (NULL == str2)
  21. return RyanMqttNotEnoughMemError;
  22. memcpy(str2, rest, strLen);
  23. str2[strLen] = '\0';
  24. *dest = str2;
  25. return RyanMqttSuccessError;
  26. }
  27. /**
  28. * @brief Receives data from the MQTT network context with socket-style return values.
  29. *
  30. * Attempts to receive a specified number of bytes into the provided buffer using the MQTT client's network context.
  31. * Returns the number of bytes received, 0 on timeout, or -1 on failure.
  32. *
  33. * @param pBuffer Buffer to store received data.
  34. * @param bytesToRecv Number of bytes to receive.
  35. * @return int32_t Number of bytes received, 0 if timeout occurred, or -1 on error.
  36. */
  37. int32_t coreMqttTransportRecv(NetworkContext_t *pNetworkContext, void *pBuffer, size_t bytesToRecv)
  38. {
  39. RyanMqttError_e result = RyanMqttSuccessError;
  40. result = RyanMqttRecvPacket(pNetworkContext->client, pBuffer, bytesToRecv);
  41. switch (result)
  42. {
  43. case RyanMqttRecvPacketTimeOutError:
  44. return 0;
  45. case RyanMqttSuccessError:
  46. return (int32_t)bytesToRecv;
  47. case RyanSocketFailedError:
  48. default:
  49. return -1;
  50. }
  51. }
  52. /**
  53. * @brief Receives a specified number of bytes from the network into a buffer with timeout handling.
  54. *
  55. * Attempts to read `recvLen` bytes asynchronously from the client's network connection into `recvBuf` within the configured receive timeout. Accumulates received data until the requested length is met or the timeout expires. If a socket error occurs, triggers a disconnect event and returns a socket failure error. Returns a timeout error if the full length is not received within the timeout period.
  56. *
  57. * @param recvBuf Buffer to store received data.
  58. * @param recvLen Number of bytes to receive.
  59. * @return RyanMqttSuccessError on success, RyanMqttRecvPacketTimeOutError on timeout, or RyanSocketFailedError on socket failure.
  60. */
  61. RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, uint8_t *recvBuf, uint32_t recvLen)
  62. {
  63. uint32_t offset = 0;
  64. int32_t recvResult = 0;
  65. uint32_t timeOut = 0;
  66. platformTimer_t timer = {0};
  67. RyanMqttAssert(NULL != client);
  68. RyanMqttAssert(NULL != recvBuf);
  69. RyanMqttAssert(0 != recvLen);
  70. timeOut = client->config.recvTimeout;
  71. platformTimerInit(&timer);
  72. platformTimerCutdown(&timer, timeOut);
  73. while ((offset < recvLen) && (0 != timeOut))
  74. {
  75. recvResult = platformNetworkRecvAsync(client->config.userData, &client->network, (char *)(recvBuf + offset),
  76. (size_t)(recvLen - offset), (int32_t)timeOut);
  77. // 错误
  78. if (-1 == recvResult)
  79. {
  80. RyanMqttConnectStatus_e connectState = RyanMqttConnectAccepted;
  81. RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
  82. rlog_d("recv错误, result: %d", recvResult);
  83. return RyanSocketFailedError;
  84. }
  85. offset += recvResult;
  86. timeOut = platformTimerRemain(&timer);
  87. }
  88. // rlog_d("offset: %d, recvLen: %d, recvResult: %d", offset, recvLen, recvResult);
  89. // 发送超时
  90. if (offset != recvLen)
  91. return RyanMqttRecvPacketTimeOutError;
  92. return RyanMqttSuccessError;
  93. }
  94. /**
  95. * @brief Sends an MQTT packet asynchronously over the network with timeout and mutex protection.
  96. *
  97. * Attempts to send the specified buffer within the configured timeout period, handling partial sends and socket errors. Returns an error if the full packet cannot be sent or if a socket failure occurs. Refreshes the client's keepalive timer on successful send.
  98. *
  99. * @param sendBuf Pointer to the buffer containing the packet data to send.
  100. * @param sendLen Length of the data to send in bytes.
  101. * @return RyanMqttError_e RyanMqttSuccessError on success, RyanMqttSendPacketTimeOutError on timeout, or RyanSocketFailedError on socket failure.
  102. */
  103. RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, uint8_t *sendBuf, uint32_t sendLen)
  104. {
  105. uint32_t offset = 0;
  106. int32_t sendResult = 0;
  107. uint32_t timeOut = 0;
  108. platformTimer_t timer = {0};
  109. RyanMqttAssert(NULL != client);
  110. RyanMqttAssert(NULL != sendBuf);
  111. RyanMqttAssert(0 != sendLen);
  112. timeOut = client->config.sendTimeout;
  113. platformTimerInit(&timer);
  114. platformTimerCutdown(&timer, timeOut);
  115. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  116. while ((offset < sendLen) && (0 != timeOut))
  117. {
  118. sendResult = platformNetworkSendAsync(client->config.userData, &client->network, (char *)(sendBuf + offset),
  119. (size_t)(sendLen - offset), (int32_t)timeOut);
  120. if (-1 == sendResult)
  121. {
  122. RyanMqttConnectStatus_e connectState = RyanMqttConnectAccepted;
  123. RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
  124. return RyanSocketFailedError;
  125. }
  126. offset += sendResult;
  127. timeOut = platformTimerRemain(&timer);
  128. }
  129. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  130. // 发送超时
  131. if (offset != sendLen)
  132. return RyanMqttSendPacketTimeOutError;
  133. RyanMqttRefreshKeepaliveTime(client); // 只要发送数据就刷新 keepalive 时间,可以降低一些心智负担
  134. return RyanMqttSuccessError;
  135. }
  136. /**
  137. * @brief 设置mqtt客户端状态
  138. *
  139. * @param client
  140. * @param state
  141. */
  142. void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e state)
  143. {
  144. RyanMqttAssert(NULL != client);
  145. platformCriticalEnter(client->config.userData, &client->criticalLock);
  146. client->clientState = state;
  147. platformCriticalExit(client->config.userData, &client->criticalLock);
  148. }
  149. /**
  150. * @brief 获取mqtt客户端状态
  151. *
  152. * @param client
  153. * @return RyanMqttState_e
  154. */
  155. RyanMqttState_e RyanMqttGetClientState(RyanMqttClient_t *client)
  156. {
  157. RyanMqttAssert(NULL != client);
  158. return client->clientState;
  159. }
  160. /**
  161. * @brief 根据 MQTT 3.1.1 协议规范确定传递的主题过滤器和主题名称是否匹配的实用程序函数,
  162. * 应仅在strcmp / strncmp不相等时再进行通配符匹配
  163. *
  164. * @param topic 要检查的主题名称
  165. * @param topicLength 主题名称的长度。
  166. * @param topicFilter 要检查的主题过滤器。
  167. * @param topicFilterLength 要检查的主题过滤器长度
  168. * @return RyanMqttBool_e
  169. */
  170. RyanMqttBool_e RyanMqttMatchTopic(const char *topic,
  171. const uint16_t topicLength,
  172. const char *topicFilter,
  173. const uint16_t topicFilterLength)
  174. {
  175. RyanMqttBool_e topicFilterStartsWithWildcard = RyanMqttFalse,
  176. matchFound = RyanMqttFalse,
  177. shouldStopMatching = RyanMqttFalse;
  178. uint16_t topicIndex = 0,
  179. topicFilterIndex = 0;
  180. RyanMqttAssert((NULL != topic) && (topicLength != 0u));
  181. RyanMqttAssert((NULL != topicFilter) && (topicFilterLength != 0u));
  182. // 确定主题过滤器是否以通配符开头。
  183. topicFilterStartsWithWildcard = (RyanMqttBool_e)((topicFilter[0] == '+') || (topicFilter[0] == '#'));
  184. // 不能将 $ 字符开头的主题名匹配通配符 (#或+) 开头的主题过滤器
  185. if ((topic[0] == '$') && (topicFilterStartsWithWildcard == RyanMqttTrue))
  186. return RyanMqttFalse;
  187. // 匹配主题名称和主题过滤器,允许使用通配符。
  188. while ((topicIndex < topicLength) && (topicFilterIndex < topicFilterLength))
  189. {
  190. // 检查主题名称中的字符是否与主题筛选器字符串中的对应字符匹配。
  191. if (topic[topicIndex] == topicFilter[topicFilterIndex])
  192. {
  193. // 当主题名称已被消耗但主题过滤器中还有剩余字符需要匹配时,此功能处理以下两种情况:
  194. // -当主题过滤器以"/+"或"/#"字符结尾时,主题名称以"/"结尾。
  195. // -当主题过滤器以"/#"字符结尾时,主题名称以父级别结尾。
  196. if (topicIndex == (topicLength - 1U))
  197. {
  198. // 检查主题筛选器是否有2个剩余字符,并且以"/#"结尾。
  199. // 此检查处理将筛选器"sport/#"与主题"sport"匹配的情况。
  200. // 原因是"#"通配符表示主题名称中的父级和任意数量的子级。
  201. if ((topicFilterLength >= 3U) &&
  202. (topicFilterIndex == (topicFilterLength - 3U)) &&
  203. (topicFilter[topicFilterIndex + 1U] == '/') &&
  204. (topicFilter[topicFilterIndex + 2U] == '#'))
  205. matchFound = RyanMqttTrue;
  206. // 检查下一个字符是否为"#"或"+",主题过滤器以"/#"或"/+"结尾。
  207. // 此检查处理要匹配的情况:
  208. // -主题过滤器"sport/+"与主题"sport/"。
  209. // -主题过滤器"sport/#",主题为"sport/"。
  210. if ((topicFilterIndex == (topicFilterLength - 2U)) &&
  211. (topicFilter[topicFilterIndex] == '/'))
  212. // 检查最后一个字符是否为通配符
  213. matchFound = (RyanMqttBool_e)((topicFilter[topicFilterIndex + 1U] == '+') || (topicFilter[topicFilterIndex + 1U] == '#'));
  214. }
  215. }
  216. else
  217. {
  218. // 检查是否匹配通配符
  219. RyanMqttBool_e locationIsValidForWildcard;
  220. // 主题过滤器中的通配符仅在起始位置或前面有"/"时有效。
  221. locationIsValidForWildcard = (RyanMqttBool_e)((topicFilterIndex == 0u) || (topicFilter[topicFilterIndex - 1U] == '/'));
  222. if ((topicFilter[topicFilterIndex] == '+') && (locationIsValidForWildcard == RyanMqttTrue))
  223. {
  224. RyanMqttBool_e nextLevelExistsInTopicName = RyanMqttFalse;
  225. RyanMqttBool_e nextLevelExistsinTopicFilter = RyanMqttFalse;
  226. // 将主题名称索引移动到当前级别的末尾, 当前级别的结束由下一个级别分隔符"/"之前的最后一个字符标识。
  227. while (topicIndex < topicLength)
  228. {
  229. // 如果我们碰到级别分隔符,则退出循环
  230. if (topic[topicIndex] == '/')
  231. {
  232. nextLevelExistsInTopicName = RyanMqttTrue;
  233. break;
  234. }
  235. (topicIndex)++;
  236. }
  237. // 确定主题过滤器是否包含在由"+"通配符表示的当前级别之后的子级别。
  238. if ((topicFilterIndex < (topicFilterLength - 1U)) &&
  239. (topicFilter[topicFilterIndex + 1U] == '/'))
  240. nextLevelExistsinTopicFilter = RyanMqttTrue;
  241. // 如果主题名称包含子级别但主题过滤器在当前级别结束,则不存在匹配项。
  242. if ((nextLevelExistsInTopicName == RyanMqttTrue) &&
  243. (nextLevelExistsinTopicFilter == RyanMqttFalse))
  244. {
  245. matchFound = RyanMqttFalse;
  246. shouldStopMatching = RyanMqttTrue;
  247. }
  248. // 如果主题名称和主题过滤器有子级别,则将过滤器索引推进到主题过滤器中的级别分隔符,以便在下一个级别进行匹配。
  249. // 注意:名称索引已经指向主题名称中的级别分隔符。
  250. else if (nextLevelExistsInTopicName == RyanMqttTrue)
  251. (topicFilterIndex)++;
  252. // 如果我们已经到达这里,循环以(*pNameIndex < topicLength)条件终止,
  253. // 这意味着已经超过主题名称的末尾,因此,我们将索引缩减为主题名称中的最后一个字符。
  254. else
  255. (topicIndex)--;
  256. }
  257. // "#"匹配主题名称中剩余的所有内容。它必须是主题过滤器中的最后一个字符。
  258. else if ((topicFilter[topicFilterIndex] == '#') &&
  259. (topicFilterIndex == (topicFilterLength - 1U)) &&
  260. (locationIsValidForWildcard == RyanMqttTrue))
  261. {
  262. // 后续字符不需要检查多级通配符。
  263. matchFound = RyanMqttTrue;
  264. shouldStopMatching = RyanMqttTrue;
  265. }
  266. else
  267. {
  268. // 除"+"或"#"以外的任何字符不匹配均表示主题名称与主题过滤器不匹配。
  269. matchFound = RyanMqttFalse;
  270. shouldStopMatching = RyanMqttTrue;
  271. }
  272. }
  273. if ((matchFound == RyanMqttTrue) || (shouldStopMatching == RyanMqttTrue))
  274. break;
  275. // 增量索引
  276. topicIndex++;
  277. topicFilterIndex++;
  278. }
  279. // 如果已到达两个字符串的末尾,则它们匹配。这表示当主题过滤器在非起始位置包含 "+" 通配符时的情况。
  280. // 例如,当将 "sport/+/player" 或 "sport/hockey/+" 主题过滤器与 "sport/hockey/player" 主题名称匹配时。
  281. if (matchFound == RyanMqttFalse)
  282. matchFound = (RyanMqttBool_e)((topicIndex == topicLength) && (topicFilterIndex == topicFilterLength));
  283. return matchFound;
  284. }
  285. /**
  286. * @brief Allocates and initializes a message handler for a specific topic and QoS.
  287. *
  288. * Creates a new message handler structure, copies the provided topic string, sets the QoS, and initializes the internal list node. Returns an error if memory allocation fails.
  289. *
  290. * @param topic Pointer to the topic string to associate with the handler.
  291. * @param topicLen Length of the topic string.
  292. * @param qos Quality of Service level for the handler.
  293. * @param pMsgHandler Output pointer to the created message handler.
  294. * @return RyanMqttError_e RyanMqttSuccessError on success, or an error code on failure.
  295. */
  296. RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, const char *topic, uint16_t topicLen, RyanMqttQos_e qos, RyanMqttMsgHandler_t **pMsgHandler)
  297. {
  298. RyanMqttMsgHandler_t *msgHandler = NULL;
  299. RyanMqttAssert(NULL != client);
  300. RyanMqttAssert(NULL != topic);
  301. RyanMqttAssert(NULL != pMsgHandler);
  302. RyanMqttAssert(RyanMqttQos0 == qos || RyanMqttQos1 == qos || RyanMqttQos2 == qos);
  303. msgHandler = (RyanMqttMsgHandler_t *)platformMemoryMalloc(sizeof(RyanMqttMsgHandler_t) + topicLen + 1);
  304. RyanMqttCheck(NULL != msgHandler, RyanMqttNotEnoughMemError, rlog_d);
  305. memset(msgHandler, 0, sizeof(RyanMqttMsgHandler_t) + topicLen + 1);
  306. // 初始化链表
  307. RyanListInit(&msgHandler->list);
  308. msgHandler->qos = qos;
  309. msgHandler->topicLen = topicLen;
  310. msgHandler->topic = (char *)msgHandler + sizeof(RyanMqttMsgHandler_t);
  311. memcpy(msgHandler->topic, topic, topicLen); // 将packet数据保存到ack中
  312. *pMsgHandler = msgHandler;
  313. return RyanMqttSuccessError;
  314. }
  315. /**
  316. * @brief Frees the memory allocated for a message handler.
  317. *
  318. * Releases resources associated with the specified message handler.
  319. */
  320. void RyanMqttMsgHandlerDestory(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
  321. {
  322. RyanMqttAssert(NULL != client);
  323. RyanMqttAssert(NULL != msgHandler);
  324. platformMemoryFree(msgHandler);
  325. }
  326. /**
  327. * @brief Searches for a message handler matching a given topic.
  328. *
  329. * Traverses the client's message handler list to find a handler whose topic matches the specified topic. Supports exact or wildcard matching based on the topicMatchedFlag.
  330. *
  331. * @param topic The topic string to search for.
  332. * @param topicLen The length of the topic string.
  333. * @param topicMatchedFlag If true, enables wildcard matching; otherwise, only exact matches are considered.
  334. * @param pMsgHandler Output pointer set to the found message handler if successful.
  335. * @return RyanMqttSuccessError if a matching handler is found; RyanMqttNoRescourceError if not found.
  336. */
  337. RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, const char *topic, uint16_t topicLen, RyanMqttBool_e topicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler)
  338. {
  339. RyanMqttError_e result = RyanMqttSuccessError;
  340. RyanList_t *curr = NULL,
  341. *next = NULL;
  342. RyanMqttMsgHandler_t *msgHandler = NULL;
  343. RyanMqttAssert(NULL != client);
  344. RyanMqttAssert(NULL != topic && 0 != topicLen);
  345. RyanMqttAssert(NULL != pMsgHandler);
  346. platformMutexLock(client->config.userData, &client->msgHandleLock);
  347. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  348. {
  349. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  350. // 不进行通配符匹配
  351. if (RyanMqttTrue != topicMatchedFlag)
  352. {
  353. // 不相等跳过
  354. if (topicLen != msgHandler->topicLen)
  355. continue;
  356. // 主题名称不相等且没有使能通配符匹配
  357. if (0 != strncmp(topic, msgHandler->topic, topicLen))
  358. continue;
  359. }
  360. // 进行通配符匹配
  361. if (RyanMqttTrue != RyanMqttMatchTopic(topic, topicLen, msgHandler->topic, msgHandler->topicLen))
  362. continue;
  363. *pMsgHandler = msgHandler;
  364. result = RyanMqttSuccessError;
  365. goto __exit;
  366. }
  367. result = RyanMqttNoRescourceError;
  368. __exit:
  369. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  370. return result;
  371. }
  372. /**
  373. * @brief 将msg句柄存入client msg链表
  374. *
  375. * @param client
  376. * @param msgHandler
  377. * @return int32_t
  378. */
  379. RyanMqttError_e RyanMqttMsgHandlerAddToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
  380. {
  381. RyanMqttAssert(NULL != client);
  382. RyanMqttAssert(NULL != msgHandler);
  383. platformMutexLock(client->config.userData, &client->msgHandleLock);
  384. RyanListAddTail(&msgHandler->list, &client->msgHandlerList); // 将msgHandler节点添加到链表尾部
  385. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  386. return RyanMqttSuccessError;
  387. }
  388. /**
  389. * @brief 将msg句柄存入client msg链表
  390. *
  391. * @param client
  392. * @param msgHandler
  393. * @return int32_t
  394. */
  395. RyanMqttError_e RyanMqttMsgHandlerRemoveToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
  396. {
  397. RyanMqttAssert(NULL != client);
  398. RyanMqttAssert(NULL != msgHandler);
  399. platformMutexLock(client->config.userData, &client->msgHandleLock);
  400. RyanListDel(&msgHandler->list);
  401. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  402. return RyanMqttSuccessError;
  403. }
  404. /**
  405. * @brief Allocates and initializes an acknowledgment handler for a specific MQTT packet.
  406. *
  407. * Creates an acknowledgment handler structure for the given packet type, packet ID, and packet data. The function supports using either a preallocated packet buffer or allocating and copying the packet data internally, based on the isPreallocatedPacket flag. Initializes the handler's timer for acknowledgment timeout and associates it with the provided message handler.
  408. *
  409. * @param packetType The MQTT packet type for which the acknowledgment handler is created.
  410. * @param packetId The packet identifier associated with the acknowledgment.
  411. * @param packetLen The length of the packet data.
  412. * @param packet Pointer to the packet data buffer.
  413. * @param msgHandler Pointer to the associated message handler.
  414. * @param pAckHandler Output pointer to the created acknowledgment handler.
  415. * @param isPreallocatedPacket Indicates whether the packet buffer is preallocated (true) or should be copied (false).
  416. * @return RyanMqttError_e Returns RyanMqttSuccessError on success, or RyanMqttNotEnoughMemError if memory allocation fails.
  417. */
  418. RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId, uint16_t packetLen,
  419. uint8_t *packet, RyanMqttMsgHandler_t *msgHandler, RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e isPreallocatedPacket)
  420. {
  421. RyanMqttAckHandler_t *ackHandler = NULL;
  422. uint32_t mallocLen = 0;
  423. RyanMqttAssert(NULL != client);
  424. RyanMqttAssert(NULL != msgHandler);
  425. RyanMqttAssert(NULL != pAckHandler);
  426. mallocLen = sizeof(RyanMqttAckHandler_t);
  427. if (RyanMqttTrue != isPreallocatedPacket)
  428. mallocLen += packetLen + 1;
  429. // 给消息主题添加空格
  430. ackHandler = (RyanMqttAckHandler_t *)platformMemoryMalloc(mallocLen);
  431. RyanMqttCheck(NULL != ackHandler, RyanMqttNotEnoughMemError, rlog_d);
  432. memset(ackHandler, 0, mallocLen);
  433. RyanListInit(&ackHandler->list);
  434. platformTimerCutdown(&ackHandler->timer, client->config.ackTimeout); // 超时内没有响应将被销毁或重新发送
  435. ackHandler->isPreallocatedPacket = isPreallocatedPacket;
  436. ackHandler->repeatCount = 0;
  437. ackHandler->packetId = packetId;
  438. ackHandler->packetLen = packetLen;
  439. ackHandler->packetType = packetType;
  440. ackHandler->msgHandler = msgHandler;
  441. if (RyanMqttTrue != isPreallocatedPacket)
  442. {
  443. if (packetLen > 0)
  444. {
  445. ackHandler->packet = (uint8_t *)ackHandler + sizeof(RyanMqttAckHandler_t);
  446. memcpy(ackHandler->packet, packet, packetLen); // 将packet数据保存到ack中
  447. }
  448. else
  449. {
  450. ackHandler->packet = NULL;
  451. }
  452. }
  453. else
  454. {
  455. ackHandler->packet = packet;
  456. }
  457. *pAckHandler = ackHandler;
  458. return RyanMqttSuccessError;
  459. }
  460. /**
  461. * @brief Frees an acknowledgment handler and its associated resources.
  462. *
  463. * Releases the memory for the acknowledgment handler, its associated message handler, and the packet buffer if it was preallocated.
  464. */
  465. void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  466. {
  467. RyanMqttAssert(NULL != client);
  468. RyanMqttAssert(NULL != ackHandler);
  469. RyanMqttAssert(NULL != ackHandler->msgHandler);
  470. RyanMqttMsgHandlerDestory(client, ackHandler->msgHandler); // 释放msgHandler
  471. if (RyanMqttTrue == ackHandler->isPreallocatedPacket && NULL != ackHandler->packet)
  472. platformMemoryFree(ackHandler->packet);
  473. platformMemoryFree(ackHandler);
  474. }
  475. /**
  476. * @brief Searches for an acknowledgment handler in the client's list matching the given packet type and ID.
  477. *
  478. * If a matching acknowledgment handler is found, sets the output pointer to it and returns success. Returns a no resource error if not found. The search is protected by a mutex.
  479. *
  480. * @param packetType MQTT packet type to match.
  481. * @param packetId Packet identifier to match.
  482. * @param pAckHandler Output pointer for the found acknowledgment handler, if any.
  483. * @return RyanMqttSuccessError if found, RyanMqttNoRescourceError if not found.
  484. */
  485. RyanMqttError_e RyanMqttAckListNodeFind(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId, RyanMqttAckHandler_t **pAckHandler)
  486. {
  487. RyanMqttError_e result = RyanMqttSuccessError;
  488. RyanList_t *curr, *next;
  489. RyanMqttAckHandler_t *ackHandler;
  490. RyanMqttAssert(NULL != client);
  491. RyanMqttAssert(NULL != pAckHandler);
  492. platformMutexLock(client->config.userData, &client->ackHandleLock);
  493. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  494. {
  495. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  496. // 对于 qos1 和 qos2 的 mqtt 数据包,使用数据包 ID 和类型作为唯一
  497. // 标识符,用于确定节点是否已存在并避免重复。
  498. if ((packetId == ackHandler->packetId) && (packetType == ackHandler->packetType))
  499. {
  500. *pAckHandler = ackHandler;
  501. result = RyanMqttSuccessError;
  502. goto __exit;
  503. }
  504. }
  505. result = RyanMqttNoRescourceError;
  506. __exit:
  507. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  508. return result;
  509. }
  510. /**
  511. * @brief 添加等待ack到链表
  512. *
  513. * @param client
  514. * @param ackHandler
  515. * @return RyanMqttError_e
  516. */
  517. RyanMqttError_e RyanMqttAckListAddToAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  518. {
  519. RyanMqttAssert(NULL != client);
  520. RyanMqttAssert(NULL != ackHandler);
  521. platformMutexLock(client->config.userData, &client->ackHandleLock);
  522. // 将ack节点添加到链表尾部
  523. RyanListAddTail(&ackHandler->list, &client->ackHandlerList);
  524. client->ackHandlerCount++;
  525. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  526. if (client->ackHandlerCount >= client->config.ackHandlerCountWarning)
  527. RyanMqttEventMachine(client, RyanMqttEventAckCountWarning, (void *)&client->ackHandlerCount);
  528. return RyanMqttSuccessError;
  529. }
  530. /**
  531. * @brief 从链表移除ack
  532. *
  533. * @param client
  534. * @param ackHandler
  535. * @return RyanMqttError_e
  536. */
  537. RyanMqttError_e RyanMqttAckListRemoveToAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  538. {
  539. RyanMqttAssert(NULL != client);
  540. RyanMqttAssert(NULL != ackHandler);
  541. platformMutexLock(client->config.userData, &client->ackHandleLock);
  542. // 将ack节点添加到链表尾部
  543. RyanListDel(&ackHandler->list);
  544. if (client->ackHandlerCount > 0)
  545. client->ackHandlerCount--;
  546. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  547. return RyanMqttSuccessError;
  548. }
  549. /**
  550. * @brief 添加等待ack到链表
  551. *
  552. * @param client
  553. * @param ackHandler
  554. * @return RyanMqttError_e
  555. */
  556. RyanMqttError_e RyanMqttAckListAddToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  557. {
  558. RyanMqttAssert(NULL != client);
  559. RyanMqttAssert(NULL != ackHandler);
  560. platformMutexLock(client->config.userData, &client->userAckHandleLock);
  561. RyanListAddTail(&ackHandler->list, &client->userAckHandlerList); // 将ack节点添加到链表尾部
  562. platformMutexUnLock(client->config.userData, &client->userAckHandleLock);
  563. return RyanMqttSuccessError;
  564. }
  565. /**
  566. * @brief 从链表移除ack
  567. *
  568. * @param client
  569. * @param ackHandler
  570. * @return RyanMqttError_e
  571. */
  572. RyanMqttError_e RyanMqttAckListRemoveToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  573. {
  574. RyanMqttAssert(NULL != client);
  575. RyanMqttAssert(NULL != ackHandler);
  576. platformMutexLock(client->config.userData, &client->userAckHandleLock);
  577. RyanListDel(&ackHandler->list);
  578. platformMutexUnLock(client->config.userData, &client->userAckHandleLock);
  579. return RyanMqttSuccessError;
  580. }
  581. /**
  582. * @brief 清理session
  583. *
  584. * @param client
  585. */
  586. void RyanMqttCleanSession(RyanMqttClient_t *client)
  587. {
  588. RyanList_t *curr = NULL,
  589. *next = NULL;
  590. RyanMqttAckHandler_t *ackHandler = NULL;
  591. RyanMqttAckHandler_t *userAckHandler = NULL;
  592. RyanMqttMsgHandler_t *msgHandler = NULL;
  593. RyanMqttAssert(NULL != client);
  594. // 释放所有msg_handler_list内存
  595. platformMutexLock(client->config.userData, &client->msgHandleLock);
  596. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  597. {
  598. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  599. RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  600. RyanMqttMsgHandlerDestory(client, msgHandler);
  601. }
  602. RyanListDelInit(&client->msgHandlerList);
  603. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  604. // 释放所有ackHandler_list内存
  605. platformMutexLock(client->config.userData, &client->ackHandleLock);
  606. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  607. {
  608. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  609. RyanMqttAckListRemoveToAckList(client, ackHandler);
  610. RyanMqttAckHandlerDestroy(client, ackHandler);
  611. }
  612. RyanListDelInit(&client->ackHandlerList);
  613. client->ackHandlerCount = 0;
  614. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  615. // 释放所有userAckHandler_list内存
  616. platformMutexLock(client->config.userData, &client->userAckHandleLock);
  617. RyanListForEachSafe(curr, next, &client->userAckHandlerList)
  618. {
  619. userAckHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  620. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  621. RyanMqttAckHandlerDestroy(client, userAckHandler);
  622. }
  623. RyanListDelInit(&client->userAckHandlerList);
  624. platformMutexUnLock(client->config.userData, &client->userAckHandleLock);
  625. }
  626. const char *RyanMqttStrError(int32_t state)
  627. {
  628. const char *str = NULL;
  629. switch (state)
  630. {
  631. case RyanMqttRecvPacketTimeOutError:
  632. str = "读取数据超时";
  633. break;
  634. case RyanMqttParamInvalidError:
  635. str = "无效参数";
  636. break;
  637. case RyanSocketFailedError:
  638. str = "套接字失败";
  639. break;
  640. case RyanMqttSendPacketError:
  641. str = "数据包发送失败";
  642. break;
  643. case RyanMqttSerializePacketError:
  644. str = "序列化报文失败";
  645. break;
  646. case RyanMqttDeserializePacketError:
  647. str = "反序列化报文失败";
  648. break;
  649. case RyanMqttNoRescourceError:
  650. str = "没有资源";
  651. break;
  652. case RyanMqttHaveRescourceError:
  653. str = "资源已存在";
  654. break;
  655. case RyanMqttNotConnectError:
  656. str = "mqttClient没有连接";
  657. break;
  658. case RyanMqttConnectError:
  659. str = "mqttClient已经连接";
  660. break;
  661. case RyanMqttRecvBufToShortError:
  662. str = "接收缓冲区不足";
  663. break;
  664. case RyanMqttSendBufToShortError:
  665. str = "发送缓冲区不足";
  666. break;
  667. case RyanMqttSocketConnectFailError:
  668. str = "socket连接失败";
  669. break;
  670. case RyanMqttNotEnoughMemError:
  671. str = "动态内存不足";
  672. break;
  673. case RyanMqttFailedError:
  674. str = "mqtt失败, 详细信息请看函数内部";
  675. break;
  676. case RyanMqttSuccessError:
  677. str = "mqtt成功, 详细信息请看函数内部";
  678. break;
  679. case RyanMqttConnectRefusedProtocolVersion:
  680. str = "mqtt断开连接, 服务端不支持客户端请求的 MQTT 协议级别";
  681. break;
  682. case RyanMqttConnectRefusedIdentifier:
  683. str = "mqtt断开连接, 不合格的客户端标识符";
  684. break;
  685. case RyanMqttConnectRefusedServer:
  686. str = "mqtt断开连接, 服务端不可用";
  687. break;
  688. case RyanMqttConnectRefusedUsernamePass:
  689. str = "mqtt断开连接, 无效的用户名或密码";
  690. break;
  691. case RyanMqttConnectRefusedNotAuthorized:
  692. str = "mqtt断开连接, 连接已拒绝,未授权";
  693. break;
  694. case RyanMqttConnectClientInvalid:
  695. str = "mqtt断开连接, 客户端处于无效状态";
  696. break;
  697. case RyanMqttConnectNetWorkFail:
  698. str = "mqtt断开连接, 网络错误";
  699. break;
  700. case RyanMqttConnectDisconnected:
  701. str = "mqtt断开连接, mqtt客户端断开连接";
  702. break;
  703. case RyanMqttKeepaliveTimeout:
  704. str = "mqtt断开连接, 心跳超时断开连接";
  705. break;
  706. case RyanMqttConnectUserDisconnected:
  707. str = "mqtt断开连接, 用户手动断开连接";
  708. break;
  709. case RyanMqttConnectTimeout:
  710. str = "mqtt断开连接, connect超时断开";
  711. break;
  712. default:
  713. str = "未知错误描述";
  714. break;
  715. }
  716. return str;
  717. }