RyanMqttTest.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. #include "RyanMqttTest.h"
  2. /**
  3. * @brief mqtt事件回调处理函数
  4. * 事件的详细定义可以查看枚举定义
  5. *
  6. * @param pclient
  7. * @param event
  8. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  9. */
  10. void mqttEventBaseHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  11. {
  12. switch (event)
  13. {
  14. case RyanMqttEventError: break;
  15. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  16. RyanMqttLog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  17. break;
  18. case RyanMqttEventDisconnected: RyanMqttLog_w("mqtt断开连接回调 %d", *(int32_t *)eventData); break;
  19. case RyanMqttEventSubscribed: {
  20. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  21. RyanMqttLog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  22. break;
  23. }
  24. case RyanMqttEventSubscribedFailed: {
  25. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  26. RyanMqttLog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  27. break;
  28. }
  29. case RyanMqttEventUnSubscribed: {
  30. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  31. RyanMqttLog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  32. break;
  33. }
  34. case RyanMqttEventUnSubscribedFailed: {
  35. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  36. RyanMqttLog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  37. break;
  38. }
  39. case RyanMqttEventPublished: {
  40. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  41. RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  42. break;
  43. }
  44. case RyanMqttEventData: {
  45. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  46. RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  47. msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  48. RyanMqttLog_i("%.*s", msgData->payloadLen, msgData->payload);
  49. break;
  50. }
  51. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  52. {
  53. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  54. RyanMqttLog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  55. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic,
  56. ackHandler->msgHandler->qos);
  57. // printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  58. break;
  59. }
  60. case RyanMqttEventReconnectBefore:
  61. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  62. RyanMqttLog_i("重连前事件回调");
  63. break;
  64. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  65. {
  66. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  67. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  68. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  69. RyanMqttLog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  70. break;
  71. }
  72. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  73. {
  74. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  75. // 这里选择直接丢弃该消息
  76. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  77. RyanMqttLog_e("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d",
  78. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic,
  79. ackHandler->msgHandler->qos);
  80. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  81. break;
  82. }
  83. case RyanMqttEventAckHandlerDiscard: {
  84. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  85. RyanMqttLog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType,
  86. ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  87. break;
  88. }
  89. case RyanMqttEventDestroyBefore:
  90. RyanMqttLog_w("销毁mqtt客户端前回调");
  91. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  92. struct RyanMqttTestEventUserData *eventUserData =
  93. (struct RyanMqttTestEventUserData *)client->config.userData;
  94. if (RyanMqttTestEventUserDataMagic != eventUserData->magic)
  95. {
  96. RyanMqttLog_e("eventUserData野指针");
  97. break;
  98. }
  99. if (eventUserData->syncFlag)
  100. {
  101. sem_post(&eventUserData->sem);
  102. }
  103. break;
  104. case RyanMqttEventUnsubscribedData: {
  105. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  106. RyanMqttLog_i("接收到未匹配任何订阅主题的报文事件 topic: %.*s, packetId: %d, payload len: %d",
  107. msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen);
  108. break;
  109. }
  110. default: break;
  111. }
  112. }
  113. RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncFlag, RyanMqttBool_e autoReconnectFlag,
  114. uint16_t keepaliveTimeoutS, RyanMqttEventHandle mqttEventCallback, void *userData)
  115. {
  116. // 手动避免count的资源竞争了
  117. static uint32_t count = 0;
  118. char aaa[64];
  119. RyanMqttTestEnableCritical();
  120. count++;
  121. RyanMqttTestExitCritical();
  122. RyanMqttSnprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
  123. struct RyanMqttTestEventUserData *eventUserData =
  124. (struct RyanMqttTestEventUserData *)malloc(sizeof(struct RyanMqttTestEventUserData));
  125. if (NULL == eventUserData)
  126. {
  127. RyanMqttLog_e("内存不足");
  128. return RyanMqttNotEnoughMemError;
  129. }
  130. RyanMqttMemset(eventUserData, 0, sizeof(struct RyanMqttTestEventUserData));
  131. eventUserData->magic = RyanMqttTestEventUserDataMagic;
  132. eventUserData->syncFlag = syncFlag;
  133. eventUserData->userData = userData;
  134. if (eventUserData->syncFlag)
  135. {
  136. sem_init(&eventUserData->sem, 0, 0);
  137. }
  138. RyanMqttError_e result = RyanMqttSuccessError;
  139. RyanMqttClientConfig_t mqttConfig = {.clientId = aaa,
  140. .userName = RyanMqttUserName,
  141. .password = RyanMqttPassword,
  142. .host = RyanMqttHost,
  143. .port = RyanMqttPort,
  144. .taskName = "mqttThread",
  145. .taskPrio = 16,
  146. .taskStack = 4096,
  147. .mqttVersion = 4,
  148. .ackHandlerRepeatCountWarning = 600,
  149. .ackHandlerCountWarning = 60000,
  150. .autoReconnectFlag = autoReconnectFlag,
  151. .cleanSessionFlag = RyanMqttTrue,
  152. .reconnectTimeout = RyanMqttReconnectTimeout,
  153. .recvTimeout = RyanMqttRecvTimeout,
  154. .sendTimeout = RyanMqttSendTimeout,
  155. .ackTimeout = RyanMqttAckTimeout,
  156. .keepaliveTimeoutS = keepaliveTimeoutS,
  157. .mqttEventHandle =
  158. mqttEventCallback ? mqttEventCallback : mqttEventBaseHandle,
  159. .userData = eventUserData};
  160. // 初始化mqtt客户端
  161. result = RyanMqttInit(client);
  162. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  163. // 注册需要的事件回调
  164. result = RyanMqttRegisterEventId(*client, RyanMqttEventAnyId);
  165. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  166. // 设置mqtt客户端config
  167. result = RyanMqttSetConfig(*client, &mqttConfig);
  168. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  169. // 重复设定一次测试
  170. result = RyanMqttSetConfig(*client, &mqttConfig);
  171. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  172. // 获取config测试
  173. RyanMqttClientConfig_t *mqttConfig22;
  174. result = RyanMqttGetConfig(*client, &mqttConfig22);
  175. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  176. RyanMqttFreeConfigFromGet(mqttConfig22);
  177. // 设置遗嘱消息
  178. result = RyanMqttSetLwt(*client, "pub/lwt/test", "this is will", RyanMqttStrlen("this is will"), RyanMqttQos2,
  179. 0);
  180. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  181. // 重复设定一次测试
  182. result = RyanMqttSetLwt(*client, "pub/lwt/test", "this is will", RyanMqttStrlen("this is will"), RyanMqttQos2,
  183. 0);
  184. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  185. // 启动mqtt客户端线程
  186. result = RyanMqttStart(*client);
  187. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
  188. uint32_t timeout_ms = 30000; // 30 seconds
  189. uint32_t elapsed = 0;
  190. while (RyanMqttConnectState != RyanMqttGetState(*client) && elapsed < timeout_ms)
  191. {
  192. delay(100);
  193. elapsed += 100;
  194. }
  195. if (RyanMqttConnectState != RyanMqttGetState(*client))
  196. {
  197. // 不处理错误,测试代码
  198. RyanMqttLog_e("Connection timeout after %d ms", timeout_ms);
  199. return RyanMqttFailedError;
  200. }
  201. return RyanMqttSuccessError;
  202. }
