// RyanMqttMultiThreadMultiClientTest.c
  1. #include "RyanMqttTest.h"
#define MESSAGES_PER_THREAD 1000 // number of messages each worker thread publishes
#define CONCURRENT_CLIENTS 20 // number of worker threads (one MQTT client per thread)
  4. // 线程测试统计数据
  5. typedef struct
  6. {
  7. pthread_t threadId;
  8. int threadIndex;
  9. int publishedCount;
  10. int receivedCount;
  11. RyanMqttClient_t *client;
  12. } ThreadTestData_t;
  13. // 多线程测试控制结构
  14. typedef struct
  15. {
  16. volatile int runningThreads;
  17. volatile int totalPublished;
  18. volatile int totalReceived;
  19. pthread_mutex_t statsMutex;
  20. pthread_cond_t completionCond;
  21. volatile int testComplete;
  22. } MultiThreadTestControl_t;
  23. static MultiThreadTestControl_t g_testControl = {0};
  24. // 多线程事件处理函数
  25. static void multiThreadEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  26. {
  27. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  28. struct RyanMqttTestEventUserData *eventUserData = (struct RyanMqttTestEventUserData *)client->config.userData;
  29. ThreadTestData_t *testData = (ThreadTestData_t *)eventUserData->userData;
  30. switch (event)
  31. {
  32. case RyanMqttEventPublished: {
  33. // RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  34. // RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  35. RyanMqttTestEnableCritical();
  36. testData->publishedCount += 1;
  37. g_testControl.totalPublished += 1;
  38. RyanMqttTestExitCritical();
  39. }
  40. break;
  41. case RyanMqttEventData: {
  42. // RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  43. // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  44. // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  45. RyanMqttTestEnableCritical();
  46. testData->receivedCount += 1;
  47. g_testControl.totalReceived += 1;
  48. RyanMqttTestExitCritical();
  49. }
  50. break;
  51. default: mqttEventBaseHandle(pclient, event, eventData); break;
  52. }
  53. }
  54. // 并发发布测试线程
  55. static void *concurrentPublishThread(void *arg)
  56. {
  57. ThreadTestData_t *testData = (ThreadTestData_t *)arg;
  58. RyanMqttError_e result = RyanMqttSuccessError;
  59. char topic[64];
  60. char payload[256];
  61. // 初始化客户端
  62. result =
  63. RyanMqttTestInit(&testData->client, RyanMqttTrue, RyanMqttFalse, 120, multiThreadEventHandle, testData);
  64. if (RyanMqttSuccessError != result)
  65. {
  66. RyanMqttLog_e("Thread %d: Failed to initialize client", testData->threadIndex);
  67. return NULL;
  68. }
  69. // 订阅主题
  70. snprintf(topic, sizeof(topic), "test/thread/%d", testData->threadIndex);
  71. result = RyanMqttSubscribe(testData->client, topic, testData->threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
  72. if (RyanMqttSuccessError != result)
  73. {
  74. RyanMqttLog_e("Thread %d: Failed to subscribe", testData->threadIndex);
  75. goto cleanup;
  76. }
  77. // 发布消息
  78. for (int i = 0; i < MESSAGES_PER_THREAD; i++)
  79. {
  80. snprintf(payload, sizeof(payload), "Message %d from thread %d", i, testData->threadIndex);
  81. result = RyanMqttPublish(testData->client, topic, payload, RyanMqttStrlen(payload),
  82. i % 2 ? RyanMqttQos2 : RyanMqttQos1, RyanMqttFalse);
  83. if (RyanMqttSuccessError != result)
  84. {
  85. RyanMqttLog_e("Thread %d: Failed to publish message %d", testData->threadIndex, i);
  86. }
  87. delay(i % 2 ? 2 : 1);
  88. }
  89. // 等待消息处理完成
  90. int timeoutCount = 0;
  91. while (testData->publishedCount < MESSAGES_PER_THREAD && testData->receivedCount < MESSAGES_PER_THREAD)
  92. {
  93. delay(10);
  94. // 10秒超时
  95. timeoutCount++;
  96. if (timeoutCount > 1000)
  97. {
  98. RyanMqttLog_w("Thread %d: Timeout waiting for messages %d, %d", testData->threadIndex,
  99. testData->publishedCount, testData->receivedCount);
  100. break;
  101. }
  102. }
  103. cleanup:
  104. delay(50); // 让mqtt线程运行
  105. if (testData->client)
  106. {
  107. RyanMqttTestDestroyClient(testData->client);
  108. }
  109. pthread_mutex_lock(&g_testControl.statsMutex);
  110. g_testControl.runningThreads--;
  111. if (g_testControl.runningThreads == 0)
  112. {
  113. pthread_cond_signal(&g_testControl.completionCond);
  114. }
  115. pthread_mutex_unlock(&g_testControl.statsMutex);
  116. return NULL;
  117. }
  118. // 多客户端并发测试
  119. static RyanMqttError_e multiClientConcurrentTest(void)
  120. {
  121. RyanMqttError_e result = RyanMqttSuccessError;
  122. ThreadTestData_t testThreads[CONCURRENT_CLIENTS];
  123. RyanMqttLog_i("Starting multi-client concurrent test with %d clients", CONCURRENT_CLIENTS);
  124. // 初始化测试控制结构
  125. RyanMqttMemset(&g_testControl, 0, sizeof(g_testControl));
  126. pthread_mutex_init(&g_testControl.statsMutex, NULL);
  127. pthread_cond_init(&g_testControl.completionCond, NULL);
  128. g_testControl.runningThreads = CONCURRENT_CLIENTS;
  129. // 创建测试线程
  130. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  131. {
  132. RyanMqttMemset(&testThreads[i], 0, sizeof(ThreadTestData_t));
  133. testThreads[i].threadIndex = i;
  134. if (pthread_create(&testThreads[i].threadId, NULL, concurrentPublishThread, &testThreads[i]) != 0)
  135. {
  136. RyanMqttLog_e("Failed to create thread %d", i);
  137. result = RyanMqttFailedError;
  138. goto cleanup;
  139. }
  140. }
  141. // 等待所有线程完成
  142. pthread_mutex_lock(&g_testControl.statsMutex);
  143. while (g_testControl.runningThreads > 0)
  144. {
  145. pthread_cond_wait(&g_testControl.completionCond, &g_testControl.statsMutex);
  146. }
  147. pthread_mutex_unlock(&g_testControl.statsMutex);
  148. // 等待线程结束
  149. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  150. {
  151. pthread_join(testThreads[i].threadId, NULL);
  152. }
  153. // 统计结果
  154. RyanMqttLog_i("Multi-client test results:");
  155. RyanMqttLog_i(" Total published: %d", g_testControl.totalPublished);
  156. RyanMqttLog_i(" Total received: %d", g_testControl.totalReceived);
  157. // 详细统计
  158. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  159. {
  160. RyanMqttLog_i(" Thread %d: Published=%d, Received=%d", i, testThreads[i].publishedCount,
  161. testThreads[i].receivedCount);
  162. }
  163. // 验证结果
  164. int expectedTotal = CONCURRENT_CLIENTS * MESSAGES_PER_THREAD;
  165. if (g_testControl.totalPublished != expectedTotal || g_testControl.totalReceived != expectedTotal)
  166. {
  167. RyanMqttLog_e("Test failed: Expected %d published and received, got %d and %d", expectedTotal,
  168. g_testControl.totalPublished, g_testControl.totalReceived);
  169. result = RyanMqttFailedError;
  170. }
  171. cleanup:
  172. pthread_mutex_destroy(&g_testControl.statsMutex);
  173. pthread_cond_destroy(&g_testControl.completionCond);
  174. return result;
  175. }
  176. // 主多线程测试函数
  177. RyanMqttError_e RyanMqttMultiThreadMultiClientTest(void)
  178. {
  179. RyanMqttError_e result = RyanMqttSuccessError;
  180. // 1. 多客户端并发测试
  181. result = multiClientConcurrentTest();
  182. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  183. // 检查内存泄漏
  184. checkMemory;
  185. return result;
  186. }