RyanMqttMultiThreadSafetyTest.c 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. #include "RyanMqttTest.h"
  2. #define MESSAGES_PER_THREAD 1000 // 发送个数
  3. #define CONCURRENT_CLIENTS 20 // 线程数
  4. // 线程测试统计数据
  5. typedef struct
  6. {
  7. int threadIndex;
  8. int publishedCount;
  9. int receivedCount;
  10. pthread_attr_t attr;
  11. pthread_t threadId;
  12. } ThreadTestData_t;
  13. // 多线程测试控制结构
  14. typedef struct
  15. {
  16. int32_t runningThreads;
  17. int32_t totalPublished;
  18. int32_t totalReceived;
  19. int32_t threadIndex;
  20. pthread_mutex_t statsMutex;
  21. pthread_cond_t completionCond;
  22. int32_t testComplete;
  23. RyanMqttClient_t *client;
  24. } MultiThreadTestControl_t;
  25. static MultiThreadTestControl_t g_testControl = {0};
  26. static ThreadTestData_t g_threadTestData[CONCURRENT_CLIENTS + 1] = {0};
  27. // 多线程事件处理函数
  28. static void multiThreadEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  29. {
  30. switch (event)
  31. {
  32. case RyanMqttEventPublished: {
  33. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  34. // RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  35. int thread_id;
  36. // NOLINTNEXTLINE(cert-err34-c)
  37. if (1 == sscanf(msgHandler->topic, "testThread/%d/tttt", &thread_id))
  38. {
  39. ThreadTestData_t *testData = &g_threadTestData[thread_id];
  40. RyanMqttTestEnableCritical();
  41. testData->publishedCount += 1;
  42. g_testControl.totalPublished += 1;
  43. RyanMqttTestExitCritical();
  44. }
  45. }
  46. break;
  47. case RyanMqttEventData: {
  48. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  49. // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  50. // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  51. int thread_id;
  52. // NOLINTNEXTLINE(cert-err34-c)
  53. if (1 == sscanf(msgData->topic, "testThread/%d/tttt", &thread_id))
  54. {
  55. ThreadTestData_t *testData = &g_threadTestData[thread_id];
  56. RyanMqttTestEnableCritical();
  57. testData->receivedCount += 1;
  58. g_testControl.totalReceived += 1;
  59. RyanMqttTestExitCritical();
  60. }
  61. }
  62. break;
  63. default: mqttEventBaseHandle(pclient, event, eventData); break;
  64. }
  65. }
  66. // 并发发布测试线程
  67. static void *concurrentPublishThread(void *arg)
  68. {
  69. RyanMqttError_e result = RyanMqttSuccessError;
  70. char topic[64];
  71. char payload[256];
  72. int32_t threadIndex = 0;
  73. RyanMqttTestEnableCritical();
  74. threadIndex = g_testControl.threadIndex;
  75. g_testControl.threadIndex += 1;
  76. RyanMqttTestExitCritical();
  77. ThreadTestData_t *testData = &g_threadTestData[threadIndex];
  78. // 订阅主题
  79. RyanMqttSnprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
  80. result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
  81. if (RyanMqttSuccessError != result)
  82. {
  83. RyanMqttLog_e("Thread %d: Failed to subscribe", threadIndex);
  84. goto cleanup;
  85. }
  86. // 发布消息
  87. for (int i = 0; i < MESSAGES_PER_THREAD; i++)
  88. {
  89. RyanMqttSnprintf(payload, sizeof(payload), "M %d %d", i, threadIndex);
  90. RyanMqttQos_e qos = (RyanMqttQos_e)(i % 3);
  91. result = RyanMqttPublish(g_testControl.client, topic, payload, RyanMqttStrlen(payload), qos,
  92. RyanMqttFalse);
  93. if (RyanMqttSuccessError != result)
  94. {
  95. RyanMqttLog_e("Thread %d: Failed to publish message %d", threadIndex, i);
  96. }
  97. else
  98. {
  99. if (RyanMqttQos0 == qos)
  100. {
  101. RyanMqttTestEnableCritical();
  102. testData->publishedCount += 1;
  103. g_testControl.totalPublished += 1;
  104. RyanMqttTestExitCritical();
  105. }
  106. }
  107. delay_us(700); // 电脑配置不一样需要的时间也就不一样
  108. }
  109. // 等待消息处理完成
  110. int timeoutCount = 0;
  111. while (testData->publishedCount < MESSAGES_PER_THREAD && testData->receivedCount < MESSAGES_PER_THREAD)
  112. {
  113. delay(10);
  114. // 10秒超时
  115. timeoutCount++;
  116. if (timeoutCount > 1000)
  117. {
  118. RyanMqttLog_w("Thread %d: Timeout waiting for messages %d, %d", testData->threadIndex,
  119. testData->publishedCount, testData->receivedCount);
  120. break;
  121. }
  122. }
  123. cleanup:
  124. delay(50); // 让mqtt线程运行
  125. pthread_mutex_lock(&g_testControl.statsMutex);
  126. g_testControl.runningThreads--;
  127. if (g_testControl.runningThreads == 0)
  128. {
  129. pthread_cond_signal(&g_testControl.completionCond);
  130. }
  131. pthread_mutex_unlock(&g_testControl.statsMutex);
  132. return NULL;
  133. }
  134. // 多客户端并发测试
  135. static RyanMqttError_e multiClientConcurrentTest(void)
  136. {
  137. RyanMqttError_e result = RyanMqttSuccessError;
  138. RyanMqttLog_i("Starting multi-client concurrent test with %d clients", CONCURRENT_CLIENTS);
  139. // 初始化测试控制结构
  140. RyanMqttMemset(&g_testControl, 0, sizeof(g_testControl));
  141. pthread_mutex_init(&g_testControl.statsMutex, NULL);
  142. pthread_cond_init(&g_testControl.completionCond, NULL);
  143. g_testControl.runningThreads = CONCURRENT_CLIENTS;
  144. // 初始化客户端
  145. result =
  146. RyanMqttTestInit(&g_testControl.client, RyanMqttTrue, RyanMqttFalse, 120, multiThreadEventHandle, NULL);
  147. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  148. // 创建测试线程
  149. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  150. {
  151. struct sched_param param;
  152. // 初始化线程属性
  153. pthread_attr_init(&g_threadTestData[i].attr);
  154. // 设置线程为显式调度属性(否则可能忽略优先级)
  155. pthread_attr_setinheritsched(&g_threadTestData[i].attr, PTHREAD_EXPLICIT_SCHED);
  156. // 设置调度策略,例如 SCHED_FIFO 或 SCHED_RR(实时策略)
  157. pthread_attr_setschedpolicy(&g_threadTestData[i].attr, SCHED_FIFO);
  158. // 设置优先级(范围依赖于调度策略)
  159. param.sched_priority = 1; // 实时优先级范围通常是 1 ~ 99
  160. pthread_attr_setschedparam(&g_threadTestData[i].attr, &param);
  161. if (pthread_create(&g_threadTestData[i].threadId, NULL, concurrentPublishThread, NULL) != 0)
  162. {
  163. RyanMqttLog_e("Failed to create thread %d", i);
  164. result = RyanMqttFailedError;
  165. goto cleanup;
  166. }
  167. }
  168. // 等待所有线程完成
  169. pthread_mutex_lock(&g_testControl.statsMutex);
  170. while (g_testControl.runningThreads > 0)
  171. {
  172. pthread_cond_wait(&g_testControl.completionCond, &g_testControl.statsMutex);
  173. }
  174. pthread_mutex_unlock(&g_testControl.statsMutex);
  175. // 等待线程结束
  176. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  177. {
  178. pthread_join(g_threadTestData[i].threadId, NULL);
  179. pthread_attr_destroy(&g_threadTestData[i].attr);
  180. }
  181. RyanMqttTestDestroyClient(g_testControl.client);
  182. // 统计结果
  183. RyanMqttLog_i("Multi-client test results:");
  184. RyanMqttLog_i(" Total published: %d", g_testControl.totalPublished);
  185. RyanMqttLog_i(" Total received: %d", g_testControl.totalReceived);
  186. // 详细统计
  187. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  188. {
  189. RyanMqttLog_i(" Thread %d: Published=%d, Received=%d", i, g_threadTestData[i].publishedCount,
  190. g_threadTestData[i].receivedCount);
  191. }
  192. // 验证结果
  193. int expectedTotal = CONCURRENT_CLIENTS * MESSAGES_PER_THREAD;
  194. if (g_testControl.totalPublished != expectedTotal || g_testControl.totalReceived != expectedTotal)
  195. {
  196. RyanMqttLog_e("Test failed: Expected %d published and received, got %d and %d", expectedTotal,
  197. g_testControl.totalPublished, g_testControl.totalReceived);
  198. result = RyanMqttFailedError;
  199. }
  200. cleanup:
  201. pthread_mutex_destroy(&g_testControl.statsMutex);
  202. pthread_cond_destroy(&g_testControl.completionCond);
  203. return result;
  204. }
  205. // 主多线程测试函数
  206. RyanMqttError_e RyanMqttMultiThreadSafetyTest(void)
  207. {
  208. RyanMqttError_e result = RyanMqttSuccessError;
  209. // 1. 多客户端并发测试
  210. result = multiClientConcurrentTest();
  211. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  212. // 检查内存泄漏
  213. checkMemory;
  214. return result;
  215. }