#include "RyanMqttTest.h"

/* Topics shared by the publish test and the mixed-API test in this file. */
#define RyanMqttPubHybridTestPubTopic "testlinux/aa/pub/adfa/kkk"
#define RyanMqttPubHybridTestSubTopic "testlinux/aa/+/#"

/**
 * @brief Publish messages with cycling QoS levels while random memory faults are enabled.
 *
 * Subscribes to the wildcard topic and the publish topic, enables random
 * memory faults, then publishes @p count messages whose QoS is `i % 3`.
 * Return values of the publish calls are deliberately not checked; the loop
 * only aborts when the client is no longer in the connected state.
 *
 * @param count   number of publish attempts
 * @param delayms delay between publishes in the unit taken by delay(); 0 disables the delay
 * @return RyanMqttSuccessError when the loop completes,
 *         RyanMqttFailedError when the client leaves the connected state,
 *         or the error returned by RyanMqttTestInit
 */
static RyanMqttError_e RyanMqttNetworkFaultPublishHybridTest(int32_t count, uint32_t delayms)
{
	RyanMqttError_e result = RyanMqttSuccessError;
	/* NULL so that the cleanup path never sees an indeterminate pointer. */
	RyanMqttClient_t *client = NULL;
	char *pubTopic = RyanMqttPubHybridTestPubTopic;
	char *pubStr = "hello, this is a mqtt publish test string.";
	uint32_t pubStrLen = (uint32_t)strlen(pubStr);

	result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, NULL, NULL);
	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });

	// Subscribe and give the subscriptions a moment to complete
	RyanMqttSubscribe(client, RyanMqttPubHybridTestSubTopic, RyanMqttQos2);
	RyanMqttSubscribe(client, RyanMqttPubHybridTestPubTopic, RyanMqttQos2);
	delay(2);

	enableRandomMemoryFault();
	for (int32_t i = 0; i < count; i++) {
		if (RyanMqttConnectState != RyanMqttGetState(client)) {
			RyanMqttLog_e("mqtt 发布测试,销毁mqtt客户端 %d", i);
			result = RyanMqttFailedError;
			goto __exit;
		}

		// QoS cycles through 0, 1, 2; failures are tolerated on purpose
		RyanMqttPublishWithUserData(client, pubTopic, RyanMqttStrlen(pubTopic), pubStr, pubStrLen, i % 3,
					    RyanMqttFalse, NULL);

		if (delayms) {
			delay(delayms);
		}
	}

	// Wait long enough for outstanding acks to time out before unsubscribing
	delay(RyanMqttAckTimeout * 2 + 200);

	RyanMqttUnSubscribe(client, RyanMqttPubHybridTestSubTopic);

__exit:
	disableRandomMemoryFault();
	RyanMqttLog_i("mqtt 发布测试,销毁mqtt客户端");
	// client is still NULL only if init never assigned it; nothing to destroy then
	if (NULL != client) {
		RyanMqttTestDestroyClient(client);
	}
	return result;
}

/**
 * @brief Subscribe/unsubscribe stress test while random memory faults are enabled.
 *
 * Builds @p count topics named "test/subscribe/<i>", then repeats
 * @p testCount rounds of: subscribe all, re-subscribe half with unchanged
 * QoS, re-subscribe all with changed QoS, unsubscribe all, and unsubscribe
 * half again. Return values of the subscribe/unsubscribe calls are
 * deliberately not checked.
 *
 * @param count     number of topics; must be greater than 0
 * @param testCount number of subscribe/unsubscribe rounds
 * @return RyanMqttSuccessError on completion,
 *         RyanMqttFailedError when @p count is not positive,
 *         RyanMqttNotEnoughMemError when test data cannot be allocated,
 *         or the error returned by RyanMqttTestInit
 */
