RyanMqttSubTest.c 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. #include "RyanMqttTest.h"
  // Expected-subscription table shared between the test body and the event
  // callback. It is allocated, filled and freed by RyanMqttSubscribeHybridTest
  // and searched by topicIsSubscribeArr.
  2. static RyanMqttSubscribeData_t *subscribeManyData = NULL;
  // Number of entries expected in subscribeManyData. Set from the `count`
  // argument of RyanMqttSubscribeHybridTest; used as the loop bound in
  // topicIsSubscribeArr and as the expected subscription count in
  // RyanMqttSubscribeCheckMsgHandle.
  3. static int32_t subTestCount = 0;
  4. static RyanMqttSubscribeData_t *topicIsSubscribeArr(char *topic)
  5. {
  6. for (int32_t i = 0; i < subTestCount; i++)
  7. {
  8. if (0 == RyanMqttStrcmp(topic, subscribeManyData[i].topic))
  9. {
  10. return &subscribeManyData[i];
  11. }
  12. }
  13. return NULL;
  14. }
  15. static void RyanMqttSubEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  16. {
  17. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  18. switch (event)
  19. {
  20. case RyanMqttEventSubscribed: {
  21. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  22. RyanMqttLog_i("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  23. RyanMqttSubscribeData_t *subscribeData = topicIsSubscribeArr(msgHandler->topic);
  24. if (NULL == subscribeData)
  25. {
  26. RyanMqttLog_e("mqtt 订阅主题非法 topic: %s", msgHandler->topic);
  27. RyanMqttTestDestroyClient(client);
  28. return;
  29. }
  30. if (subscribeData->qos != msgHandler->qos)
  31. {
  32. RyanMqttLog_e("mqtt 订阅主题降级 topic: %s, exportQos: %d, qos: %d", msgHandler->topic,
  33. subscribeData->qos, msgHandler->qos);
  34. RyanMqttTestDestroyClient(client);
  35. return;
  36. }
  37. break;
  38. }
  39. case RyanMqttEventSubscribedFailed: {
  40. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  41. RyanMqttLog_i("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  42. break;
  43. }
  44. case RyanMqttEventUnSubscribed: {
  45. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  46. RyanMqttLog_i("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  47. RyanMqttSubscribeData_t *subscribeData = topicIsSubscribeArr(msgHandler->topic);
  48. if (NULL == subscribeData)
  49. {
  50. RyanMqttLog_e("mqtt 订阅主题非法 topic: %s", msgHandler->topic);
  51. RyanMqttTestDestroyClient(client);
  52. return;
  53. }
  54. if (subscribeData->qos != msgHandler->qos)
  55. {
  56. RyanMqttLog_e("mqtt 取消订阅主题信息不对 topic: %s, exportQos: %d, qos: %d", msgHandler->topic,
  57. subscribeData->qos, msgHandler->qos);
  58. RyanMqttTestDestroyClient(client);
  59. return;
  60. }
  61. break;
  62. }
  63. case RyanMqttEventUnSubscribedFailed: {
  64. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  65. RyanMqttLog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  66. break;
  67. }
  68. default: mqttEventBaseHandle(pclient, event, eventData); break;
  69. }
  70. }
  71. static RyanMqttError_e RyanMqttSubscribeCheckMsgHandle(RyanMqttClient_t *client)
  72. {
  73. RyanMqttError_e result = RyanMqttSuccessError;
  74. int32_t subscribeNum = 0;
  75. RyanMqttMsgHandler_t *msgHandles = NULL;
  76. delay(100);
  77. for (int32_t i = 0; i < 600; i++)
  78. {
  79. result = RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
  80. if (RyanMqttSuccessError != result)
  81. {
  82. RyanMqttLog_e("获取订阅主题数失败!!!");
  83. }
  84. else
  85. {
  86. RyanMqttLog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, subTestCount);
  87. // for (int32_t i = 0; i < subscribeNum; i++)
  88. // RyanMqttLog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic,
  89. // msgHandles[i].qos);
  90. int32_t subscribeTotalCount = 0;
  91. RyanMqttGetSubscribeTotalCount(client, &subscribeTotalCount);
  92. if (subscribeNum == subTestCount && subscribeTotalCount == subTestCount)
  93. {
  94. break;
  95. }
  96. }
  97. if (i > 500)
  98. {
  99. result = RyanMqttFailedError;
  100. goto __exit;
  101. }
  102. if (subscribeNum > 0)
  103. {
  104. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  105. msgHandles = NULL;
  106. }
  107. delay(100);
  108. }
  109. // 检查订阅主题是否正确
  110. for (int32_t i = 0; i < subscribeNum; i++)
  111. {
  112. if (NULL == topicIsSubscribeArr(msgHandles[i].topic))
  113. {
  114. RyanMqttLog_e("主题不匹配或者qos不对, topic: %s, qos: %d", msgHandles[i].topic,
  115. msgHandles[i].qos);
  116. result = RyanMqttFailedError;
  117. goto __exit;
  118. }
  119. }
  120. __exit:
  121. if (NULL != msgHandles)
  122. {
  123. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  124. }
  125. return result;
  126. }
  127. static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count)
  128. {
  129. RyanMqttError_e result = RyanMqttSuccessError;
  130. RyanMqttClient_t *client;
  131. RyanMqttUnSubscribeData_t *unSubscribeManyData = NULL;
  132. subTestCount = count;
  133. result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttSubEventHandle, NULL);
  134. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  135. // 生成需要订阅的主题数据
  136. {
  137. subscribeManyData = (RyanMqttSubscribeData_t *)malloc(sizeof(RyanMqttSubscribeData_t) * count);
  138. if (NULL == subscribeManyData)
  139. {
  140. RyanMqttLog_e("内存不足");
  141. result = RyanMqttNotEnoughMemError;
  142. goto __exit;
  143. }
  144. for (int32_t i = 0; i < count; i++)
  145. {
  146. subscribeManyData[i].qos = i % 3;
  147. char *topic = (char *)malloc(64);
  148. if (NULL == topic)
  149. {
  150. RyanMqttLog_e("内存不足");
  151. result = RyanMqttNotEnoughMemError;
  152. goto __exit;
  153. }
  154. snprintf(topic, 64, "test/subscribe/%d", i);
  155. subscribeManyData[i].topic = topic;
  156. subscribeManyData[i].topicLen = RyanMqttStrlen(topic);
  157. }
  158. }
  159. // 订阅全部主题
  160. result = RyanMqttSubscribeMany(client, count - 1, subscribeManyData);
  161. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  162. result = RyanMqttSubscribe(client, subscribeManyData[count - 1].topic, subscribeManyData[count - 1].qos);
  163. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  164. result = RyanMqttSubscribeCheckMsgHandle(client);
  165. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  166. // 测试重复订阅,不修改qos等级
  167. result = RyanMqttSubscribeMany(client, count / 2, subscribeManyData);
  168. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  169. result = RyanMqttSubscribeCheckMsgHandle(client);
  170. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  171. // 测试重复订阅并且修改qos等级
  172. for (int32_t i = count; i > 0; i--)
  173. {
  174. subscribeManyData[count - i].qos = i % 3;
  175. }
  176. result = RyanMqttSubscribeMany(client, count, subscribeManyData);
  177. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  178. result = RyanMqttSubscribeCheckMsgHandle(client);
  179. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  180. // 测试取消所有订阅消息
  181. unSubscribeManyData = malloc(sizeof(RyanMqttUnSubscribeData_t) * count);
  182. if (NULL == unSubscribeManyData)
  183. {
  184. RyanMqttLog_e("内存不足");
  185. result = RyanMqttNotEnoughMemError;
  186. goto __exit;
  187. }
  188. for (int32_t i = 0; i < count; i++)
  189. {
  190. char *topic = (char *)malloc(64);
  191. if (NULL == topic)
  192. {
  193. RyanMqttLog_e("内存不足");
  194. result = RyanMqttNotEnoughMemError;
  195. goto __exit;
  196. }
  197. snprintf(topic, 64, "test/subscribe/%d", i);
  198. unSubscribeManyData[i].topic = topic;
  199. unSubscribeManyData[i].topicLen = RyanMqttStrlen(topic);
  200. }
  201. result = RyanMqttUnSubscribeMany(client, count - 1, unSubscribeManyData);
  202. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  203. result = RyanMqttUnSubscribe(client, unSubscribeManyData[count - 1].topic);
  204. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  205. // 重复取消订阅主题
  206. result = RyanMqttUnSubscribeMany(client, count / 2, unSubscribeManyData);
  207. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  208. delay(100);
  209. for (int32_t i = 0; i < 600; i++)
  210. {
  211. RyanMqttMsgHandler_t *msgHandles = NULL;
  212. int32_t subscribeNum = 0;
  213. result = RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
  214. if (RyanMqttSuccessError != result)
  215. {
  216. RyanMqttLog_e("获取订阅主题数失败!!!");
  217. }
  218. if (subscribeNum > 0)
  219. {
  220. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  221. }
  222. // result = RyanMqttGetSubscribe(client, msgHandles, count, &subscribeNum);
  223. // if (RyanMqttNoRescourceError == result)
  224. // {
  225. // RyanMqttLog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", count);
  226. // }
  227. if (0 == subscribeNum)
  228. {
  229. break;
  230. }
  231. if (i > 500)
  232. {
  233. result = RyanMqttFailedError;
  234. goto __exit;
  235. }
  236. delay(100);
  237. }
  238. result = checkAckList(client);
  239. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  240. __exit:
  241. // 删除
  242. for (int32_t i = 0; i < count; i++)
  243. {
  244. if (NULL != subscribeManyData && NULL != subscribeManyData[i].topic)
  245. {
  246. free(subscribeManyData[i].topic);
  247. }
  248. if (NULL != subscribeManyData && NULL != unSubscribeManyData[i].topic)
  249. {
  250. free(unSubscribeManyData[i].topic);
  251. }
  252. }
  253. if (NULL != subscribeManyData)
  254. {
  255. free(subscribeManyData);
  256. }
  257. if (NULL != unSubscribeManyData)
  258. {
  259. free(unSubscribeManyData);
  260. }
  261. RyanMqttLog_i("mqtt 订阅测试,销毁mqtt客户端");
  262. RyanMqttTestDestroyClient(client);
  263. return result;
  264. }
  265. RyanMqttError_e RyanMqttSubTest(void)
  266. {
  267. RyanMqttError_e result = RyanMqttSuccessError;
  268. result = RyanMqttSubscribeHybridTest(1000);
  269. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  270. checkMemory;
  271. return RyanMqttSuccessError;
  272. __exit:
  273. return RyanMqttFailedError;
  274. }