RyanMqttMultiThreadMultiClientTest.c 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. #include "RyanMqttTest.h"
  2. #define MESSAGES_PER_THREAD 1000 // 发送个数
  3. #define CONCURRENT_CLIENTS 20 // 线程数
  4. // 线程测试统计数据
  5. typedef struct
  6. {
  7. pthread_t threadId;
  8. int threadIndex;
  9. int publishedCount;
  10. int receivedCount;
  11. RyanMqttClient_t *client;
  12. } ThreadTestData_t;
  13. // 多线程测试控制结构
  14. typedef struct
  15. {
  16. volatile int runningThreads;
  17. volatile int totalPublished;
  18. volatile int totalReceived;
  19. volatile int testComplete;
  20. } MultiThreadTestControl_t;
  21. static MultiThreadTestControl_t g_testControl = {0};
  22. // 多线程事件处理函数
  23. static void multiThreadEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  24. {
  25. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  26. struct RyanMqttTestEventUserData *eventUserData = (struct RyanMqttTestEventUserData *)client->config.userData;
  27. ThreadTestData_t *testData = (ThreadTestData_t *)eventUserData->userData;
  28. switch (event)
  29. {
  30. case RyanMqttEventPublished: {
  31. // RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  32. // RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  33. RyanMqttTestEnableCritical();
  34. testData->publishedCount += 1;
  35. g_testControl.totalPublished += 1;
  36. RyanMqttTestExitCritical();
  37. }
  38. break;
  39. case RyanMqttEventData: {
  40. // RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  41. // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  42. // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  43. RyanMqttTestEnableCritical();
  44. testData->receivedCount += 1;
  45. g_testControl.totalReceived += 1;
  46. RyanMqttTestExitCritical();
  47. }
  48. break;
  49. default: mqttEventBaseHandle(pclient, event, eventData); break;
  50. }
  51. }
  52. // 并发发布测试线程
  53. static void *concurrentPublishThread(void *arg)
  54. {
  55. ThreadTestData_t *testData = (ThreadTestData_t *)arg;
  56. RyanMqttError_e result = RyanMqttSuccessError;
  57. char topic[64];
  58. char payload[256];
  59. // 初始化客户端
  60. result =
  61. RyanMqttTestInit(&testData->client, RyanMqttTrue, RyanMqttFalse, 120, multiThreadEventHandle, testData);
  62. if (RyanMqttSuccessError != result)
  63. {
  64. RyanMqttLog_e("Thread %d: Failed to initialize client", testData->threadIndex);
  65. return NULL;
  66. }
  67. // 订阅主题
  68. RyanMqttSnprintf(topic, sizeof(topic), "test/thread/%d", testData->threadIndex);
  69. result = RyanMqttSubscribe(testData->client, topic, RyanMqttQos2);
  70. // result = RyanMqttSubscribe(testData->client, topic, testData->threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
  71. if (RyanMqttSuccessError != result)
  72. {
  73. RyanMqttLog_e("Thread %d: Failed to subscribe", testData->threadIndex);
  74. goto cleanup;
  75. }
  76. // 发布消息
  77. for (int i = 0; i < MESSAGES_PER_THREAD; i++)
  78. {
  79. RyanMqttSnprintf(payload, sizeof(payload), "Message %d from thread %d", i, testData->threadIndex);
  80. result = RyanMqttPublish(testData->client, topic, payload, RyanMqttStrlen(payload),
  81. i % 2 ? RyanMqttQos2 : RyanMqttQos1, RyanMqttFalse);
  82. if (RyanMqttSuccessError != result)
  83. {
  84. RyanMqttLog_e("Thread %d: Failed to publish message %d", testData->threadIndex, i);
  85. }
  86. delay_us(1100);
  87. }
  88. // 等待消息处理完成
  89. int timeoutCount = 0;
  90. while (testData->publishedCount < MESSAGES_PER_THREAD && testData->receivedCount < MESSAGES_PER_THREAD)
  91. {
  92. delay(10);
  93. // 10秒超时
  94. timeoutCount++;
  95. if (timeoutCount > 1000)
  96. {
  97. RyanMqttLog_w("Thread %d: Timeout waiting for messages %d, %d", testData->threadIndex,
  98. testData->publishedCount, testData->receivedCount);
  99. break;
  100. }
  101. }
  102. cleanup:
  103. delay(50); // 让mqtt线程运行
  104. if (testData->client)
  105. {
  106. RyanMqttTestDestroyClient(testData->client);
  107. }
  108. return NULL;
  109. }
  110. // 多客户端并发测试
  111. static RyanMqttError_e multiClientConcurrentTest(void)
  112. {
  113. RyanMqttError_e result = RyanMqttSuccessError;
  114. ThreadTestData_t testThreads[CONCURRENT_CLIENTS];
  115. RyanMqttLog_i("Starting multi-client concurrent test with %d clients", CONCURRENT_CLIENTS);
  116. // 初始化测试控制结构
  117. RyanMqttMemset(&g_testControl, 0, sizeof(g_testControl));
  118. g_testControl.runningThreads = CONCURRENT_CLIENTS;
  119. // 创建测试线程
  120. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  121. {
  122. RyanMqttMemset(&testThreads[i], 0, sizeof(ThreadTestData_t));
  123. testThreads[i].threadIndex = i;
  124. if (pthread_create(&testThreads[i].threadId, NULL, concurrentPublishThread, &testThreads[i]) != 0)
  125. {
  126. RyanMqttLog_e("Failed to create thread %d", i);
  127. result = RyanMqttFailedError;
  128. goto cleanup;
  129. }
  130. }
  131. // 等待线程结束
  132. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  133. {
  134. pthread_join(testThreads[i].threadId, NULL);
  135. }
  136. // 统计结果
  137. RyanMqttLog_i("Multi-client test results:");
  138. RyanMqttLog_i(" Total published: %d", g_testControl.totalPublished);
  139. RyanMqttLog_i(" Total received: %d", g_testControl.totalReceived);
  140. // 详细统计
  141. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  142. {
  143. RyanMqttLog_i(" Thread %d: Published=%d, Received=%d", i, testThreads[i].publishedCount,
  144. testThreads[i].receivedCount);
  145. }
  146. // 验证结果
  147. int expectedTotal = CONCURRENT_CLIENTS * MESSAGES_PER_THREAD;
  148. if (g_testControl.totalPublished != expectedTotal || g_testControl.totalReceived != expectedTotal)
  149. {
  150. RyanMqttLog_e("Test failed: Expected %d published and received, got %d and %d", expectedTotal,
  151. g_testControl.totalPublished, g_testControl.totalReceived);
  152. result = RyanMqttFailedError;
  153. }
  154. cleanup:
  155. return result;
  156. }
  157. // 主多线程测试函数
  158. RyanMqttError_e RyanMqttMultiThreadMultiClientTest(void)
  159. {
  160. RyanMqttError_e result = RyanMqttSuccessError;
  161. // 1. 多客户端并发测试
  162. result = multiClientConcurrentTest();
  163. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  164. // 检查内存泄漏
  165. checkMemory;
  166. return result;
  167. }