static RyanMqttError_e RyanMqttNetworkFaultSubscribeHybridTest(int32_t count, int32_t testCount)
{
	enum { topicBufSize = 64 };

	RyanMqttError_e result = RyanMqttSuccessError;
	RyanMqttClient_t *client = NULL;
	RyanMqttUnSubscribeData_t *unSubscribeManyData = NULL;
	RyanMqttSubscribeData_t *subscribeManyData = NULL;

	// The body indexes [count - 1], so a non-positive count would go out of bounds
	if (count <= 0) {
		RyanMqttLog_e("invalid count %d", count);
		return RyanMqttFailedError;
	}

	result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, NULL, NULL);
	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });

	// Build the subscribe list. calloc keeps unfilled .topic slots NULL so the
	// cleanup loop is safe when an allocation fails partway through.
	subscribeManyData = calloc((size_t)count, sizeof(*subscribeManyData));
	if (NULL == subscribeManyData) {
		RyanMqttLog_e("内存不足");
		result = RyanMqttNotEnoughMemError;
		goto __exit;
	}

	for (int32_t i = 0; i < count; i++) {
		char *topic = malloc(topicBufSize);
		if (NULL == topic) {
			RyanMqttLog_e("内存不足");
			result = RyanMqttNotEnoughMemError;
			goto __exit;
		}

		RyanMqttSnprintf(topic, topicBufSize, "test/subscribe/%d", i);
		subscribeManyData[i].qos = i % 3;
		subscribeManyData[i].topic = topic;
		subscribeManyData[i].topicLen = RyanMqttStrlen(topic);
	}

	// Build the matching unsubscribe list (zeroed for the same reason as above)
	unSubscribeManyData = calloc((size_t)count, sizeof(*unSubscribeManyData));
	if (NULL == unSubscribeManyData) {
		RyanMqttLog_e("内存不足");
		result = RyanMqttNotEnoughMemError;
		goto __exit;
	}

	for (int32_t i = 0; i < count; i++) {
		char *topic = malloc(topicBufSize);
		if (NULL == topic) {
			RyanMqttLog_e("内存不足");
			result = RyanMqttNotEnoughMemError;
			goto __exit;
		}

		RyanMqttSnprintf(topic, topicBufSize, "test/subscribe/%d", i);
		unSubscribeManyData[i].topic = topic;
		unSubscribeManyData[i].topicLen = RyanMqttStrlen(topic);
	}

	enableRandomMemoryFault();
	for (int32_t round = 0; round < testCount; round++) {
		// Subscribe to every topic: all but the last in one batch, the last one singly
		RyanMqttSubscribeMany(client, count - 1, subscribeManyData);
		RyanMqttSubscribe(client, subscribeManyData[count - 1].topic, subscribeManyData[count - 1].qos);

		// Duplicate subscription with unchanged QoS
		RyanMqttSubscribeMany(client, count / 2, subscribeManyData);

		// Duplicate subscription with a changed QoS
		for (int32_t i = count; i > 0; i--) {
			subscribeManyData[count - i].qos = i % 3;
		}
		RyanMqttSubscribeMany(client, count, subscribeManyData);

		// Unsubscribe everything: all but the last in one batch, the last one singly
		RyanMqttUnSubscribeMany(client, count - 1, unSubscribeManyData);
		RyanMqttUnSubscribe(client, unSubscribeManyData[count - 1].topic);

		// Duplicate unsubscribe
		RyanMqttUnSubscribeMany(client, count / 2, unSubscribeManyData);
	}

	// Wait long enough for outstanding acks to time out
	delay(RyanMqttAckTimeout * 2 + 200);

__exit:
	disableRandomMemoryFault();

	// Release topic buffers, then the arrays; unfilled slots are NULL and free(NULL) is a no-op
	if (NULL != subscribeManyData) {
		for (int32_t i = 0; i < count; i++) {
			free(subscribeManyData[i].topic);
		}
		free(subscribeManyData);
	}

	if (NULL != unSubscribeManyData) {
		for (int32_t i = 0; i < count; i++) {
			free(unSubscribeManyData[i].topic);
		}
		free(unSubscribeManyData);
	}

	RyanMqttLog_i("mqtt 订阅测试,销毁mqtt客户端");
	// client is still NULL only if init never assigned it; nothing to destroy then
	if (NULL != client) {
		RyanMqttTestDestroyClient(client);
	}
	return result;
}

