RyanMqttSubTest.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. #include "RyanMqttTest.h"
  2. static RyanMqttSubscribeData_t *subscribeManyData = NULL;
  3. static int32_t subTestCount = 0;
  4. static RyanMqttSubscribeData_t *topicIsSubscribeArr(char *topic, uint32_t topicLen)
  5. {
  6. if (NULL == topic)
  7. {
  8. return NULL;
  9. }
  10. for (int32_t i = 0; i < subTestCount; i++)
  11. {
  12. if (0 == RyanMqttStrncmp(topic, subscribeManyData[i].topic, topicLen))
  13. {
  14. return &subscribeManyData[i];
  15. }
  16. }
  17. return NULL;
  18. }
  19. static void RyanMqttSubEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  20. {
  21. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  22. switch (event)
  23. {
  24. case RyanMqttEventSubscribed: {
  25. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  26. RyanMqttLog_i("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  27. RyanMqttSubscribeData_t *subscribeData = topicIsSubscribeArr(msgHandler->topic, msgHandler->topicLen);
  28. if (NULL == subscribeData)
  29. {
  30. RyanMqttLog_e("mqtt 订阅主题非法 topic: %s", msgHandler->topic);
  31. RyanMqttTestDestroyClient(client);
  32. return;
  33. }
  34. if (subscribeData->qos != msgHandler->qos)
  35. {
  36. RyanMqttLog_e("mqtt 订阅主题降级 topic: %s, exportQos: %d, qos: %d", msgHandler->topic,
  37. subscribeData->qos, msgHandler->qos);
  38. RyanMqttTestDestroyClient(client);
  39. return;
  40. }
  41. break;
  42. }
  43. case RyanMqttEventSubscribedFailed: {
  44. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  45. RyanMqttLog_i("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  46. break;
  47. }
  48. case RyanMqttEventUnSubscribed: {
  49. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  50. RyanMqttLog_i("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  51. RyanMqttSubscribeData_t *subscribeData = topicIsSubscribeArr(msgHandler->topic, msgHandler->topicLen);
  52. if (NULL == subscribeData)
  53. {
  54. RyanMqttLog_e("mqtt 订阅主题非法 topic: %s", msgHandler->topic);
  55. RyanMqttTestDestroyClient(client);
  56. return;
  57. }
  58. if (msgHandler->qos != RyanMqttSubFail && subscribeData->qos != msgHandler->qos)
  59. {
  60. RyanMqttLog_e("mqtt 取消订阅主题信息不对 topic: %s, exportQos: %d, qos: %d", msgHandler->topic,
  61. subscribeData->qos, msgHandler->qos);
  62. RyanMqttTestDestroyClient(client);
  63. return;
  64. }
  65. break;
  66. }
  67. case RyanMqttEventUnSubscribedFailed: {
  68. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  69. RyanMqttLog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  70. break;
  71. }
  72. default: mqttEventBaseHandle(pclient, event, eventData); break;
  73. }
  74. }
  75. static RyanMqttError_e RyanMqttSubscribeCheckMsgHandle(RyanMqttClient_t *client)
  76. {
  77. RyanMqttError_e result = RyanMqttSuccessError;
  78. int32_t subscribeNum = 0;
  79. RyanMqttMsgHandler_t *msgHandles = NULL;
  80. delay(100);
  81. for (int32_t i = 0; i < 600; i++)
  82. {
  83. result = RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
  84. if (RyanMqttSuccessError != result)
  85. {
  86. RyanMqttLog_e("获取订阅主题数失败!!!");
  87. }
  88. else
  89. {
  90. for (int32_t j = 0; j < subscribeNum; j++)
  91. {
  92. RyanMqttCheckCodeNoReturn(
  93. NULL != msgHandles[j].topic &&
  94. RyanMqttStrlen(msgHandles[j].topic) == msgHandles[j].topicLen,
  95. RyanMqttFailedError, RyanMqttLog_e, {
  96. RyanMqttLog_e("topic: %s, topicLen: %d, topicLen2: %d",
  97. msgHandles[j].topic, RyanMqttStrlen(msgHandles[j].topic),
  98. msgHandles[j].topicLen);
  99. result = RyanMqttFailedError;
  100. goto __exit;
  101. });
  102. }
  103. RyanMqttLog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, subTestCount);
  104. // for (int32_t i = 0; i < subscribeNum; i++)
  105. // RyanMqttLog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic,
  106. // msgHandles[i].qos);
  107. int32_t subscribeTotalCount = 0;
  108. RyanMqttGetSubscribeTotalCount(client, &subscribeTotalCount);
  109. if (subscribeNum == subTestCount && subscribeTotalCount == subTestCount)
  110. {
  111. break;
  112. }
  113. }
  114. if (i > 500)
  115. {
  116. result = RyanMqttFailedError;
  117. goto __exit;
  118. }
  119. if (subscribeNum > 0)
  120. {
  121. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  122. msgHandles = NULL;
  123. }
  124. delay(100);
  125. }
  126. // 检查订阅主题是否正确
  127. for (int32_t i = 0; i < subscribeNum; i++)
  128. {
  129. if (NULL == topicIsSubscribeArr(msgHandles[i].topic, msgHandles[i].topicLen))
  130. {
  131. RyanMqttLog_e("主题不匹配或者qos不对, topic: %s, qos: %d", msgHandles[i].topic,
  132. msgHandles[i].qos);
  133. result = RyanMqttFailedError;
  134. goto __exit;
  135. }
  136. }
  137. __exit:
  138. if (NULL != msgHandles)
  139. {
  140. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  141. }
  142. return result;
  143. }
  144. static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count, int32_t testCount)
  145. {
  146. RyanMqttError_e result = RyanMqttSuccessError;
  147. RyanMqttClient_t *client;
  148. RyanMqttUnSubscribeData_t *unSubscribeManyData = NULL;
  149. subTestCount = count;
  150. result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttSubEventHandle, NULL);
  151. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  152. // 生成需要订阅的主题数据
  153. {
  154. subscribeManyData =
  155. (RyanMqttSubscribeData_t *)malloc(sizeof(RyanMqttSubscribeData_t) * count);
  156. if (NULL == subscribeManyData)
  157. {
  158. RyanMqttLog_e("内存不足");
  159. result = RyanMqttNotEnoughMemError;
  160. goto __exit;
  161. }
  162. for (int32_t i = 0; i < count; i++)
  163. {
  164. subscribeManyData[i].qos = i % 3;
  165. char *topic = (char *)malloc(32);
  166. if (NULL == topic)
  167. {
  168. RyanMqttLog_e("内存不足");
  169. result = RyanMqttNotEnoughMemError;
  170. goto __exit;
  171. }
  172. RyanMqttSnprintf(topic, 32, "test/subscribe/%d", i);
  173. subscribeManyData[i].topic = topic;
  174. subscribeManyData[i].topicLen = RyanMqttStrlen(topic);
  175. }
  176. }
  177. // 生成取消所有订阅消息
  178. unSubscribeManyData = malloc(sizeof(RyanMqttUnSubscribeData_t) * count);
  179. if (NULL == unSubscribeManyData)
  180. {
  181. RyanMqttLog_e("内存不足");
  182. result = RyanMqttNotEnoughMemError;
  183. goto __exit;
  184. }
  185. for (int32_t i = 0; i < count; i++)
  186. {
  187. char *topic = (char *)malloc(32);
  188. if (NULL == topic)
  189. {
  190. RyanMqttLog_e("内存不足");
  191. result = RyanMqttNotEnoughMemError;
  192. goto __exit;
  193. }
  194. RyanMqttSnprintf(topic, 32, "test/subscribe/%d", i);
  195. unSubscribeManyData[i].topic = topic;
  196. unSubscribeManyData[i].topicLen = RyanMqttStrlen(topic);
  197. }
  198. for (int32_t testCount2 = 0; testCount2 < testCount; testCount2++)
  199. {
  200. // 订阅全部主题
  201. result = RyanMqttSubscribeMany(client, count - 1, subscribeManyData);
  202. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  203. { goto __exit; });
  204. result =
  205. RyanMqttSubscribe(client, subscribeManyData[count - 1].topic, subscribeManyData[count - 1].qos);
  206. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  207. { goto __exit; });
  208. result = RyanMqttSubscribeCheckMsgHandle(client);
  209. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  210. { goto __exit; });
  211. // // delay(10);
  212. // 测试重复订阅,不修改qos等级
  213. result = RyanMqttSubscribeMany(client, count / 2, subscribeManyData);
  214. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  215. { goto __exit; });
  216. result = RyanMqttSubscribeCheckMsgHandle(client);
  217. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  218. { goto __exit; });
  219. // 测试重复订阅并且修改qos等级
  220. for (int32_t i = count; i > 0; i--)
  221. {
  222. subscribeManyData[count - i].qos = i % 3;
  223. }
  224. result = RyanMqttSubscribeMany(client, count, subscribeManyData);
  225. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  226. { goto __exit; });
  227. result = RyanMqttSubscribeCheckMsgHandle(client);
  228. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  229. { goto __exit; });
  230. // 测试取消订阅
  231. result = RyanMqttUnSubscribeMany(client, count - 1, unSubscribeManyData);
  232. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  233. { goto __exit; });
  234. result = RyanMqttUnSubscribe(client, unSubscribeManyData[count - 1].topic);
  235. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  236. { goto __exit; });
  237. // !emqx服务器有时候会误判新的订阅请求为重复订阅主题,导致订阅失败
  238. // 重复取消订阅主题
  239. result = RyanMqttUnSubscribeMany(client, count / 2, unSubscribeManyData);
  240. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  241. { goto __exit; });
  242. for (int32_t i = 0; i < 600; i++)
  243. {
  244. delay(100);
  245. int32_t subscribeNum = 0;
  246. RyanMqttGetSubscribeTotalCount(client, &subscribeNum);
  247. if (0 == subscribeNum)
  248. {
  249. break;
  250. }
  251. if (i > 500)
  252. {
  253. result = RyanMqttFailedError;
  254. goto __exit;
  255. }
  256. }
  257. // 有重复取消订阅主题,增加延时,防止emqx服务器误判新的订阅请求
  258. delay(100);
  259. }
  260. result = checkAckList(client);
  261. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  262. __exit:
  263. // 删除
  264. for (int32_t i = 0; i < count; i++)
  265. {
  266. if (NULL != subscribeManyData && NULL != subscribeManyData[i].topic)
  267. {
  268. free(subscribeManyData[i].topic);
  269. }
  270. if (NULL != unSubscribeManyData && NULL != unSubscribeManyData[i].topic)
  271. {
  272. free(unSubscribeManyData[i].topic);
  273. }
  274. }
  275. if (NULL != subscribeManyData)
  276. {
  277. free(subscribeManyData);
  278. }
  279. if (NULL != unSubscribeManyData)
  280. {
  281. free(unSubscribeManyData);
  282. }
  283. RyanMqttLog_i("mqtt 订阅测试,销毁mqtt客户端");
  284. RyanMqttTestDestroyClient(client);
  285. return result;
  286. }
  287. RyanMqttError_e RyanMqttSubTest(void)
  288. {
  289. RyanMqttError_e result = RyanMqttSuccessError;
  290. result = RyanMqttSubscribeHybridTest(1000, 3);
  291. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  292. checkMemory;
  293. return RyanMqttSuccessError;
  294. __exit:
  295. return RyanMqttFailedError;
  296. }