| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- #include "RyanMqttTest.h"
- #define MESSAGES_PER_THREAD 1000 // 发送个数
- #define CONCURRENT_CLIENTS 20 // 线程数
- // 线程测试统计数据
- typedef struct
- {
- int threadIndex;
- int publishedCount;
- int receivedCount;
- pthread_attr_t attr;
- pthread_t threadId;
- } ThreadTestData_t;
- // 多线程测试控制结构
- typedef struct
- {
- int32_t totalPublished;
- int32_t totalReceived;
- int32_t threadIndex;
- int32_t testComplete;
- RyanMqttClient_t *client;
- } MultiThreadTestControl_t;
- static MultiThreadTestControl_t g_testControl = {0};
- static ThreadTestData_t g_threadTestData[CONCURRENT_CLIENTS + 1] = {0};
- static bool safeParseTopic(const char *topic, uint32_t topicLen, int *threadId)
- {
- const char *prefix = "testThread/";
- const char *suffix = "/tttt";
- size_t prefix_len = strlen(prefix);
- size_t suffix_len = strlen(suffix);
- if (topicLen <= prefix_len + suffix_len)
- {
- return false;
- }
- if (strncmp(topic, prefix, prefix_len) != 0)
- {
- return false;
- }
- if (topicLen < prefix_len + suffix_len)
- {
- return false;
- }
- if (strncmp(topic + topicLen - suffix_len, suffix, suffix_len) != 0)
- {
- return false;
- }
- size_t num_len = topicLen - prefix_len - suffix_len;
- if (num_len == 0 || num_len >= 16)
- {
- return false;
- }
- char num_buf[16] = {0};
- memcpy(num_buf, topic + prefix_len, num_len);
- char *endptr = NULL;
- long val = strtol(num_buf, &endptr, 10);
- if (*endptr != '\0')
- {
- return false;
- }
- *threadId = (int)val;
- return true;
- }
- // 多线程事件处理函数
- static void multiThreadEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
- {
- switch (event)
- {
- case RyanMqttEventPublished: {
- RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
- // RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
- int threadId;
- // NOLINTNEXTLINE(cert-err34-c)
- // if (1 == sscanf(msgHandler->topic, "testThread/%d/tttt", &threadId))
- if (safeParseTopic(msgHandler->topic, msgHandler->topicLen, &threadId))
- {
- RyanMqttTestEnableCritical();
- ThreadTestData_t *testData = &g_threadTestData[threadId];
- testData->publishedCount += 1;
- g_testControl.totalPublished += 1;
- RyanMqttTestExitCritical();
- }
- }
- break;
- case RyanMqttEventData: {
- RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
- // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
- // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
- int threadId;
- // 非线程安全
- // if (1 == sscanf(msgData->topic, "testThread/%d/tttt", &threadId))
- if (safeParseTopic(msgData->topic, msgData->topicLen, &threadId))
- {
- RyanMqttTestEnableCritical();
- ThreadTestData_t *testData = &g_threadTestData[threadId];
- testData->receivedCount += 1;
- g_testControl.totalReceived += 1;
- RyanMqttTestExitCritical();
- }
- }
- break;
- default: mqttEventBaseHandle(pclient, event, eventData); break;
- }
- }
- // 并发发布测试线程
- static void *concurrentPublishThread(void *arg)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- char topic[64];
- char payload[256];
- int32_t threadIndex = 0;
- RyanMqttTestEnableCritical();
- threadIndex = g_testControl.threadIndex;
- g_testControl.threadIndex += 1;
- RyanMqttTestExitCritical();
- ThreadTestData_t *testData = &g_threadTestData[threadIndex];
- // 订阅主题
- RyanMqttSnprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
- result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
- if (RyanMqttSuccessError != result)
- {
- RyanMqttLog_e("Thread %d: Failed to subscribe", threadIndex);
- goto cleanup;
- }
- // 发布消息
- for (int i = 0; i < MESSAGES_PER_THREAD; i++)
- {
- RyanMqttSnprintf(payload, sizeof(payload), "M %d %d", i, threadIndex);
- RyanMqttQos_e qos = (RyanMqttQos_e)(i % 3);
- result = RyanMqttPublish(g_testControl.client, topic, payload, RyanMqttStrlen(payload), qos,
- RyanMqttFalse);
- if (RyanMqttSuccessError != result)
- {
- RyanMqttLog_e("Thread %d: Failed to publish message %d", threadIndex, i);
- }
- else
- {
- if (RyanMqttQos0 == qos)
- {
- RyanMqttTestEnableCritical();
- testData->publishedCount += 1;
- g_testControl.totalPublished += 1;
- RyanMqttTestExitCritical();
- }
- }
- delay_us(1100); // 电脑配置不一样需要的时间也就不一样
- }
- // 等待消息处理完成
- int timeoutCount = 0;
- while (testData->publishedCount < MESSAGES_PER_THREAD && testData->receivedCount < MESSAGES_PER_THREAD)
- {
- delay(10);
- // 10秒超时
- timeoutCount++;
- if (timeoutCount > 1000)
- {
- RyanMqttLog_w("Thread %d: Timeout waiting for messages %d, %d", testData->threadIndex,
- testData->publishedCount, testData->receivedCount);
- break;
- }
- }
- cleanup:
- delay(50); // 让mqtt线程运行
- return NULL;
- }
- // 多客户端并发测试
- static RyanMqttError_e multiClientConcurrentTest(void)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttLog_i("Starting multi-client concurrent test with %d clients", CONCURRENT_CLIENTS);
- // 初始化测试控制结构
- RyanMqttMemset(&g_testControl, 0, sizeof(g_testControl));
- // 初始化客户端
- result =
- RyanMqttTestInit(&g_testControl.client, RyanMqttTrue, RyanMqttFalse, 120, multiThreadEventHandle, NULL);
- RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
- // 创建测试线程
- for (int i = 0; i < CONCURRENT_CLIENTS; i++)
- {
- // struct sched_param param;
- // pthread_attr_init(&g_threadTestData[i].attr);
- // // 设置调度策略为实时策略
- // pthread_attr_setschedpolicy(&g_threadTestData[i].attr, SCHED_FIFO);
- // // 获取该策略的最大优先级
- // int max_prio = sched_get_priority_max(SCHED_FIFO);
- // param.sched_priority = max_prio;
- // // 设置优先级
- // pthread_attr_setschedparam(&g_threadTestData[i].attr, ¶m);
- int result222 = pthread_create(&g_threadTestData[i].threadId, NULL, concurrentPublishThread, NULL);
- // pthread_attr_destroy(&g_threadTestData[i].attr);
- if (result222 != 0)
- {
- RyanMqttLog_e("Failed to create thread %d", i);
- result = RyanMqttFailedError;
- goto cleanup;
- }
- }
- // 等待线程结束
- for (int i = 0; i < CONCURRENT_CLIENTS; i++)
- {
- pthread_join(g_threadTestData[i].threadId, NULL);
- }
- // 统计结果
- RyanMqttLog_i("Multi-client test results:");
- RyanMqttLog_i(" Total published: %d", g_testControl.totalPublished);
- RyanMqttLog_i(" Total received: %d", g_testControl.totalReceived);
- // 详细统计
- for (int i = 0; i < CONCURRENT_CLIENTS; i++)
- {
- RyanMqttLog_i(" Thread %d: Published=%d, Received=%d", i, g_threadTestData[i].publishedCount,
- g_threadTestData[i].receivedCount);
- }
- // 验证结果
- int expectedTotal = CONCURRENT_CLIENTS * MESSAGES_PER_THREAD;
- if (g_testControl.totalPublished != expectedTotal || g_testControl.totalReceived != expectedTotal)
- {
- RyanMqttLog_e("Test failed: Expected %d published and received, got %d and %d", expectedTotal,
- g_testControl.totalPublished, g_testControl.totalReceived);
- result = RyanMqttFailedError;
- }
- RyanMqttTestDestroyClient(g_testControl.client);
- cleanup:
- return result;
- }
- // 主多线程测试函数
- RyanMqttError_e RyanMqttMultiThreadSafetyTest(void)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- // 1. 多客户端并发测试
- result = multiClientConcurrentTest();
- RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
- // 检查内存泄漏
- checkMemory;
- return result;
- }
|