/**
 * @brief Call a mix of client query/configuration APIs while random memory faults are enabled.
 *
 * Each iteration calls the subscription getters, state and keep-alive
 * getters, config get/set/free, LWT setter, ack-handler discard and the
 * event-id get/cancel/register functions. No return value is checked.
 *
 * @param count number of iterations
 * @return RyanMqttSuccessError on completion, or the error returned by RyanMqttTestInit
 */
static RyanMqttError_e RyanMqttNetworkFaultHybridTest(int32_t count)
{
	RyanMqttError_e result = RyanMqttSuccessError;
	RyanMqttClient_t *client = NULL;

	result = RyanMqttTestInit(&client, RyanMqttTrue, RyanMqttTrue, 120, NULL, NULL);
	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });

	// Subscribe and give the subscriptions a moment to complete
	RyanMqttSubscribe(client, RyanMqttPubHybridTestSubTopic, RyanMqttQos2);
	RyanMqttSubscribe(client, RyanMqttPubHybridTestPubTopic, RyanMqttQos2);
	delay(2);

	enableRandomMemoryFault();
	for (int32_t i = 0; i < count; i++) {
		// Dynamically allocated subscription snapshot, released immediately
		RyanMqttMsgHandler_t *msgHandles = NULL;
		int32_t subscribeNum = 0;
		RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
		RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);

		// Subscription snapshot into a fixed-size caller-provided array
		RyanMqttMsgHandler_t msgHandlesStatic[3] = {0};
		RyanMqttGetSubscribe(client, msgHandlesStatic, getArraySize(msgHandlesStatic), &subscribeNum);

		int32_t subscribeTotalCount = 0;
		RyanMqttGetSubscribeTotalCount(client, &subscribeTotalCount);

		RyanMqttGetState(client);

		uint32_t keepAliveRemain = 0;
		RyanMqttGetKeepAliveRemain(client, &keepAliveRemain);

		// Set and free are called whether or not the get produced a valid config
		RyanMqttClientConfig_t *pclientConfig = NULL;
		RyanMqttGetConfig(client, &pclientConfig);
		RyanMqttSetConfig(client, pclientConfig);
		RyanMqttFreeConfigFromGet(pclientConfig);

		RyanMqttSetLwt(client, "pub/lwt/test", "this is will", RyanMqttStrlen("this is will"), RyanMqttQos2, 0);

		RyanMqttDiscardAckHandler(client, MQTT_PACKET_TYPE_PUBACK, 1);

		RyanMqttEventId_e eventId = RyanMqttEventAnyId;
		RyanMqttGetEventId(client, &eventId);
		RyanMqttCancelEventId(client, eventId);
		RyanMqttRegisterEventId(client, eventId);
	}

	RyanMqttUnSubscribe(client, RyanMqttPubHybridTestSubTopic);

__exit:
	disableRandomMemoryFault();
	// client is still NULL only if init never assigned it; nothing to destroy then
	if (NULL != client) {
		RyanMqttTestDestroyClient(client);
	}
	return result;
}

/**
 * @brief Memory fault tolerance test entry point.
 *
 * Runs the publish, subscribe and mixed-API fault tests in sequence, invoking
 * checkMemory after each one, and stops at the first failure.
 *
 * @return RyanMqttSuccessError when every sub-test succeeds, otherwise RyanMqttFailedError
 */
RyanMqttError_e RyanMqttMemoryFaultToleranceTest(void)
{
	RyanMqttError_e result = RyanMqttSuccessError;

	result = RyanMqttNetworkFaultPublishHybridTest(2000, 1);
	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
	checkMemory;

	result = RyanMqttNetworkFaultSubscribeHybridTest(2000, 5);
	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
	checkMemory;

	result = RyanMqttNetworkFaultHybridTest(5000);
	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
	checkMemory;

	return RyanMqttSuccessError;

__exit:
	return RyanMqttFailedError;
}