// Argument block handed to the deferred-free timer callback through sigval.sival_ptr.
typedef struct
{
    void *ptr;       // Memory the callback passes to free()
    timer_t timerid; // Timer that triggers the callback; the callback deletes it
} FreeTimerArg;
  208. static void RyanMqttTestFreeTimerCallback(union sigval arg)
  209. {
  210. FreeTimerArg *fta = arg.sival_ptr;
  211. free(fta->ptr);
  212. RyanMqttTestEnableCritical();
  213. destroyCount--;
  214. RyanMqttTestExitCritical();
  215. timer_t timerid = fta->timerid;
  216. free(fta); // 释放参数结构
  217. if (0 != timer_delete(timerid))
  218. {
  219. RyanMqttLog_e("timer_delete failed");
  220. }
  221. }
  222. static void RyanMqttTestScheduleFreeAfterMs(void *ptr, uint32_t delayMs)
  223. {
  224. RyanMqttTestEnableCritical();
  225. destroyCount++;
  226. RyanMqttTestExitCritical();
  227. timer_t timerid;
  228. struct sigevent sev = {0};
  229. struct itimerspec its = {0};
  230. FreeTimerArg *fta = malloc(sizeof(FreeTimerArg));
  231. fta->ptr = ptr;
  232. sev.sigev_notify = SIGEV_THREAD;
  233. sev.sigev_value.sival_ptr = fta; // 传递给回调的参数
  234. sev.sigev_notify_function = RyanMqttTestFreeTimerCallback; // 定时到期时调用的函数
  235. if (0 != timer_create(CLOCK_REALTIME, &sev, &timerid))
  236. {
  237. RyanMqttLog_e("timer_create failed");
  238. free(fta);
  239. return;
  240. }
  241. fta->timerid = timerid;
  242. // 毫秒转秒和纳秒
  243. its.it_value.tv_sec = delayMs / 1000;
  244. its.it_value.tv_nsec = (uint32_t)((delayMs % 1000) * 1000000);
  245. if (0 != timer_settime(timerid, 0, &its, NULL))
  246. {
  247. RyanMqttLog_e("timer_settime failed");
  248. if (0 != timer_delete(timerid))
  249. {
  250. RyanMqttLog_e("timer_delete failed");
  251. }
  252. free(fta);
  253. return;
  254. }
  255. }
  256. RyanMqttError_e RyanMqttTestDestroyClient(RyanMqttClient_t *client)
  257. {
  258. struct RyanMqttTestEventUserData *eventUserData = (struct RyanMqttTestEventUserData *)client->config.userData;
  259. if (RyanMqttTestEventUserDataMagic != eventUserData->magic)
  260. {
  261. RyanMqttLog_e("eventUserData野指针");
  262. }
  263. RyanMqttDisconnect(client, RyanMqttTrue);
  264. // 启动mqtt客户端线程
  265. RyanMqttDestroy(client);
  266. if (eventUserData->syncFlag)
  267. {
  268. sem_wait(&eventUserData->sem);
  269. sem_destroy(&eventUserData->sem);
  270. delay(20); // 等待mqtt线程回收资源
  271. free(eventUserData);
  272. }
  273. else
  274. {
  275. RyanMqttTestScheduleFreeAfterMs(eventUserData, RyanMqttRecvTimeout + 20);
  276. }
  277. return RyanMqttSuccessError;
  278. }
  279. RyanMqttError_e checkAckList(RyanMqttClient_t *client)
  280. {
  281. RyanMqttLog_w("等待检查ack链表,等待 recvTime: %d", client->config.recvTimeout);
  282. delay(RyanMqttRecvTimeout + 50);
  283. platformMutexLock(client->config.userData, &client->ackHandleLock);
  284. int ackEmpty = RyanMqttListIsEmpty(&client->ackHandlerList);
  285. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  286. if (!ackEmpty)
  287. {
  288. RyanMqttLog_e("mqtt空间 ack链表不为空");
  289. return RyanMqttFailedError;
  290. }
  291. platformMutexLock(client->config.userData, &client->userSessionLock);
  292. int userAckEmpty = RyanMqttListIsEmpty(&client->userAckHandlerList);
  293. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  294. if (!userAckEmpty)
  295. {
  296. RyanMqttLog_e("用户空间 ack链表不为空");
  297. return RyanMqttFailedError;
  298. }
  299. platformMutexLock(client->config.userData, &client->msgHandleLock);
  300. int msgEmpty = RyanMqttListIsEmpty(&client->msgHandlerList);
  301. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  302. if (!msgEmpty)
  303. {
  304. RyanMqttLog_e("mqtt空间 msg链表不为空");
  305. return RyanMqttFailedError;
  306. }
  307. return RyanMqttSuccessError;
  308. }
  309. // 注意测试代码只有特定emqx服务器才可以通过,用户的emqx服务器大概率通过不了,
  310. // 因为有些依赖emqx的配置,比如消息重试间隔,最大飞行窗口,最大消息队列等
  311. // todo 增加session测试
  312. // !当测试程序出错时,并不会回收内存。交由父进程进行回收
  313. int main(void)
  314. {
  315. RyanMqttTestUtileInit();
  316. RyanMqttError_e result = RyanMqttSuccessError;
  317. uint32_t testRunCount = 0;
  318. uint32_t funcStartMs;
  319. #define runTestWithLogAndTimer(fun) \
  320. do \
  321. { \
  322. testRunCount++; \
  323. RyanMqttLog_raw("┌── [TEST %d] 开始执行: " #fun "()\r\n", testRunCount); \
  324. funcStartMs = platformUptimeMs(); \
  325. result = fun(); \
  326. RyanMqttLog_raw("└── [TEST %d] 结束执行: 返回值 = %d %s | 耗时: %d ms\x1b[0m\r\n\r\n", testRunCount, \
  327. result, (result == RyanMqttSuccessError) ? "✅" : "❌", \
  328. platformUptimeMs() - funcStartMs); \
  329. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; }); \
  330. } while (0)
  331. uint32_t totalElapsedStartMs = platformUptimeMs();
  332. runTestWithLogAndTimer(RyanMqttPublicApiParamCheckTest);
  333. runTestWithLogAndTimer(RyanMqttMemoryFaultToleranceTest);
  334. runTestWithLogAndTimer(RyanMqttSubTest);
  335. runTestWithLogAndTimer(RyanMqttPubTest);
  336. runTestWithLogAndTimer(RyanMqttDestroyTest);
  337. runTestWithLogAndTimer(RyanMqttNetworkFaultToleranceMemoryTest);
  338. runTestWithLogAndTimer(RyanMqttNetworkFaultQosResilienceTest);
  339. runTestWithLogAndTimer(RyanMqttMultiThreadMultiClientTest);
  340. runTestWithLogAndTimer(RyanMqttMultiThreadSafetyTest);
  341. runTestWithLogAndTimer(RyanMqttReconnectTest);
  342. runTestWithLogAndTimer(RyanMqttKeepAliveTest);
  343. // 暂时不开放出来
  344. // runTestWithLogAndTimer(RyanMqttWildcardTest);
  345. __exit:
  346. RyanMqttLog_raw("测试总耗时: %.3f S\r\n", (platformUptimeMs() - totalElapsedStartMs) / 1000.0);
  347. if (RyanMqttSuccessError == result)
  348. {
  349. RyanMqttLog_raw("测试成功---------------------------\r\n");
  350. }
  351. else
  352. {
  353. RyanMqttLog_raw("测试失败---------------------------\r\n");
  354. }
  355. for (uint32_t i = 0; i < 3; i++)
  356. {
  357. displayMem();
  358. delay(300);
  359. }
  360. RyanMqttTestUtileDeInit();
  361. return 0;
  362. }