/* RyanMqttPubTest.c (10 KB) */


  1. #include "RyanMqttTest.h"
  2. static int32_t pubTestPublishedEventCount = 0;
  3. static int32_t pubTestDataEventCount = 0;
  4. static char *pubStr = NULL;
  5. static int32_t pubStrLen = 0;
  6. static char *pubStr2 = "a";
  7. static int32_t pubStr2Len = 1;
  8. static RyanMqttQos_e exportQos = RyanMqttSubFail;
  9. static int32_t pubTestDataEventCountNotQos0 = 0;
  10. static void RyanMqttPublishEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  11. {
  12. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  13. switch (event)
  14. {
  15. case RyanMqttEventPublished: {
  16. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  17. RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  18. RyanMqttQos_e qos = (RyanMqttQos_e)(uintptr_t)msgHandler->userData;
  19. if (qos == msgHandler->qos)
  20. {
  21. pubTestPublishedEventCount++;
  22. }
  23. else
  24. {
  25. RyanMqttLog_e("pub测试发送的qos不一致 msgQos: %d, userDataQos: %d", msgHandler->qos, qos);
  26. RyanMqttTestDestroyClient(client);
  27. return;
  28. }
  29. break;
  30. }
  31. case RyanMqttEventData: {
  32. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  33. // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  34. // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  35. if (RyanMqttSubFail != exportQos && exportQos != msgData->qos)
  36. {
  37. RyanMqttLog_e("pub测试收到qos等级不一致 exportQos: %d, msgQos: %d", exportQos, msgData->qos);
  38. RyanMqttTestDestroyClient(client);
  39. return;
  40. }
  41. if (((msgData->payloadLen == (uint32_t)pubStrLen) &&
  42. 0 == memcmp(msgData->payload, pubStr, pubStrLen)) ||
  43. ((msgData->payloadLen == (uint32_t)pubStr2Len) &&
  44. 0 == memcmp(msgData->payload, pubStr2, pubStr2Len)))
  45. {
  46. pubTestDataEventCount++;
  47. if (RyanMqttQos0 != msgData->qos)
  48. {
  49. pubTestDataEventCountNotQos0++;
  50. }
  51. }
  52. else
  53. {
  54. RyanMqttLog_e("pub测试收到数据不一致 %.*s", msgData->payloadLen, msgData->payload);
  55. RyanMqttTestDestroyClient(client);
  56. return;
  57. }
  58. break;
  59. }
  60. default: mqttEventBaseHandle(pclient, event, eventData); break;
  61. }
  62. }
  63. /**
  64. * @brief 固定qos等级发布测试
  65. *
  66. * @param qos
  67. * @param count
  68. * @param delayms
  69. * @return RyanMqttError_e
  70. */
  71. static RyanMqttError_e RyanMqttPublishTest(RyanMqttQos_e qos, int32_t count, uint32_t delayms)
  72. {
  73. #define RyanMqttPubTestPubTopic "testlinux/aa/pub"
  74. #define RyanMqttPubTestSubTopic "testlinux/+/pub"
  75. RyanMqttError_e result = RyanMqttSuccessError;
  76. RyanMqttClient_t *client;
  77. exportQos = qos;
  78. result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttPublishEventHandle, NULL);
  79. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  80. // 等待订阅主题成功
  81. result = RyanMqttSubscribe(client, RyanMqttPubTestSubTopic, qos);
  82. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  83. // !不等待topic订阅成功,检查是否正常接收消息
  84. // for (int32_t i = 0;; i++)
  85. // {
  86. // int32_t subscribeTotal = 0;
  87. // result = RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
  88. // RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  89. // { goto __exit; });
  90. // if (1 == subscribeTotal)
  91. // {
  92. // break;
  93. // }
  94. // if (i > 3000)
  95. // {
  96. // RyanMqttLog_e("订阅主题失败");
  97. // result = RyanMqttFailedError;
  98. // goto __exit;
  99. // }
  100. // delay(1);
  101. // }
  102. // 生成随机的数据包大小
  103. {
  104. pubStr = (char *)malloc(2048);
  105. RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
  106. RyanMqttMemset(pubStr, 0, 2048);
  107. for (uint32_t i = 0; i < RyanRand(100, 220); i++)
  108. {
  109. snprintf(pubStr + 4 * i, 2048 - 4 * i, "%c%c%c%c", (char)RyanRand(32, 126),
  110. (char)RyanRand(32, 126), (char)RyanRand(32, 126), (char)RyanRand(32, 126));
  111. }
  112. pubStrLen = (int32_t)RyanMqttStrlen(pubStr);
  113. }
  114. // 开始测试发送数据
  115. pubTestPublishedEventCount = 0;
  116. pubTestDataEventCount = 0;
  117. for (int32_t i = 0; i < count; i++)
  118. {
  119. char *pubTopic = RyanMqttPubTestPubTopic;
  120. uint32_t randNumber = RyanRand(1, 10);
  121. result = RyanMqttPublishWithUserData(client, pubTopic, RyanMqttStrlen(pubTopic),
  122. (0 == i % randNumber) ? pubStr2 : pubStr,
  123. (0 == i % randNumber) ? pubStr2Len : pubStrLen, qos, RyanMqttFalse,
  124. // NOLINTNEXTLINE(performance-no-int-to-ptr)
  125. (void *)qos);
  126. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  127. { goto __exit; });
  128. if (delayms)
  129. {
  130. delay(delayms);
  131. }
  132. }
  133. // 检查收到的数据是否正确
  134. for (int32_t i = 0;; i++)
  135. {
  136. if (RyanMqttQos0 == qos)
  137. {
  138. if (count == pubTestDataEventCount)
  139. {
  140. break;
  141. }
  142. }
  143. // qos1和qos2的发送成功数和接收数应该相等,因为是本机回环测试,理论上应该一致
  144. else if (pubTestPublishedEventCount == count && pubTestPublishedEventCount == pubTestDataEventCount)
  145. {
  146. break;
  147. }
  148. if (i > 300)
  149. {
  150. RyanMqttLog_e("QOS测试失败 Qos: %d, PublishedEventCount: %d, dataEventCount: %d", qos,
  151. pubTestPublishedEventCount, pubTestDataEventCount);
  152. result = RyanMqttFailedError;
  153. goto __exit;
  154. }
  155. delay(100);
  156. }
  157. result = RyanMqttUnSubscribe(client, RyanMqttPubTestSubTopic);
  158. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  159. result = checkAckList(client);
  160. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  161. __exit:
  162. free(pubStr);
  163. pubStr = NULL;
  164. RyanMqttLog_i("mqtt 发布测试,销毁mqtt客户端");
  165. RyanMqttTestDestroyClient(client);
  166. return result;
  167. }
  168. /**
  169. * @brief 混乱随机的qos消息测试
  170. *
  171. * @param count
  172. * @param delayms
  173. * @return RyanMqttError_e
  174. */
  175. static RyanMqttError_e RyanMqttPublishHybridTest(int32_t count, uint32_t delayms)
  176. {
  177. #define RyanMqttPubHybridTestPubTopic "testlinux/aa/pub/adfa/kkk"
  178. #define RyanMqttPubHybridTestSubTopic "testlinux/aa/+/#"
  179. RyanMqttError_e result = RyanMqttSuccessError;
  180. RyanMqttClient_t *client;
  181. int32_t sendNeedAckCount = 0;
  182. result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttPublishEventHandle, NULL);
  183. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  184. // 等待订阅主题成功
  185. result = RyanMqttSubscribe(client, RyanMqttPubHybridTestSubTopic, RyanMqttQos2);
  186. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  187. for (int32_t i = 0;; i++)
  188. {
  189. int32_t subscribeTotal = 0;
  190. result = RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
  191. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  192. { goto __exit; });
  193. if (1 == subscribeTotal)
  194. {
  195. break;
  196. }
  197. if (i > 3000)
  198. {
  199. RyanMqttLog_e("订阅主题失败");
  200. result = RyanMqttFailedError;
  201. goto __exit;
  202. }
  203. delay(1);
  204. }
  205. exportQos = RyanMqttSubFail;
  206. // 生成随机的数据包大小
  207. {
  208. pubStr = (char *)malloc(2048);
  209. RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
  210. RyanMqttMemset(pubStr, 0, 2048);
  211. for (uint32_t i = 0; i < RyanRand(100, 220); i++)
  212. {
  213. snprintf(pubStr + 4 * i, 2048 - 4 * i, "%c%c%c%c", (char)RyanRand(32, 126),
  214. (char)RyanRand(32, 126), (char)RyanRand(32, 126), (char)RyanRand(32, 126));
  215. }
  216. pubStrLen = (int32_t)RyanMqttStrlen(pubStr);
  217. }
  218. pubTestPublishedEventCount = 0;
  219. pubTestDataEventCount = 0;
  220. pubTestDataEventCountNotQos0 = 0;
  221. for (int32_t i = 0; i < count; i++)
  222. {
  223. char *pubTopic = RyanMqttPubHybridTestPubTopic;
  224. uint32_t randNumber = RyanRand(1, 10);
  225. result = RyanMqttPublishWithUserData(
  226. client, pubTopic, RyanMqttStrlen(pubTopic), (0 == i % randNumber) ? pubStr2 : pubStr,
  227. (0 == i % randNumber) ? pubStr2Len : pubStrLen, i % 3, RyanMqttFalse,
  228. // NOLINTNEXTLINE(clang-diagnostic-int-to-void-pointer-cast,performance-no-int-to-ptr)
  229. (void *)(uintptr_t)(i % 3));
  230. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  231. { goto __exit; });
  232. if (RyanMqttQos0 != i % 3)
  233. {
  234. sendNeedAckCount++;
  235. }
  236. if (delayms)
  237. {
  238. delay(delayms);
  239. }
  240. }
  241. // 检查收到的数据是否正确
  242. for (int32_t i = 0;; i++)
  243. {
  244. // 发送成功数等于需要ack数,同时也等于接收到非qos0的数
  245. if (pubTestPublishedEventCount == sendNeedAckCount &&
  246. pubTestPublishedEventCount == pubTestDataEventCountNotQos0)
  247. {
  248. break;
  249. }
  250. if (i > 300)
  251. {
  252. RyanMqttLog_e("QOS测试失败, PublishedEventCount: %d, pubTestDataEventCountNotQos0: %d",
  253. pubTestPublishedEventCount, pubTestDataEventCountNotQos0);
  254. result = RyanMqttFailedError;
  255. goto __exit;
  256. }
  257. delay(100);
  258. }
  259. result = RyanMqttUnSubscribe(client, RyanMqttPubHybridTestSubTopic);
  260. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  261. result = checkAckList(client);
  262. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  263. __exit:
  264. free(pubStr);
  265. pubStr = NULL;
  266. RyanMqttLog_i("mqtt 发布测试,销毁mqtt客户端");
  267. RyanMqttTestDestroyClient(client);
  268. return result;
  269. }
  270. RyanMqttError_e RyanMqttPubTest(void)
  271. {
  272. RyanMqttError_e result = RyanMqttSuccessError;
  273. // 发布 & 订阅 qos 测试
  274. result = RyanMqttPublishTest(RyanMqttQos0, 1000, 0);
  275. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  276. checkMemory;
  277. result = RyanMqttPublishTest(RyanMqttQos1, 1000, 1);
  278. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  279. checkMemory;
  280. result = RyanMqttPublishTest(RyanMqttQos2, 1000, 1);
  281. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  282. checkMemory;
  283. result = RyanMqttPublishHybridTest(1000, 1);
  284. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  285. checkMemory;
  286. return RyanMqttSuccessError;
  287. __exit:
  288. return RyanMqttFailedError;
  289. }