RyanMqttSubTest.c 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. #include "RyanMqttTest.h"
  2. static RyanMqttSubscribeData_t *subscribeManyData = NULL;
  3. static int32_t subTestCount = 0;
  4. static RyanMqttSubscribeData_t *topicIsSubscribeArr(char *topic)
  5. {
  6. for (int32_t i = 0; i < subTestCount; i++)
  7. {
  8. if (0 == strcmp(topic, subscribeManyData[i].topic))
  9. {
  10. return &subscribeManyData[i];
  11. }
  12. }
  13. return NULL;
  14. }
  15. static void RyanMqttSubEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  16. {
  17. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  18. switch (event)
  19. {
  20. case RyanMqttEventSubscribed: {
  21. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  22. rlog_i("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  23. RyanMqttSubscribeData_t *subscribeData = topicIsSubscribeArr(msgHandler->topic);
  24. if (NULL == subscribeData)
  25. {
  26. rlog_e("mqtt 订阅主题非法 topic: %s", msgHandler->topic);
  27. RyanMqttDestroy(client);
  28. }
  29. if (subscribeData->qos != msgHandler->qos)
  30. {
  31. rlog_e("mqtt 订阅主题降级 topic: %s, exportQos: %d, qos: %d", msgHandler->topic,
  32. subscribeData->qos, msgHandler->qos);
  33. RyanMqttDestroy(client);
  34. }
  35. break;
  36. }
  37. case RyanMqttEventSubscribedFaile: {
  38. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  39. rlog_i("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  40. break;
  41. }
  42. case RyanMqttEventUnSubscribed: {
  43. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  44. rlog_i("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  45. RyanMqttSubscribeData_t *subscribeData = topicIsSubscribeArr(msgHandler->topic);
  46. if (NULL == subscribeData)
  47. {
  48. rlog_e("mqtt 订阅主题非法 topic: %s", msgHandler->topic);
  49. }
  50. if (subscribeData->qos != msgHandler->qos)
  51. {
  52. rlog_e("mqtt 取消订阅主题信息不对 topic: %s, exportQos: %d, qos: %d", msgHandler->topic,
  53. subscribeData->qos, msgHandler->qos);
  54. RyanMqttDestroy(client);
  55. }
  56. break;
  57. }
  58. case RyanMqttEventUnSubscribedFaile: {
  59. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  60. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  61. break;
  62. }
  63. default: mqttEventBaseHandle(pclient, event, eventData); break;
  64. }
  65. }
  66. static RyanMqttError_e RyanMqttSubscribeCheckMsgHandle(RyanMqttClient_t *client)
  67. {
  68. RyanMqttError_e result = RyanMqttSuccessError;
  69. int32_t subscribeNum = 0;
  70. RyanMqttMsgHandler_t *msgHandles = NULL;
  71. delay(100);
  72. for (int32_t i = 0; i < 600; i++)
  73. {
  74. result = RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
  75. if (RyanMqttSuccessError != result)
  76. {
  77. if (RyanMqttNoRescourceError == result)
  78. {
  79. rlog_w("没有订阅的主题");
  80. }
  81. else
  82. {
  83. rlog_e("获取订阅主题数失败!!!");
  84. }
  85. }
  86. else
  87. {
  88. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, subTestCount);
  89. // for (int32_t i = 0; i < subscribeNum; i++)
  90. // rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic,
  91. // msgHandles[i].qos);
  92. int32_t subscribeTotalCount = 0;
  93. RyanMqttGetSubscribeTotalCount(client, &subscribeTotalCount);
  94. if (subscribeNum == subTestCount && subscribeTotalCount == subTestCount)
  95. {
  96. break;
  97. }
  98. }
  99. if (i > 500)
  100. {
  101. result = RyanMqttFailedError;
  102. goto __exit;
  103. }
  104. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  105. msgHandles = NULL;
  106. delay(100);
  107. }
  108. // 检查订阅主题是否正确
  109. for (int32_t i = 0; i < subscribeNum; i++)
  110. {
  111. if (NULL == topicIsSubscribeArr(msgHandles[i].topic))
  112. {
  113. rlog_e("主题不匹配或者qos不对, topic: %s, qos: %d", msgHandles[i].topic, msgHandles[i].qos);
  114. result = RyanMqttFailedError;
  115. goto __exit;
  116. }
  117. }
  118. __exit:
  119. if (NULL != msgHandles)
  120. {
  121. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  122. }
  123. return result;
  124. }
  125. static RyanMqttError_e RyanMqttSubscribeTest(int32_t count)
  126. {
  127. RyanMqttError_e result = RyanMqttSuccessError;
  128. RyanMqttClient_t *client;
  129. RyanMqttUnSubscribeData_t *unSubscribeManyData = NULL;
  130. subTestCount = count;
  131. RyanMqttInitSync(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttSubEventHandle);
  132. subscribeManyData = (RyanMqttSubscribeData_t *)malloc(sizeof(RyanMqttSubscribeData_t) * count);
  133. if (NULL == subscribeManyData)
  134. {
  135. rlog_e("内存不足");
  136. return RyanMqttNotEnoughMemError;
  137. }
  138. for (int32_t i = 0; i < count; i++)
  139. {
  140. subscribeManyData[i].qos = i % 3;
  141. char *topic = (char *)malloc(64);
  142. if (NULL == topic)
  143. {
  144. rlog_e("内存不足");
  145. return RyanMqttNotEnoughMemError;
  146. }
  147. snprintf(topic, 64, "test/subscribe/%d", i);
  148. subscribeManyData[i].topic = topic;
  149. subscribeManyData[i].topicLen = strlen(topic);
  150. }
  151. // 订阅全部主题
  152. RyanMqttSubscribeMany(client, count - 1, subscribeManyData);
  153. RyanMqttSubscribe(client, subscribeManyData[count - 1].topic, subscribeManyData[count - 1].qos);
  154. result = RyanMqttSubscribeCheckMsgHandle(client);
  155. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  156. // 测试重复订阅,不修改qos等级
  157. RyanMqttSubscribeMany(client, count / 2, subscribeManyData);
  158. result = RyanMqttSubscribeCheckMsgHandle(client);
  159. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  160. // 测试重复订阅并且修改qos等级
  161. for (int32_t i = count; i > 0; i--)
  162. {
  163. subscribeManyData[count - i].qos = i % 3;
  164. }
  165. RyanMqttSubscribeMany(client, count, subscribeManyData);
  166. result = RyanMqttSubscribeCheckMsgHandle(client);
  167. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  168. // 测试取消所有订阅消息
  169. unSubscribeManyData = malloc(sizeof(RyanMqttUnSubscribeData_t) * count);
  170. if (NULL == unSubscribeManyData)
  171. {
  172. rlog_e("内存不足");
  173. return RyanMqttNotEnoughMemError;
  174. }
  175. for (int32_t i = 0; i < count; i++)
  176. {
  177. char *topic = (char *)malloc(64);
  178. if (NULL == topic)
  179. {
  180. rlog_e("内存不足");
  181. return RyanMqttNotEnoughMemError;
  182. }
  183. snprintf(topic, 64, "test/subscribe/%d", i);
  184. unSubscribeManyData[i].topic = topic;
  185. unSubscribeManyData[i].topicLen = strlen(topic);
  186. }
  187. RyanMqttUnSubscribeMany(client, count - 1, unSubscribeManyData);
  188. RyanMqttUnSubscribe(client, unSubscribeManyData[count - 1].topic);
  189. // 重复取消订阅主题
  190. RyanMqttUnSubscribeMany(client, count / 2, unSubscribeManyData);
  191. delay(100);
  192. for (int32_t i = 0; i < 600; i++)
  193. {
  194. RyanMqttMsgHandler_t *msgHandles = NULL;
  195. int32_t subscribeNum = 0;
  196. result = RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
  197. if (RyanMqttSuccessError != result)
  198. {
  199. if (RyanMqttNoRescourceError == result)
  200. {
  201. rlog_w("没有订阅的主题");
  202. }
  203. else
  204. {
  205. rlog_e("获取订阅主题数失败!!!");
  206. }
  207. }
  208. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  209. // result = RyanMqttGetSubscribe(client, msgHandles, count, &subscribeNum);
  210. // if (RyanMqttNoRescourceError == result)
  211. // {
  212. // rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", count);
  213. // }
  214. if (0 == subscribeNum)
  215. {
  216. break;
  217. }
  218. if (i > 500)
  219. {
  220. result = RyanMqttFailedError;
  221. goto __exit;
  222. }
  223. delay(100);
  224. }
  225. result = checkAckList(client);
  226. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  227. __exit:
  228. // 删除
  229. for (int32_t i = 0; i < count; i++)
  230. {
  231. if (NULL != subscribeManyData)
  232. {
  233. free(subscribeManyData[i].topic);
  234. }
  235. if (NULL != unSubscribeManyData)
  236. {
  237. free(unSubscribeManyData[i].topic);
  238. }
  239. }
  240. if (NULL != subscribeManyData)
  241. {
  242. free(subscribeManyData);
  243. }
  244. if (NULL != unSubscribeManyData)
  245. {
  246. free(unSubscribeManyData);
  247. }
  248. rlog_i("mqtt 订阅测试,销毁mqtt客户端");
  249. RyanMqttDestorySync(client);
  250. return result;
  251. }
  252. RyanMqttError_e RyanMqttSubTest(void)
  253. {
  254. RyanMqttError_e result = RyanMqttSuccessError;
  255. result = RyanMqttSubscribeTest(1000);
  256. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  257. checkMemory;
  258. return RyanMqttSuccessError;
  259. __exit:
  260. return RyanMqttFailedError;
  261. }