RyanMqttPubTest.c 10 KB


  1. #include "RyanMqttTest.h"
  2. static int32_t pubTestPublishedEventCount = 0;
  3. static int32_t pubTestDataEventCount = 0;
  4. static char *pubStr = NULL;
  5. static int32_t pubStrLen = 0;
  6. static char *pubStr2 = "a";
  7. static int32_t pubStr2Len = 1;
  8. static RyanMqttQos_e exportQos = RyanMqttSubFail;
  9. static int32_t pubTestDataEventCountNotQos0 = 0;
  10. static void RyanMqttPublishEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  11. {
  12. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  13. switch (event)
  14. {
  15. case RyanMqttEventPublished: {
  16. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  17. RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  18. RyanMqttQos_e qos = (RyanMqttQos_e)(uintptr_t)msgHandler->userData;
  19. if (qos == msgHandler->qos)
  20. {
  21. pubTestPublishedEventCount++;
  22. }
  23. else
  24. {
  25. RyanMqttLog_e("pub测试发送的qos不一致 msgQos: %d, userDataQos: %d", msgHandler->qos, qos);
  26. RyanMqttTestDestroyClient(client);
  27. return;
  28. }
  29. break;
  30. }
  31. case RyanMqttEventData: {
  32. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  33. // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  34. // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  35. if (RyanMqttSubFail != exportQos && exportQos != msgData->qos)
  36. {
  37. RyanMqttLog_e("pub测试收到qos等级不一致 exportQos: %d, msgQos: %d", exportQos, msgData->qos);
  38. RyanMqttTestDestroyClient(client);
  39. return;
  40. }
  41. if (((msgData->payloadLen == (uint32_t)pubStrLen) &&
  42. 0 == memcmp(msgData->payload, pubStr, pubStrLen)) ||
  43. ((msgData->payloadLen == (uint32_t)pubStr2Len) &&
  44. 0 == memcmp(msgData->payload, pubStr2, pubStr2Len)))
  45. {
  46. pubTestDataEventCount++;
  47. if (RyanMqttQos0 != msgData->qos)
  48. {
  49. pubTestDataEventCountNotQos0++;
  50. }
  51. }
  52. else
  53. {
  54. RyanMqttLog_e("pub测试收到数据不一致 %.*s", msgData->payloadLen, msgData->payload);
  55. RyanMqttTestDestroyClient(client);
  56. return;
  57. }
  58. break;
  59. }
  60. default: mqttEventBaseHandle(pclient, event, eventData); break;
  61. }
  62. }
  63. /**
  64. * @brief 固定qos等级发布测试
  65. *
  66. * @param qos
  67. * @param count
  68. * @param delayms
  69. * @return RyanMqttError_e
  70. */
  71. static RyanMqttError_e RyanMqttPublishTest(RyanMqttQos_e qos, int32_t count, uint32_t delayms)
  72. {
  73. #define RyanMqttPubTestPubTopic "testlinux/aa/pub"
  74. #define RyanMqttPubTestSubTopic "testlinux/+/pub"
  75. RyanMqttError_e result = RyanMqttSuccessError;
  76. RyanMqttClient_t *client;
  77. exportQos = qos;
  78. result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttPublishEventHandle, NULL);
  79. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  80. // 等待订阅主题成功
  81. result = RyanMqttSubscribe(client, RyanMqttPubTestSubTopic, qos);
  82. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  83. // !不等待topic订阅成功,检查是否正常接收消息
  84. // for (int32_t i = 0;; i++)
  85. // {
  86. // int32_t subscribeTotal = 0;
  87. // result = RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
  88. // RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  89. // { goto __exit; });
  90. // if (1 == subscribeTotal)
  91. // {
  92. // break;
  93. // }
  94. // if (i > 3000)
  95. // {
  96. // RyanMqttLog_e("订阅主题失败");
  97. // result = RyanMqttFailedError;
  98. // goto __exit;
  99. // }
  100. // delay(1);
  101. // }
  102. // 生成随机的数据包大小
  103. {
  104. pubStr = (char *)malloc(2048);
  105. RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
  106. RyanMqttMemset(pubStr, 0, 2048);
  107. for (uint32_t i = 0; i < RyanRand(100, 220); i++)
  108. {
  109. snprintf(pubStr + 4 * i, 2048 - 4 * i, "%c%c%c%c", (char)RyanRand(32, 126),
  110. (char)RyanRand(32, 126), (char)RyanRand(32, 126), (char)RyanRand(32, 126));
  111. }
  112. pubStrLen = (int32_t)RyanMqttStrlen(pubStr);
  113. }
  114. // 开始测试发送数据
  115. pubTestPublishedEventCount = 0;
  116. pubTestDataEventCount = 0;
  117. for (int32_t i = 0; i < count; i++)
  118. {
  119. char *pubTopic = RyanMqttPubTestPubTopic;
  120. uint32_t randNumber = RyanRand(1, 10);
  121. result = RyanMqttPublishWithUserData(
  122. client, pubTopic, RyanMqttStrlen(pubTopic), (0 == i % randNumber) ? pubStr2 : pubStr,
  123. (0 == i % randNumber) ? pubStr2Len : pubStrLen, qos, RyanMqttFalse, (void *)(uintptr_t)qos);
  124. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  125. { goto __exit; });
  126. if (delayms)
  127. {
  128. delay(delayms);
  129. }
  130. }
  131. // 检查收到的数据是否正确
  132. for (int32_t i = 0;; i++)
  133. {
  134. if (RyanMqttQos0 == qos)
  135. {
  136. if (count == pubTestDataEventCount)
  137. {
  138. break;
  139. }
  140. }
  141. // qos1和qos2的发送成功数和接收数应该相等,因为是本机回环测试,理论上应该一致
  142. else if (pubTestPublishedEventCount == count && pubTestPublishedEventCount == pubTestDataEventCount)
  143. {
  144. break;
  145. }
  146. if (i > 300)
  147. {
  148. RyanMqttLog_e("QOS测试失败 Qos: %d, PublishedEventCount: %d, dataEventCount: %d", qos,
  149. pubTestPublishedEventCount, pubTestDataEventCount);
  150. result = RyanMqttFailedError;
  151. goto __exit;
  152. }
  153. delay(100);
  154. }
  155. result = RyanMqttUnSubscribe(client, RyanMqttPubTestSubTopic);
  156. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  157. result = checkAckList(client);
  158. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  159. __exit:
  160. free(pubStr);
  161. pubStr = NULL;
  162. RyanMqttLog_i("mqtt 发布测试,销毁mqtt客户端");
  163. RyanMqttTestDestroyClient(client);
  164. return result;
  165. }
  166. /**
  167. * @brief 混乱随机的qos消息测试
  168. *
  169. * @param count
  170. * @param delayms
  171. * @return RyanMqttError_e
  172. */
  173. static RyanMqttError_e RyanMqttPublishHybridTest(int32_t count, uint32_t delayms)
  174. {
  175. #define RyanMqttPubHybridTestPubTopic "testlinux/aa/pub/adfa/kkk"
  176. #define RyanMqttPubHybridTestSubTopic "testlinux/aa/+/#"
  177. RyanMqttError_e result = RyanMqttSuccessError;
  178. RyanMqttClient_t *client;
  179. int32_t sendNeedAckCount = 0;
  180. result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttPublishEventHandle, NULL);
  181. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  182. // 等待订阅主题成功
  183. result = RyanMqttSubscribe(client, RyanMqttPubHybridTestSubTopic, RyanMqttQos2);
  184. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  185. for (int32_t i = 0;; i++)
  186. {
  187. int32_t subscribeTotal = 0;
  188. result = RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
  189. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  190. { goto __exit; });
  191. if (1 == subscribeTotal)
  192. {
  193. break;
  194. }
  195. if (i > 3000)
  196. {
  197. RyanMqttLog_e("订阅主题失败");
  198. result = RyanMqttFailedError;
  199. goto __exit;
  200. }
  201. delay(1);
  202. }
  203. exportQos = RyanMqttSubFail;
  204. // 生成随机的数据包大小
  205. {
  206. pubStr = (char *)malloc(2048);
  207. RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
  208. RyanMqttMemset(pubStr, 0, 2048);
  209. for (uint32_t i = 0; i < RyanRand(100, 220); i++)
  210. {
  211. snprintf(pubStr + 4 * i, 2048 - 4 * i, "%c%c%c%c", (char)RyanRand(32, 126),
  212. (char)RyanRand(32, 126), (char)RyanRand(32, 126), (char)RyanRand(32, 126));
  213. }
  214. pubStrLen = (int32_t)RyanMqttStrlen(pubStr);
  215. }
  216. pubTestPublishedEventCount = 0;
  217. pubTestDataEventCount = 0;
  218. pubTestDataEventCountNotQos0 = 0;
  219. for (int32_t i = 0; i < count; i++)
  220. {
  221. char *pubTopic = RyanMqttPubHybridTestPubTopic;
  222. uint32_t randNumber = RyanRand(1, 10);
  223. result = RyanMqttPublishWithUserData(
  224. client, pubTopic, RyanMqttStrlen(pubTopic), (0 == i % randNumber) ? pubStr2 : pubStr,
  225. (0 == i % randNumber) ? pubStr2Len : pubStrLen, i % 3, RyanMqttFalse,
  226. // NOLINTNEXTLINE(clang-diagnostic-int-to-void-pointer-cast,performance-no-int-to-ptr)
  227. (void *)(uintptr_t)(i % 3));
  228. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  229. { goto __exit; });
  230. if (RyanMqttQos0 != i % 3)
  231. {
  232. sendNeedAckCount++;
  233. }
  234. if (delayms)
  235. {
  236. delay(delayms);
  237. }
  238. }
  239. // 检查收到的数据是否正确
  240. for (int32_t i = 0;; i++)
  241. {
  242. // 发送成功数等于需要ack数,同时也等于接收到非qos0的数
  243. if (pubTestPublishedEventCount == sendNeedAckCount &&
  244. pubTestPublishedEventCount == pubTestDataEventCountNotQos0)
  245. {
  246. break;
  247. }
  248. if (i > 300)
  249. {
  250. RyanMqttLog_e("QOS测试失败, PublishedEventCount: %d, pubTestDataEventCountNotQos0: %d",
  251. pubTestPublishedEventCount, pubTestDataEventCountNotQos0);
  252. result = RyanMqttFailedError;
  253. goto __exit;
  254. }
  255. delay(100);
  256. }
  257. result = RyanMqttUnSubscribe(client, RyanMqttPubHybridTestSubTopic);
  258. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  259. result = checkAckList(client);
  260. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  261. __exit:
  262. free(pubStr);
  263. pubStr = NULL;
  264. RyanMqttLog_i("mqtt 发布测试,销毁mqtt客户端");
  265. RyanMqttTestDestroyClient(client);
  266. return result;
  267. }
  268. RyanMqttError_e RyanMqttPubTest(void)
  269. {
  270. RyanMqttError_e result = RyanMqttSuccessError;
  271. // 发布 & 订阅 qos 测试
  272. result = RyanMqttPublishTest(RyanMqttQos0, 1000, 0);
  273. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  274. checkMemory;
  275. result = RyanMqttPublishTest(RyanMqttQos1, 1000, 1);
  276. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  277. checkMemory;
  278. result = RyanMqttPublishTest(RyanMqttQos2, 1000, 1);
  279. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  280. checkMemory;
  281. result = RyanMqttPublishHybridTest(1000, 1);
  282. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  283. checkMemory;
  284. return RyanMqttSuccessError;
  285. __exit:
  286. return RyanMqttFailedError;
  287. }