RyanMqttUtileAck.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
  2. // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  3. #include "RyanMqttUtil.h"
  4. #include "RyanMqttLog.h"
  5. #include "RyanMqttThread.h"
  6. /**
  7. * @brief 创建ack句柄
  8. *
  9. * @param client
  10. * @param packetType
  11. * @param packetId
  12. * @param packetLen
  13. * @param packet
  14. * @param msgHandler
  15. * @param pAckHandler
  16. * @param packetAllocatedExternally packet 是外部分配的
  17. * @return RyanMqttError_e
  18. */
  19. RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
  20. uint16_t packetLen, uint8_t *packet, RyanMqttMsgHandler_t *msgHandler,
  21. RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e packetAllocatedExternally)
  22. {
  23. RyanMqttAssert(NULL != client);
  24. RyanMqttAssert(NULL != pAckHandler);
  25. uint32_t mallocSize = sizeof(RyanMqttAckHandler_t);
  26. // 为非预分配的数据包分配额外空间
  27. if (RyanMqttTrue != packetAllocatedExternally)
  28. {
  29. mallocSize += packetLen;
  30. }
  31. // 为非预分配包申请额外空间
  32. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)platformMemoryMalloc(mallocSize);
  33. RyanMqttCheck(NULL != ackHandler, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  34. ackHandler->packetAllocatedExternally = packetAllocatedExternally;
  35. ackHandler->packetType = packetType;
  36. ackHandler->repeatCount = 0;
  37. ackHandler->packetId = packetId;
  38. ackHandler->packetLen = packetLen;
  39. RyanMqttListInit(&ackHandler->list);
  40. // 超时内没有响应将被销毁或重新发送
  41. RyanMqttTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
  42. ackHandler->msgHandler = msgHandler;
  43. if (RyanMqttTrue != packetAllocatedExternally)
  44. {
  45. if (NULL != packet && packetLen > 0)
  46. {
  47. ackHandler->packet = (uint8_t *)ackHandler + sizeof(RyanMqttAckHandler_t);
  48. RyanMqttMemcpy(ackHandler->packet, packet, packetLen); // 将packet数据保存到ack中
  49. }
  50. else
  51. {
  52. ackHandler->packet = NULL;
  53. }
  54. }
  55. else
  56. {
  57. ackHandler->packet = packet;
  58. }
  59. *pAckHandler = ackHandler;
  60. return RyanMqttSuccessError;
  61. }
  62. /**
  63. * @brief 销毁ack句柄
  64. *
  65. * @param client
  66. * @param ackHandler
  67. */
  68. void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  69. {
  70. RyanMqttAssert(NULL != client);
  71. RyanMqttAssert(NULL != ackHandler);
  72. RyanMqttAssert(NULL != ackHandler->msgHandler);
  73. RyanMqttMsgHandlerDestroy(client, ackHandler->msgHandler); // 释放msgHandler
  74. // 释放用户预提供的缓冲区
  75. if (RyanMqttTrue == ackHandler->packetAllocatedExternally)
  76. {
  77. // 不加null判断,因为如果是空,一定是用户程序内存访问越界了
  78. platformMemoryFree(ackHandler->packet);
  79. }
  80. platformMemoryFree(ackHandler);
  81. }
  82. /**
  83. * @brief 检查链表中是否存在ack句柄
  84. *
  85. * @param client
  86. * @param packetType
  87. * @param packetId
  88. * @param pAckHandler
  89. * @param removeOnMatch
  90. * @return RyanMqttError_e
  91. */
  92. RyanMqttError_e RyanMqttAckListNodeFind(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
  93. RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e removeOnMatch)
  94. {
  95. RyanMqttError_e result = RyanMqttSuccessError;
  96. RyanMqttList_t *curr, *next;
  97. RyanMqttAckHandler_t *ackHandler;
  98. RyanMqttAssert(NULL != client);
  99. RyanMqttAssert(NULL != pAckHandler);
  100. *pAckHandler = NULL;
  101. platformMutexLock(client->config.userData, &client->ackHandleLock);
  102. RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
  103. {
  104. ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
  105. // 对于 qos1 和 qos2 的 mqtt 数据包,使用数据包 ID 和类型作为唯一
  106. // 标识符,用于确定节点是否已存在并避免重复。
  107. if ((packetId == ackHandler->packetId) && (packetType == ackHandler->packetType))
  108. {
  109. *pAckHandler = ackHandler;
  110. result = RyanMqttSuccessError;
  111. if (RyanMqttTrue == removeOnMatch)
  112. {
  113. RyanMqttAckListRemoveToAckList(client, ackHandler);
  114. }
  115. goto __exit;
  116. }
  117. }
  118. result = RyanMqttNoRescourceError;
  119. __exit:
  120. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  121. return result;
  122. }
  123. /**
  124. * @brief 添加等待ack到链表
  125. *
  126. * @param client
  127. * @param ackHandler
  128. * @return RyanMqttError_e
  129. */
  130. RyanMqttError_e RyanMqttAckListAddToAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  131. {
  132. RyanMqttAssert(NULL != client);
  133. RyanMqttAssert(NULL != ackHandler);
  134. uint16_t tmpAckHandlerCount;
  135. platformMutexLock(client->config.userData, &client->ackHandleLock);
  136. // 将ack节点添加到链表尾部
  137. RyanMqttListAddTail(&ackHandler->list, &client->ackHandlerList);
  138. client->ackHandlerCount++;
  139. tmpAckHandlerCount = client->ackHandlerCount;
  140. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  141. if (tmpAckHandlerCount >= client->config.ackHandlerCountWarning)
  142. {
  143. RyanMqttEventMachine(client, RyanMqttEventAckCountWarning, (void *)&tmpAckHandlerCount);
  144. }
  145. return RyanMqttSuccessError;
  146. }
  147. /**
  148. * @brief 从链表移除ack
  149. *
  150. * @param client
  151. * @param ackHandler
  152. * @return RyanMqttError_e
  153. */
  154. RyanMqttError_e RyanMqttAckListRemoveToAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  155. {
  156. RyanMqttAssert(NULL != client);
  157. RyanMqttAssert(NULL != ackHandler);
  158. platformMutexLock(client->config.userData, &client->ackHandleLock);
  159. RyanMqttListDel(&ackHandler->list);
  160. if (client->ackHandlerCount > 0)
  161. {
  162. client->ackHandlerCount--;
  163. }
  164. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  165. return RyanMqttSuccessError;
  166. }
  167. /**
  168. * @brief 检查用户层链表中是否存在ack句柄
  169. *
  170. * @param client
  171. * @param packetType
  172. * @param packetId
  173. * @param pAckHandler
  174. * @param removeOnMatch
  175. * @return RyanMqttError_e
  176. */
  177. RyanMqttError_e RyanMqttAckListNodeFindByUserAckList(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
  178. RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e removeOnMatch)
  179. {
  180. RyanMqttError_e result = RyanMqttSuccessError;
  181. RyanMqttList_t *curr, *next;
  182. RyanMqttAckHandler_t *ackHandler;
  183. RyanMqttAssert(NULL != client);
  184. RyanMqttAssert(NULL != pAckHandler);
  185. *pAckHandler = NULL;
  186. platformMutexLock(client->config.userData, &client->userSessionLock);
  187. RyanMqttListForEachSafe(curr, next, &client->userAckHandlerList)
  188. {
  189. ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
  190. // 对于 qos1 和 qos2 的 mqtt 数据包,使用数据包 ID 和类型作为唯一
  191. // 标识符,用于确定节点是否已存在并避免重复。
  192. if ((packetId == ackHandler->packetId) && (packetType == ackHandler->packetType))
  193. {
  194. *pAckHandler = ackHandler;
  195. result = RyanMqttSuccessError;
  196. if (RyanMqttTrue == removeOnMatch)
  197. {
  198. RyanMqttAckListRemoveToUserAckList(client, ackHandler);
  199. }
  200. goto __exit;
  201. }
  202. }
  203. result = RyanMqttNoRescourceError;
  204. __exit:
  205. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  206. return result;
  207. }
  208. RyanMqttError_e RyanMqttAckListAddToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  209. {
  210. RyanMqttAssert(NULL != client);
  211. RyanMqttAssert(NULL != ackHandler);
  212. platformMutexLock(client->config.userData, &client->userSessionLock);
  213. RyanMqttListAddTail(&ackHandler->list, &client->userAckHandlerList); // 将ack节点添加到链表尾部
  214. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  215. return RyanMqttSuccessError;
  216. }
  217. RyanMqttError_e RyanMqttAckListRemoveToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  218. {
  219. RyanMqttAssert(NULL != client);
  220. RyanMqttAssert(NULL != ackHandler);
  221. platformMutexLock(client->config.userData, &client->userSessionLock);
  222. RyanMqttListDel(&ackHandler->list);
  223. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  224. return RyanMqttSuccessError;
  225. }
  226. void RyanMqttClearAckSession(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId)
  227. {
  228. RyanMqttError_e result = RyanMqttSuccessError;
  229. RyanMqttAckHandler_t *ackHandler;
  230. // 清除所有ack链表
  231. do
  232. {
  233. result = RyanMqttAckListNodeFindByUserAckList(client, packetType, packetId, &ackHandler, RyanMqttTrue);
  234. if (RyanMqttSuccessError == result)
  235. {
  236. RyanMqttAckHandlerDestroy(client, ackHandler);
  237. continue;
  238. }
  239. // 不可能同时在userAck和mqttAck,还有可能已经被添加到ack链表了
  240. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
  241. if (RyanMqttSuccessError == result)
  242. {
  243. RyanMqttAckHandlerDestroy(client, ackHandler);
  244. continue;
  245. }
  246. break;
  247. } while (1);
  248. }