// RyanMqttMultiThreadSafetyTest.c - multi-thread safety test for the RyanMqtt client
#include "RyanMqttTest.h"

#include <limits.h>
#define MESSAGES_PER_THREAD 1000 // Number of messages each publisher thread sends
#define CONCURRENT_CLIENTS 20 // Number of publisher threads started by the test
// Per-thread test statistics; one instance per slot of g_threadTestData.
typedef struct
{
	int threadIndex; // Index of the owning thread's slot (reported in the timeout log)
	int publishedCount; // Publishes counted for this thread (QoS0 on send, QoS1/2 on the published event)
	int receivedCount; // Messages received on this thread's topic
	pthread_attr_t attr; // Thread attributes; not used by the active code path
	pthread_t threadId; // Handle written by pthread_create and consumed by pthread_join
} ThreadTestData_t;
// Shared control block for the multi-thread test.
typedef struct
{
	int32_t totalPublished; // Publishes counted across all threads
	int32_t totalReceived; // Messages received across all threads
	int32_t threadIndex; // Next slot index handed out to a starting publisher thread
	int32_t testComplete; // NOTE(review): not referenced anywhere in the visible code
	RyanMqttClient_t *client; // MQTT client shared by every publisher thread
} MultiThreadTestControl_t;
// Shared test state; cleared at the start of multiClientConcurrentTest.
static MultiThreadTestControl_t g_testControl = {0};
// Per-thread statistics indexed by thread index; sized with one slot beyond CONCURRENT_CLIENTS.
static ThreadTestData_t g_threadTestData[CONCURRENT_CLIENTS + 1] = {0};
  24. static bool safeParseTopic(const char *topic, uint32_t topicLen, int *threadId)
  25. {
  26. const char *prefix = "testThread/";
  27. const char *suffix = "/tttt";
  28. size_t prefix_len = strlen(prefix);
  29. size_t suffix_len = strlen(suffix);
  30. if (topicLen <= prefix_len + suffix_len)
  31. {
  32. return false;
  33. }
  34. if (strncmp(topic, prefix, prefix_len) != 0)
  35. {
  36. return false;
  37. }
  38. if (topicLen < prefix_len + suffix_len)
  39. {
  40. return false;
  41. }
  42. if (strncmp(topic + topicLen - suffix_len, suffix, suffix_len) != 0)
  43. {
  44. return false;
  45. }
  46. size_t num_len = topicLen - prefix_len - suffix_len;
  47. if (num_len == 0 || num_len >= 16)
  48. {
  49. return false;
  50. }
  51. char num_buf[16] = {0};
  52. memcpy(num_buf, topic + prefix_len, num_len);
  53. char *endptr = NULL;
  54. long val = strtol(num_buf, &endptr, 10);
  55. if (*endptr != '\0')
  56. {
  57. return false;
  58. }
  59. *threadId = (int)val;
  60. return true;
  61. }
  62. // 多线程事件处理函数
  63. static void multiThreadEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  64. {
  65. switch (event)
  66. {
  67. case RyanMqttEventPublished: {
  68. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  69. // RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  70. int threadId;
  71. // NOLINTNEXTLINE(cert-err34-c)
  72. // if (1 == sscanf(msgHandler->topic, "testThread/%d/tttt", &threadId))
  73. if (safeParseTopic(msgHandler->topic, msgHandler->topicLen, &threadId))
  74. {
  75. RyanMqttTestEnableCritical();
  76. ThreadTestData_t *testData = &g_threadTestData[threadId];
  77. testData->publishedCount += 1;
  78. g_testControl.totalPublished += 1;
  79. RyanMqttTestExitCritical();
  80. }
  81. }
  82. break;
  83. case RyanMqttEventData: {
  84. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  85. // RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  86. // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  87. int threadId;
  88. // 非线程安全
  89. // if (1 == sscanf(msgData->topic, "testThread/%d/tttt", &threadId))
  90. if (safeParseTopic(msgData->topic, msgData->topicLen, &threadId))
  91. {
  92. RyanMqttTestEnableCritical();
  93. ThreadTestData_t *testData = &g_threadTestData[threadId];
  94. testData->receivedCount += 1;
  95. g_testControl.totalReceived += 1;
  96. RyanMqttTestExitCritical();
  97. }
  98. }
  99. break;
  100. default: mqttEventBaseHandle(pclient, event, eventData); break;
  101. }
  102. }
  103. // 并发发布测试线程
  104. static void *concurrentPublishThread(void *arg)
  105. {
  106. RyanMqttError_e result = RyanMqttSuccessError;
  107. char topic[64];
  108. char payload[256];
  109. int32_t threadIndex = 0;
  110. RyanMqttTestEnableCritical();
  111. threadIndex = g_testControl.threadIndex;
  112. g_testControl.threadIndex += 1;
  113. RyanMqttTestExitCritical();
  114. ThreadTestData_t *testData = &g_threadTestData[threadIndex];
  115. // 订阅主题
  116. RyanMqttSnprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
  117. result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
  118. if (RyanMqttSuccessError != result)
  119. {
  120. RyanMqttLog_e("Thread %d: Failed to subscribe", threadIndex);
  121. goto cleanup;
  122. }
  123. // 发布消息
  124. for (int i = 0; i < MESSAGES_PER_THREAD; i++)
  125. {
  126. RyanMqttSnprintf(payload, sizeof(payload), "M %d %d", i, threadIndex);
  127. RyanMqttQos_e qos = (RyanMqttQos_e)(i % 3);
  128. result = RyanMqttPublish(g_testControl.client, topic, payload, RyanMqttStrlen(payload), qos,
  129. RyanMqttFalse);
  130. if (RyanMqttSuccessError != result)
  131. {
  132. RyanMqttLog_e("Thread %d: Failed to publish message %d", threadIndex, i);
  133. }
  134. else
  135. {
  136. if (RyanMqttQos0 == qos)
  137. {
  138. RyanMqttTestEnableCritical();
  139. testData->publishedCount += 1;
  140. g_testControl.totalPublished += 1;
  141. RyanMqttTestExitCritical();
  142. }
  143. }
  144. delay_us(1100); // 电脑配置不一样需要的时间也就不一样
  145. }
  146. // 等待消息处理完成
  147. int timeoutCount = 0;
  148. while (testData->publishedCount < MESSAGES_PER_THREAD && testData->receivedCount < MESSAGES_PER_THREAD)
  149. {
  150. delay(10);
  151. // 10秒超时
  152. timeoutCount++;
  153. if (timeoutCount > 1000)
  154. {
  155. RyanMqttLog_w("Thread %d: Timeout waiting for messages %d, %d", testData->threadIndex,
  156. testData->publishedCount, testData->receivedCount);
  157. break;
  158. }
  159. }
  160. cleanup:
  161. delay(50); // 让mqtt线程运行
  162. return NULL;
  163. }
  164. // 多客户端并发测试
  165. static RyanMqttError_e multiClientConcurrentTest(void)
  166. {
  167. RyanMqttError_e result = RyanMqttSuccessError;
  168. RyanMqttLog_i("Starting multi-client concurrent test with %d clients", CONCURRENT_CLIENTS);
  169. // 初始化测试控制结构
  170. RyanMqttMemset(&g_testControl, 0, sizeof(g_testControl));
  171. // 初始化客户端
  172. result =
  173. RyanMqttTestInit(&g_testControl.client, RyanMqttTrue, RyanMqttFalse, 120, multiThreadEventHandle, NULL);
  174. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  175. // 创建测试线程
  176. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  177. {
  178. // struct sched_param param;
  179. // pthread_attr_init(&g_threadTestData[i].attr);
  180. // // 设置调度策略为实时策略
  181. // pthread_attr_setschedpolicy(&g_threadTestData[i].attr, SCHED_FIFO);
  182. // // 获取该策略的最大优先级
  183. // int max_prio = sched_get_priority_max(SCHED_FIFO);
  184. // param.sched_priority = max_prio;
  185. // // 设置优先级
  186. // pthread_attr_setschedparam(&g_threadTestData[i].attr, &param);
  187. int result222 = pthread_create(&g_threadTestData[i].threadId, NULL, concurrentPublishThread, NULL);
  188. // pthread_attr_destroy(&g_threadTestData[i].attr);
  189. if (result222 != 0)
  190. {
  191. RyanMqttLog_e("Failed to create thread %d", i);
  192. result = RyanMqttFailedError;
  193. goto cleanup;
  194. }
  195. }
  196. // 等待线程结束
  197. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  198. {
  199. pthread_join(g_threadTestData[i].threadId, NULL);
  200. }
  201. // 统计结果
  202. RyanMqttLog_i("Multi-client test results:");
  203. RyanMqttLog_i(" Total published: %d", g_testControl.totalPublished);
  204. RyanMqttLog_i(" Total received: %d", g_testControl.totalReceived);
  205. // 详细统计
  206. for (int i = 0; i < CONCURRENT_CLIENTS; i++)
  207. {
  208. RyanMqttLog_i(" Thread %d: Published=%d, Received=%d", i, g_threadTestData[i].publishedCount,
  209. g_threadTestData[i].receivedCount);
  210. }
  211. // 验证结果
  212. int expectedTotal = CONCURRENT_CLIENTS * MESSAGES_PER_THREAD;
  213. if (g_testControl.totalPublished != expectedTotal || g_testControl.totalReceived != expectedTotal)
  214. {
  215. RyanMqttLog_e("Test failed: Expected %d published and received, got %d and %d", expectedTotal,
  216. g_testControl.totalPublished, g_testControl.totalReceived);
  217. result = RyanMqttFailedError;
  218. }
  219. RyanMqttTestDestroyClient(g_testControl.client);
  220. cleanup:
  221. return result;
  222. }
  223. // 主多线程测试函数
  224. RyanMqttError_e RyanMqttMultiThreadSafetyTest(void)
  225. {
  226. RyanMqttError_e result = RyanMqttSuccessError;
  227. // 1. 多客户端并发测试
  228. result = multiClientConcurrentTest();
  229. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  230. // 检查内存泄漏
  231. checkMemory;
  232. return result;
  233. }