RyanMqttNetworkFaultQosResilienceTest.c 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. #include "RyanMqttTest.h"
  2. static int32_t pubTestPublishedEventCount = 0;
  3. static int32_t pubTestDataEventCount = 0;
  4. static char *pubStr = NULL;
  5. static int32_t pubStrLen = 0;
  6. static char *pubStr2 = "a";
  7. static int32_t pubStr2Len = 1;
  8. static RyanMqttQos_e exportQos = RyanMqttSubFail;
  9. static int32_t pubTestDataEventCountNotQos0 = 0;
  10. static void RyanMqttPublishEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  11. {
  12. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  13. switch (event)
  14. {
  15. case RyanMqttEventPublished: {
  16. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  17. RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  18. RyanMqttQos_e qos = (RyanMqttQos_e)(uintptr_t)msgHandler->userData;
  19. if (qos == msgHandler->qos)
  20. {
  21. pubTestPublishedEventCount++;
  22. }
  23. else
  24. {
  25. RyanMqttLog_e("pub测试发送的qos不一致 msgQos: %d, userDataQos: %d", msgHandler->qos, qos);
  26. RyanMqttTestDestroyClient(client);
  27. return;
  28. }
  29. break;
  30. }
  31. case RyanMqttEventData: {
  32. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  33. // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  34. // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  35. if (RyanMqttSubFail != exportQos && exportQos != msgData->qos)
  36. {
  37. RyanMqttLog_e("pub测试收到qos等级不一致 exportQos: %d, msgQos: %d", exportQos, msgData->qos);
  38. RyanMqttTestDestroyClient(client);
  39. return;
  40. }
  41. if (((msgData->payloadLen == (uint32_t)pubStrLen) &&
  42. 0 == memcmp(msgData->payload, pubStr, pubStrLen)) ||
  43. ((msgData->payloadLen == (uint32_t)pubStr2Len) &&
  44. 0 == memcmp(msgData->payload, pubStr2, pubStr2Len)))
  45. {
  46. pubTestDataEventCount++;
  47. if (RyanMqttQos0 != msgData->qos)
  48. {
  49. pubTestDataEventCountNotQos0++;
  50. }
  51. }
  52. else
  53. {
  54. RyanMqttLog_e("pub测试收到数据不一致 %.*s", msgData->payloadLen, msgData->payload);
  55. RyanMqttTestDestroyClient(client);
  56. return;
  57. }
  58. break;
  59. }
  60. default: mqttEventBaseHandle(pclient, event, eventData); break;
  61. }
  62. }
  63. /**
  64. * @brief 混乱随机的qos消息测试
  65. *
  66. * @param count
  67. * @param delayms
  68. * @return RyanMqttError_e
  69. */
  70. static RyanMqttError_e RyanMqttNetworkFaultQosResiliencePublishTest(int32_t count, uint32_t delayus, RyanMqttQos_e qos)
  71. {
  72. #define RyanMqttPubHybridTestPubTopic "testlinux/aa/pub/adfa/ddd"
  73. #define RyanMqttPubHybridTestSubTopic "testlinux/aa/+/#"
  74. RyanMqttError_e result = RyanMqttSuccessError;
  75. RyanMqttClient_t *client;
  76. int32_t sendNeedAckCount = 0;
  77. result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttPublishEventHandle, NULL);
  78. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  79. // 等待订阅主题成功
  80. result = RyanMqttSubscribe(client, RyanMqttPubHybridTestSubTopic, qos);
  81. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  82. for (int32_t i = 0;; i++)
  83. {
  84. int32_t subscribeTotal = 0;
  85. result = RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
  86. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
  87. { goto __exit; });
  88. if (1 == subscribeTotal)
  89. {
  90. break;
  91. }
  92. if (i > 3000)
  93. {
  94. RyanMqttLog_e("订阅主题失败");
  95. result = RyanMqttFailedError;
  96. goto __exit;
  97. }
  98. delay(1);
  99. }
  100. exportQos = qos;
  101. // 生成随机的数据包大小
  102. {
  103. pubStr = (char *)malloc(2048);
  104. RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
  105. RyanMqttMemset(pubStr, 0, 2048);
  106. for (uint32_t i = 0; i < RyanRand(20, 220); i++)
  107. {
  108. snprintf(pubStr + 4 * i, 2048 - 4 * i, "%c%c%c%c", (char)RyanRand(32, 126),
  109. (char)RyanRand(32, 126), (char)RyanRand(32, 126), (char)RyanRand(32, 126));
  110. }
  111. pubStrLen = (int32_t)RyanMqttStrlen(pubStr);
  112. }
  113. pubTestPublishedEventCount = 0;
  114. pubTestDataEventCount = 0;
  115. pubTestDataEventCountNotQos0 = 0;
  116. for (int32_t i = 0; i < count; i++)
  117. {
  118. if (RyanMqttConnectState != RyanMqttGetState(client))
  119. {
  120. RyanMqttLog_e("mqtt 发布测试,销毁mqtt客户端 %d", i);
  121. result = RyanMqttFailedError;
  122. goto __exit;
  123. }
  124. if (delayus)
  125. {
  126. delay_us(delayus);
  127. }
  128. if (0 == i % (count / 5))
  129. {
  130. toggleRandomNetworkFault();
  131. }
  132. char *pubTopic = RyanMqttPubHybridTestPubTopic;
  133. uint32_t randNumber = RyanRand(1, 10);
  134. result = RyanMqttPublishWithUserData(
  135. client, pubTopic, RyanMqttStrlen(pubTopic), (0 == i % randNumber) ? pubStr2 : pubStr,
  136. (0 == i % randNumber) ? pubStr2Len : pubStrLen, qos, RyanMqttFalse,
  137. // NOLINTNEXTLINE(clang-diagnostic-int-to-void-pointer-cast,performance-no-int-to-ptr)
  138. (void *)(uintptr_t)(qos));
  139. if (RyanMqttSuccessError == result)
  140. {
  141. if (RyanMqttQos0 != qos)
  142. {
  143. sendNeedAckCount++;
  144. }
  145. }
  146. // else
  147. // {
  148. // RyanMqttLog_e("mqtt 发布测试,网络故障 %d", i);
  149. // }
  150. }
  151. delay(RyanMqttAckTimeout * 1 + 100);
  152. disableRandomNetworkFault();
  153. // 检查收到的数据是否正确
  154. for (int32_t i = 0;; i++)
  155. {
  156. if (RyanMqttQos2 == qos)
  157. {
  158. // qos2 发送成功数等于需要ack数,同时也等于接收到的数
  159. if (pubTestPublishedEventCount == sendNeedAckCount &&
  160. pubTestPublishedEventCount == pubTestDataEventCount)
  161. {
  162. break;
  163. }
  164. }
  165. else if (RyanMqttQos1 == qos)
  166. {
  167. if (pubTestPublishedEventCount >= sendNeedAckCount && pubTestDataEventCount >= sendNeedAckCount)
  168. {
  169. break;
  170. }
  171. }
  172. else
  173. {
  174. break;
  175. }
  176. if (i > (RyanMqttAckTimeout * 5 / 100 + 500))
  177. {
  178. RyanMqttLog_e("QOS测试失败, PublishedEventCount: %d, pubTestDataEventCountNotQos0: %d, "
  179. "sendNeedAckCount: %d, pubTestDataEventCount: %d",
  180. pubTestPublishedEventCount, pubTestDataEventCountNotQos0, sendNeedAckCount,
  181. pubTestDataEventCount);
  182. result = RyanMqttFailedError;
  183. goto __exit;
  184. }
  185. // 测试代码可以临时访问ack链表
  186. platformMutexLock(client->config.userData, &client->ackHandleLock);
  187. if (RyanMqttListIsEmpty(&client->ackHandlerList))
  188. {
  189. RyanMqttLog_e("ack链表为空, PublishedEventCount: %d, pubTestDataEventCountNotQos0: %d, "
  190. "sendNeedAckCount: %d, pubTestDataEventCount: %d, pubStrLen: %d",
  191. pubTestPublishedEventCount, pubTestDataEventCountNotQos0, sendNeedAckCount,
  192. pubTestDataEventCount, pubStrLen);
  193. }
  194. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  195. delay(100);
  196. }
  197. result = RyanMqttUnSubscribe(client, RyanMqttPubHybridTestSubTopic);
  198. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  199. result = checkAckList(client);
  200. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
  201. __exit:
  202. disableRandomNetworkFault();
  203. free(pubStr);
  204. pubStr = NULL;
  205. RyanMqttLog_i("mqtt 测试,销毁mqtt客户端");
  206. RyanMqttTestDestroyClient(client);
  207. return result;
  208. }
  209. /**
  210. * @brief mqtt网络故障QOS韧性测试
  211. *
  212. * @return RyanMqttError_e
  213. */
  214. RyanMqttError_e RyanMqttNetworkFaultQosResilienceTest(void)
  215. {
  216. RyanMqttError_e result = RyanMqttSuccessError;
  217. result = RyanMqttNetworkFaultQosResiliencePublishTest(700, 0, RyanMqttQos0);
  218. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
  219. checkMemory;
  220. result = RyanMqttNetworkFaultQosResiliencePublishTest(1300, 2, RyanMqttQos1);
  221. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
  222. checkMemory;
  223. result = RyanMqttNetworkFaultQosResiliencePublishTest(1700, 4, RyanMqttQos2);
  224. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
  225. checkMemory;
  226. return RyanMqttSuccessError;
  227. __exit:
  228. return RyanMqttFailedError;
  229. }