| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- #include "RyanMqttTest.h"
- static int32_t pubTestPublishedEventCount = 0;
- static int32_t pubTestDataEventCount = 0;
- static char *pubStr = NULL;
- static int32_t pubStrLen = 0;
- static char *pubStr2 = "a";
- static int32_t pubStr2Len = 1;
- static RyanMqttQos_e exportQos = RyanMqttSubFail;
- static int32_t pubTestDataEventCountNotQos0 = 0;
- static void RyanMqttPublishEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
- {
- RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
- switch (event)
- {
- case RyanMqttEventPublished: {
- RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
- RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
- RyanMqttQos_e qos = (RyanMqttQos_e)(uintptr_t)msgHandler->userData;
- if (qos == msgHandler->qos)
- {
- pubTestPublishedEventCount++;
- }
- else
- {
- RyanMqttLog_e("pub测试发送的qos不一致 msgQos: %d, userDataQos: %d", msgHandler->qos, qos);
- RyanMqttTestDestroyClient(client);
- return;
- }
- break;
- }
- case RyanMqttEventData: {
- RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
- // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
- // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
- if (RyanMqttSubFail != exportQos && exportQos != msgData->qos)
- {
- RyanMqttLog_e("pub测试收到qos等级不一致 exportQos: %d, msgQos: %d", exportQos, msgData->qos);
- RyanMqttTestDestroyClient(client);
- return;
- }
- if (((msgData->payloadLen == (uint32_t)pubStrLen) &&
- 0 == memcmp(msgData->payload, pubStr, pubStrLen)) ||
- ((msgData->payloadLen == (uint32_t)pubStr2Len) &&
- 0 == memcmp(msgData->payload, pubStr2, pubStr2Len)))
- {
- pubTestDataEventCount++;
- if (RyanMqttQos0 != msgData->qos)
- {
- pubTestDataEventCountNotQos0++;
- }
- }
- else
- {
- RyanMqttLog_e("pub测试收到数据不一致 %.*s", msgData->payloadLen, msgData->payload);
- RyanMqttTestDestroyClient(client);
- return;
- }
- break;
- }
- default: mqttEventBaseHandle(pclient, event, eventData); break;
- }
- }
- /**
- * @brief 混乱随机的qos消息测试
- *
- * @param count
- * @param delayms
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttNetworkFaultQosResiliencePublishTest(int32_t count, uint32_t delayus, RyanMqttQos_e qos)
- {
- #define RyanMqttPubHybridTestPubTopic "testlinux/aa/pub/adfa/ddd"
- #define RyanMqttPubHybridTestSubTopic "testlinux/aa/+/#"
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttClient_t *client;
- int32_t sendNeedAckCount = 0;
- result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, RyanMqttPublishEventHandle, NULL);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
- // 等待订阅主题成功
- result = RyanMqttSubscribe(client, RyanMqttPubHybridTestSubTopic, qos);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
- for (int32_t i = 0;; i++)
- {
- int32_t subscribeTotal = 0;
- result = RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e,
- { goto __exit; });
- if (1 == subscribeTotal)
- {
- break;
- }
- if (i > 3000)
- {
- RyanMqttLog_e("订阅主题失败");
- result = RyanMqttFailedError;
- goto __exit;
- }
- delay(1);
- }
- exportQos = qos;
- // 生成随机的数据包大小
- {
- pubStr = (char *)malloc(2048);
- RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
- RyanMqttMemset(pubStr, 0, 2048);
- for (uint32_t i = 0; i < RyanRand(20, 220); i++)
- {
- snprintf(pubStr + 4 * i, 2048 - 4 * i, "%c%c%c%c", (char)RyanRand(32, 126),
- (char)RyanRand(32, 126), (char)RyanRand(32, 126), (char)RyanRand(32, 126));
- }
- pubStrLen = (int32_t)RyanMqttStrlen(pubStr);
- }
- pubTestPublishedEventCount = 0;
- pubTestDataEventCount = 0;
- pubTestDataEventCountNotQos0 = 0;
- for (int32_t i = 0; i < count; i++)
- {
- if (RyanMqttConnectState != RyanMqttGetState(client))
- {
- RyanMqttLog_e("mqtt 发布测试,销毁mqtt客户端 %d", i);
- result = RyanMqttFailedError;
- goto __exit;
- }
- if (delayus)
- {
- delay_us(delayus);
- }
- if (0 == i % (count / 5))
- {
- toggleRandomNetworkFault();
- }
- char *pubTopic = RyanMqttPubHybridTestPubTopic;
- uint32_t randNumber = RyanRand(1, 10);
- result = RyanMqttPublishWithUserData(
- client, pubTopic, RyanMqttStrlen(pubTopic), (0 == i % randNumber) ? pubStr2 : pubStr,
- (0 == i % randNumber) ? pubStr2Len : pubStrLen, qos, RyanMqttFalse,
- // NOLINTNEXTLINE(clang-diagnostic-int-to-void-pointer-cast,performance-no-int-to-ptr)
- (void *)(uintptr_t)(qos));
- if (RyanMqttSuccessError == result)
- {
- if (RyanMqttQos0 != qos)
- {
- sendNeedAckCount++;
- }
- }
- // else
- // {
- // RyanMqttLog_e("mqtt 发布测试,网络故障 %d", i);
- // }
- }
- delay(RyanMqttAckTimeout * 1 + 100);
- disableRandomNetworkFault();
- // 检查收到的数据是否正确
- for (int32_t i = 0;; i++)
- {
- if (RyanMqttQos2 == qos)
- {
- // qos2 发送成功数等于需要ack数,同时也等于接收到的数
- if (pubTestPublishedEventCount == sendNeedAckCount &&
- pubTestPublishedEventCount == pubTestDataEventCount)
- {
- break;
- }
- }
- else if (RyanMqttQos1 == qos)
- {
- if (pubTestPublishedEventCount >= sendNeedAckCount && pubTestDataEventCount >= sendNeedAckCount)
- {
- break;
- }
- }
- else
- {
- break;
- }
- if (i > (RyanMqttAckTimeout * 5 / 100 + 500))
- {
- RyanMqttLog_e("QOS测试失败, PublishedEventCount: %d, pubTestDataEventCountNotQos0: %d, "
- "sendNeedAckCount: %d, pubTestDataEventCount: %d",
- pubTestPublishedEventCount, pubTestDataEventCountNotQos0, sendNeedAckCount,
- pubTestDataEventCount);
- result = RyanMqttFailedError;
- goto __exit;
- }
- // 测试代码可以临时访问ack链表
- platformMutexLock(client->config.userData, &client->ackHandleLock);
- if (RyanMqttListIsEmpty(&client->ackHandlerList))
- {
- RyanMqttLog_e("ack链表为空, PublishedEventCount: %d, pubTestDataEventCountNotQos0: %d, "
- "sendNeedAckCount: %d, pubTestDataEventCount: %d, pubStrLen: %d",
- pubTestPublishedEventCount, pubTestDataEventCountNotQos0, sendNeedAckCount,
- pubTestDataEventCount, pubStrLen);
- }
- platformMutexUnLock(client->config.userData, &client->ackHandleLock);
- delay(100);
- }
- result = RyanMqttUnSubscribe(client, RyanMqttPubHybridTestSubTopic);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
- result = checkAckList(client);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
- __exit:
- disableRandomNetworkFault();
- free(pubStr);
- pubStr = NULL;
- RyanMqttLog_i("mqtt 测试,销毁mqtt客户端");
- RyanMqttTestDestroyClient(client);
- return result;
- }
- /**
- * @brief mqtt网络故障QOS韧性测试
- *
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttNetworkFaultQosResilienceTest(void)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- result = RyanMqttNetworkFaultQosResiliencePublishTest(700, 0, RyanMqttQos0);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
- checkMemory;
- result = RyanMqttNetworkFaultQosResiliencePublishTest(1300, 2, RyanMqttQos1);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
- checkMemory;
- result = RyanMqttNetworkFaultQosResiliencePublishTest(1700, 4, RyanMqttQos2);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
- checkMemory;
- return RyanMqttSuccessError;
- __exit:
- return RyanMqttFailedError;
- }
|