RyanMqttUtileMsg.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
  2. // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  3. #include "RyanMqttUtil.h"
  4. #include "RyanMqttLog.h"
  5. #include "RyanMqttThread.h"
  6. /**
  7. * @brief 根据 MQTT 3.1.1 协议规范确定传递的主题过滤器和主题名称是否匹配的实用程序函数,
  8. * 应仅在strcmp / strncmp不相等时再进行通配符匹配
  9. *
  10. * @param topic 要检查的主题名称
  11. * @param topicLength 主题名称的长度。
  12. * @param topicFilter 要检查的主题过滤器。
  13. * @param topicFilterLength 要检查的主题过滤器长度
  14. * @return RyanMqttBool_e
  15. */
  16. static RyanMqttBool_e RyanMqttMatchTopic(const char *topic, const uint16_t topicLength, const char *topicFilter,
  17. const uint16_t topicFilterLength)
  18. {
  19. RyanMqttBool_e topicFilterStartsWithWildcard = RyanMqttFalse, matchFound = RyanMqttFalse,
  20. shouldStopMatching = RyanMqttFalse;
  21. uint16_t topicIndex = 0, topicFilterIndex = 0;
  22. RyanMqttAssert((NULL != topic) && (topicLength != 0u));
  23. RyanMqttAssert((NULL != topicFilter) && (topicFilterLength != 0u));
  24. // 确定主题过滤器是否以通配符开头。
  25. topicFilterStartsWithWildcard = (RyanMqttBool_e)((topicFilter[0] == '+') || (topicFilter[0] == '#'));
  26. // 不能将 $ 字符开头的主题名匹配通配符 (#或+) 开头的主题过滤器
  27. if ((topic[0] == '$') && (topicFilterStartsWithWildcard == RyanMqttTrue))
  28. {
  29. return RyanMqttFalse;
  30. }
  31. // 匹配主题名称和主题过滤器,允许使用通配符。
  32. while ((topicIndex < topicLength) && (topicFilterIndex < topicFilterLength))
  33. {
  34. // 检查主题名称中的字符是否与主题筛选器字符串中的对应字符匹配。
  35. if (topic[topicIndex] == topicFilter[topicFilterIndex])
  36. {
  37. // 当主题名称已被消耗但主题过滤器中还有剩余字符需要匹配时,此功能处理以下两种情况:
  38. // -当主题过滤器以"/+"或"/#"字符结尾时,主题名称以"/"结尾。
  39. // -当主题过滤器以"/#"字符结尾时,主题名称以父级别结尾。
  40. if (topicIndex == (topicLength - 1U))
  41. {
  42. // 检查主题筛选器是否有2个剩余字符,并且以"/#"结尾。
  43. // 此检查处理将筛选器"sport/#"与主题"sport"匹配的情况。
  44. // 原因是"#"通配符表示主题名称中的父级和任意数量的子级。
  45. if ((topicFilterLength >= 3U) && (topicFilterIndex == (topicFilterLength - 3U)) &&
  46. (topicFilter[topicFilterIndex + 1U] == '/') &&
  47. (topicFilter[topicFilterIndex + 2U] == '#'))
  48. {
  49. matchFound = RyanMqttTrue;
  50. }
  51. // 检查下一个字符是否为"#"或"+",主题过滤器以"/#"或"/+"结尾。
  52. // 此检查处理要匹配的情况:
  53. // -主题过滤器"sport/+"与主题"sport/"。
  54. // -主题过滤器"sport/#",主题为"sport/"。
  55. if ((topicFilterIndex == (topicFilterLength - 2U)) &&
  56. (topicFilter[topicFilterIndex] == '/'))
  57. {
  58. // 检查最后一个字符是否为通配符
  59. matchFound = (RyanMqttBool_e)((topicFilter[topicFilterIndex + 1U] == '+') ||
  60. (topicFilter[topicFilterIndex + 1U] == '#'));
  61. }
  62. }
  63. }
  64. else
  65. {
  66. // 检查是否匹配通配符
  67. RyanMqttBool_e locationIsValidForWildcard;
  68. // 主题过滤器中的通配符仅在起始位置或前面有"/"时有效。
  69. locationIsValidForWildcard = (RyanMqttBool_e)((topicFilterIndex == 0u) ||
  70. (topicFilter[topicFilterIndex - 1U] == '/'));
  71. if ((topicFilter[topicFilterIndex] == '+') && (locationIsValidForWildcard == RyanMqttTrue))
  72. {
  73. RyanMqttBool_e nextLevelExistsInTopicName = RyanMqttFalse;
  74. RyanMqttBool_e nextLevelExistsinTopicFilter = RyanMqttFalse;
  75. // 将主题名称索引移动到当前级别的末尾,
  76. // 当前级别的结束由下一个级别分隔符"/"之前的最后一个字符标识。
  77. while (topicIndex < topicLength)
  78. {
  79. // 如果我们碰到级别分隔符,则退出循环
  80. if (topic[topicIndex] == '/')
  81. {
  82. nextLevelExistsInTopicName = RyanMqttTrue;
  83. break;
  84. }
  85. (topicIndex)++;
  86. }
  87. // 确定主题过滤器是否包含在由"+"通配符表示的当前级别之后的子级别。
  88. if ((topicFilterIndex < (topicFilterLength - 1U)) &&
  89. (topicFilter[topicFilterIndex + 1U] == '/'))
  90. {
  91. nextLevelExistsinTopicFilter = RyanMqttTrue;
  92. }
  93. // 如果主题名称包含子级别但主题过滤器在当前级别结束,则不存在匹配项。
  94. if ((nextLevelExistsInTopicName == RyanMqttTrue) &&
  95. (nextLevelExistsinTopicFilter == RyanMqttFalse))
  96. {
  97. matchFound = RyanMqttFalse;
  98. shouldStopMatching = RyanMqttTrue;
  99. }
  100. // 如果主题名称和主题过滤器有子级别,则将过滤器索引推进到主题过滤器中的级别分隔符,以便在下一个级别进行匹配。
  101. // 注意:名称索引已经指向主题名称中的级别分隔符。
  102. else if (nextLevelExistsInTopicName == RyanMqttTrue)
  103. {
  104. (topicFilterIndex)++;
  105. }
  106. // 如果我们已经到达这里,循环以(*pNameIndex <
  107. // topicLength)条件终止,
  108. // 这意味着已经超过主题名称的末尾,因此,我们将索引缩减为主题名称中的最后一个字符。
  109. else
  110. {
  111. (topicIndex)--;
  112. }
  113. }
  114. // "#"匹配主题名称中剩余的所有内容。它必须是主题过滤器中的最后一个字符。
  115. else if ((topicFilter[topicFilterIndex] == '#') &&
  116. (topicFilterIndex == (topicFilterLength - 1U)) &&
  117. (locationIsValidForWildcard == RyanMqttTrue))
  118. {
  119. // 后续字符不需要检查多级通配符。
  120. matchFound = RyanMqttTrue;
  121. shouldStopMatching = RyanMqttTrue;
  122. }
  123. else
  124. {
  125. // 除"+"或"#"以外的任何字符不匹配均表示主题名称与主题过滤器不匹配。
  126. matchFound = RyanMqttFalse;
  127. shouldStopMatching = RyanMqttTrue;
  128. }
  129. }
  130. if ((matchFound == RyanMqttTrue) || (shouldStopMatching == RyanMqttTrue))
  131. {
  132. break;
  133. }
  134. // 增量索引
  135. topicIndex++;
  136. topicFilterIndex++;
  137. }
  138. // 如果已到达两个字符串的末尾,则它们匹配。这表示当主题过滤器在非起始位置包含 "+"
  139. // 通配符时的情况。 例如,当将 "sport/+/player" 或 "sport/hockey/+" 主题过滤器与
  140. // "sport/hockey/player" 主题名称匹配时。
  141. if (matchFound == RyanMqttFalse)
  142. {
  143. matchFound = (RyanMqttBool_e)((topicIndex == topicLength) && (topicFilterIndex == topicFilterLength));
  144. }
  145. return matchFound;
  146. }
  147. /**
  148. * @brief 检查消息处理程序的主题是否与给定主题匹配
  149. *
  150. * @param msgHandler
  151. * @param topic
  152. * @param topicLen
  153. * @param isTopicMatchedFlag
  154. * @return RyanMqttBool_e
  155. */
  156. static RyanMqttBool_e RyanMqttMsgTopicIsMatch(RyanMqttMsgHandler_t *msgHandler, const char *topic, uint16_t topicLen,
  157. RyanMqttBool_e isTopicMatchedFlag)
  158. {
  159. // 不进行通配符匹配
  160. if (RyanMqttTrue != isTopicMatchedFlag)
  161. {
  162. // 不相等跳过
  163. if (topicLen != msgHandler->topicLen)
  164. {
  165. return RyanMqttFalse;
  166. }
  167. // 主题名称不相等且没有使能通配符匹配
  168. if (0 != RyanMqttStrncmp(topic, msgHandler->topic, topicLen))
  169. {
  170. return RyanMqttFalse;
  171. }
  172. }
  173. else
  174. {
  175. // 进行通配符匹配
  176. if (RyanMqttTrue != RyanMqttMatchTopic(topic, topicLen, msgHandler->topic, msgHandler->topicLen))
  177. {
  178. return RyanMqttFalse;
  179. }
  180. }
  181. return RyanMqttTrue;
  182. }
  183. /**
  184. * @brief 创建msg句柄
  185. *
  186. * @param topic
  187. * @param topicLen
  188. * @param qos
  189. * @param pMsgHandler
  190. * @return RyanMqttError_e
  191. */
  192. RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, const char *topic, uint16_t topicLen,
  193. uint16_t packetId, RyanMqttQos_e qos, void *userData,
  194. RyanMqttMsgHandler_t **pMsgHandler)
  195. {
  196. RyanMqttAssert(NULL != client);
  197. RyanMqttAssert(NULL != topic);
  198. RyanMqttAssert(NULL != pMsgHandler);
  199. RyanMqttAssert(RyanMqttQos0 == qos || RyanMqttQos1 == qos || RyanMqttQos2 == qos || RyanMqttSubFail == qos);
  200. uint32_t mallocSize = sizeof(RyanMqttMsgHandler_t) + topicLen + 1;
  201. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)platformMemoryMalloc(mallocSize);
  202. RyanMqttCheck(NULL != msgHandler, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  203. msgHandler->packetId = packetId;
  204. msgHandler->topicLen = topicLen;
  205. msgHandler->qos = qos;
  206. RyanMqttListInit(&msgHandler->list); // 初始化链表
  207. msgHandler->userData = userData;
  208. msgHandler->topic = (char *)msgHandler + sizeof(RyanMqttMsgHandler_t);
  209. RyanMqttMemcpy(msgHandler->topic, topic, topicLen);
  210. msgHandler->topic[topicLen] = '\0'; // 兼容旧版本
  211. *pMsgHandler = msgHandler;
  212. return RyanMqttSuccessError;
  213. }
  214. /**
  215. * @brief 销毁 msg 句柄
  216. *
  217. * @param msgHandler
  218. */
  219. void RyanMqttMsgHandlerDestroy(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
  220. {
  221. RyanMqttAssert(NULL != client);
  222. RyanMqttAssert(NULL != msgHandler);
  223. platformMemoryFree(msgHandler);
  224. }
  225. /**
  226. * @brief 查找msg句柄
  227. *
  228. * @param client
  229. * @param msgMatchCriteria
  230. * @param isTopicMatchedFlag
  231. * @param pMsgHandler
  232. * @param removeOnMatch
  233. * @return RyanMqttError_e
  234. */
  235. RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
  236. RyanMqttBool_e isTopicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler,
  237. RyanMqttBool_e removeOnMatch)
  238. {
  239. RyanMqttError_e result = RyanMqttSuccessError;
  240. RyanMqttList_t *curr, *next;
  241. RyanMqttMsgHandler_t *msgHandler;
  242. RyanMqttAssert(NULL != client);
  243. RyanMqttAssert(NULL != msgMatchCriteria);
  244. RyanMqttAssert(NULL != msgMatchCriteria->topic && 0 != msgMatchCriteria->topicLen);
  245. RyanMqttAssert(NULL != pMsgHandler);
  246. platformMutexLock(client->config.userData, &client->msgHandleLock);
  247. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  248. {
  249. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  250. if (RyanMqttFalse == RyanMqttMsgTopicIsMatch(msgHandler, msgMatchCriteria->topic,
  251. msgMatchCriteria->topicLen, isTopicMatchedFlag))
  252. {
  253. continue;
  254. }
  255. if (RyanMqttTrue == removeOnMatch)
  256. {
  257. RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  258. }
  259. *pMsgHandler = msgHandler;
  260. result = RyanMqttSuccessError;
  261. goto __exit;
  262. }
  263. result = RyanMqttNoRescourceError;
  264. __exit:
  265. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  266. return result;
  267. }
  268. /**
  269. * @brief 查找同名进行删除,并根据skipSamePacketId查找packetid一样和不一样的进行删除后
  270. * packetid不一样保证msg列表只有一个, packetid一样清除所有同名同packetid msg。用于清空批量订阅和取消订阅
  271. *
  272. * @param client
  273. * @param msgMatchCriteria
  274. * @param skipSamePacketId
  275. */
  276. void RyanMqttMsgHandlerFindAndDestroyByPacketId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
  277. RyanMqttBool_e skipSamePacketId)
  278. {
  279. RyanMqttList_t *curr, *next;
  280. RyanMqttMsgHandler_t *msgHandler;
  281. RyanMqttAssert(NULL != client);
  282. RyanMqttAssert(NULL != msgMatchCriteria);
  283. RyanMqttAssert(NULL != msgMatchCriteria->topic && 0 != msgMatchCriteria->topicLen);
  284. platformMutexLock(client->config.userData, &client->msgHandleLock);
  285. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  286. {
  287. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  288. if (RyanMqttFalse == skipSamePacketId)
  289. {
  290. if (msgHandler->packetId != msgMatchCriteria->packetId)
  291. {
  292. continue;
  293. }
  294. }
  295. else
  296. {
  297. if (msgHandler->packetId == msgMatchCriteria->packetId)
  298. {
  299. continue;
  300. }
  301. }
  302. if (RyanMqttFalse == RyanMqttMsgTopicIsMatch(msgHandler, msgMatchCriteria->topic,
  303. msgMatchCriteria->topicLen, RyanMqttFalse))
  304. {
  305. continue;
  306. }
  307. // 到这里就是同名订阅了,直接删除
  308. RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  309. RyanMqttMsgHandlerDestroy(client, msgHandler);
  310. // ?理论上最多只会有一个同名订阅,或许也不好说? 比如订阅的时侯数组内部有同名的?
  311. // break;
  312. }
  313. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  314. }
  315. /**
  316. * @brief 将msg句柄存入client msg链表
  317. *
  318. * @param client
  319. * @param msgHandler
  320. * @return int32_t
  321. */
  322. RyanMqttError_e RyanMqttMsgHandlerAddToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
  323. {
  324. RyanMqttAssert(NULL != client);
  325. RyanMqttAssert(NULL != msgHandler);
  326. platformMutexLock(client->config.userData, &client->msgHandleLock);
  327. RyanMqttListAddTail(&msgHandler->list, &client->msgHandlerList); // 将msgHandler节点添加到链表尾部
  328. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  329. return RyanMqttSuccessError;
  330. }
  331. /**
  332. * @brief 从client msg链表中移除msg句柄
  333. *
  334. * @param client
  335. * @param msgHandler
  336. * @return int32_t
  337. */
  338. RyanMqttError_e RyanMqttMsgHandlerRemoveToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
  339. {
  340. RyanMqttAssert(NULL != client);
  341. RyanMqttAssert(NULL != msgHandler);
  342. platformMutexLock(client->config.userData, &client->msgHandleLock);
  343. RyanMqttListDel(&msgHandler->list);
  344. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  345. return RyanMqttSuccessError;
  346. }