RyanMqttTestLinux.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. #define RyanMqttClientId ("RyanMqttTest888") // MQTT client id; must be unique
  2. #define RyanMqttHost ("127.0.0.1") // IP address of your MQTT server
  3. #define RyanMqttPort (1883) // MQTT server port
  4. #define RyanMqttUserName ("test") // your user name; use NULL if there is none
  5. #define RyanMqttPassword ("test") // your password; use NULL if there is none
  6. #define rlogEnable // whether logging is enabled
  7. #define rlogColorEnable // whether colored log output is enabled
  8. #define rlogLevel (rlogLvlDebug) // log print level
  9. #define rlogTag "RyanMqttTest" // log tag
  10. #include <stdio.h>
  11. #include <stdint.h>
  12. #include <string.h>
  13. #include <stdlib.h>
  14. #include <unistd.h>
  15. #include <semaphore.h>
  16. #include "RyanMqttLog.h"
  17. #include "RyanMqttClient.h"
  /*
   * Sleep for `ms` milliseconds.
   * The argument is parenthesized so that an expression such as delay(a + b)
   * is scaled as a whole instead of only its last operand.
   */
  #define delay(ms) usleep((ms) * 1000)

  /*
   * Leak check used between tests: asks v_mcheck() for its two figures and,
   * if either one is non-zero, logs an error and then never returns, printing
   * the current figures every 3 seconds.
   * The same two locals are reused inside the loop (no shadowing declaration).
   */
  #define checkMemory \
      do \
      { \
          int area = 0, use = 0; \
          v_mcheck(&area, &use); \
          if (area != 0 || use != 0) \
          { \
              rlog_e("内存泄漏"); \
              while (1) \
              { \
                  area = 0; \
                  use = 0; \
                  v_mcheck(&area, &use); \
                  rlog_w("|||----------->>> area = %d, size = %d", area, use); \
                  delay(3000); \
              } \
          } \
      } while (0)
  /*
   * Event counters. They are incremented by the MQTT event callback and
   * reset / read by the publish test; the enum below names the slots in use.
   */
  static uint32_t mqttTest[10] = {0};
  enum
  {
      dataEventCount = 0,      // number of received-message events
      PublishedEventCount = 1, // number of successful qos1 / qos2 publishes
  };
  /**
   * @brief Print a label followed by a hex dump of a buffer via rlog_raw.
   *
   * @param buf      bytes to dump
   * @param len      number of bytes to read from buf
   * @param userData text printed in front of the dump
   */
  static void printfArrStr(char *buf, uint32_t len, char *userData)
  {
      rlog_raw("%s", userData);
      for (uint32_t i = 0; i < len; i++)
      {
          // Convert through unsigned char: where plain char is signed, a byte
          // >= 0x80 would be sign-extended and printed as "ffffffxx". Two
          // zero-padded digits per byte keep the dump unambiguous.
          rlog_raw("%02x", (unsigned char)buf[i]);
      }
      rlog_raw("\r\n");
  }
  46. /**
  47. * @brief mqtt事件回调处理函数
  48. * 事件的详细定义可以查看枚举定义
  49. *
  50. * @param pclient
  51. * @param event
  52. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  53. */
  54. static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  55. {
  56. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  57. switch (event)
  58. {
  59. case RyanMqttEventError:
  60. break;
  61. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  62. rlog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  63. break;
  64. case RyanMqttEventDisconnected:
  65. rlog_w("mqtt断开连接回调 %d", *(int32_t *)eventData);
  66. break;
  67. case RyanMqttEventSubscribed:
  68. {
  69. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  70. rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  71. break;
  72. }
  73. case RyanMqttEventSubscribedFaile:
  74. {
  75. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  76. rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  77. break;
  78. }
  79. case RyanMqttEventUnSubscribed:
  80. {
  81. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  82. rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  83. break;
  84. }
  85. case RyanMqttEventUnSubscribedFaile:
  86. {
  87. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  88. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  89. break;
  90. }
  91. case RyanMqttEventPublished:
  92. {
  93. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  94. rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  95. mqttTest[PublishedEventCount]++;
  96. break;
  97. }
  98. case RyanMqttEventData:
  99. {
  100. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  101. rlog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d",
  102. msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen);
  103. rlog_i("%.*s", msgData->payloadLen, msgData->payload);
  104. mqttTest[dataEventCount]++;
  105. break;
  106. }
  107. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  108. {
  109. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  110. rlog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  111. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  112. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  113. break;
  114. }
  115. case RyanMqttEventReconnectBefore:
  116. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  117. rlog_i("重连前事件回调");
  118. break;
  119. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  120. {
  121. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  122. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  123. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  124. rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  125. break;
  126. }
  127. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  128. {
  129. // 这里选择直接丢弃该消息
  130. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  131. rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  132. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  133. break;
  134. }
  135. case RyanMqttEventAckHandlerdiscard:
  136. {
  137. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  138. rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  139. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  140. break;
  141. }
  142. case RyanMqttEventDestoryBefore:
  143. rlog_i("销毁mqtt客户端前回调");
  144. free(client->config.sendBuffer);
  145. free(client->config.recvBuffer);
  146. if (client->config.userData)
  147. sem_post((sem_t *)client->config.userData);
  148. break;
  149. default:
  150. break;
  151. }
  152. }
  153. static int32_t RyanMqttInitSync(RyanMqttClient_t **client, RyanMqttBool_e syncFlag)
  154. {
  155. char aaa[64];
  156. // 手动避免count的资源竞争了
  157. static uint32_t count = 0;
  158. snprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
  159. count++;
  160. sem_t *sem = NULL;
  161. if (syncFlag == RyanMqttTrue)
  162. {
  163. sem = (sem_t *)malloc(sizeof(sem_t));
  164. sem_init(sem, 0, 0);
  165. }
  166. RyanMqttError_e result = RyanMqttSuccessError;
  167. RyanMqttClientConfig_t mqttConfig = {
  168. .clientId = aaa,
  169. .userName = RyanMqttUserName,
  170. .password = RyanMqttPassword,
  171. .host = RyanMqttHost,
  172. .port = RyanMqttPort,
  173. .taskName = "mqttThread",
  174. .taskPrio = 16,
  175. .taskStack = 4096,
  176. .recvBufferSize = 1024,
  177. .sendBufferSize = 1024,
  178. .recvBuffer = malloc(1024),
  179. .sendBuffer = malloc(1024),
  180. .mqttVersion = 4,
  181. .ackHandlerRepeatCountWarning = 6,
  182. .ackHandlerCountWarning = 20,
  183. .autoReconnectFlag = RyanMqttTrue,
  184. .cleanSessionFlag = RyanMqttTrue,
  185. .reconnectTimeout = 3000,
  186. .recvTimeout = 5000,
  187. .sendTimeout = 2000,
  188. .ackTimeout = 10000,
  189. .keepaliveTimeoutS = 120,
  190. .mqttEventHandle = mqttEventHandle,
  191. .userData = sem};
  192. // 初始化mqtt客户端
  193. result = RyanMqttInit(client);
  194. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  195. // 注册需要的事件回调
  196. result = RyanMqttRegisterEventId(*client, RyanMqttEventAnyId);
  197. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  198. // 设置mqtt客户端config
  199. result = RyanMqttSetConfig(*client, &mqttConfig);
  200. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  201. // 设置遗嘱消息
  202. result = RyanMqttSetLwt(*client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
  203. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  204. // 启动mqtt客户端线程
  205. result = RyanMqttStart(*client);
  206. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  207. while (RyanMqttConnectState != RyanMqttGetState(*client))
  208. {
  209. delay(100);
  210. }
  211. return 0;
  212. }
  213. static void RyanMqttDestorySync(RyanMqttClient_t *client)
  214. {
  215. sem_t *sem = (sem_t *)client->config.userData;
  216. // 启动mqtt客户端线程
  217. RyanMqttDestroy(client);
  218. sem_wait(sem);
  219. sem_destroy(sem);
  220. free(sem);
  221. delay(3);
  222. }
  223. static RyanMqttError_e RyanMqttSubscribeTest(RyanMqttQos_e qos)
  224. {
  225. #define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
  226. RyanMqttClient_t *client;
  227. RyanMqttInitSync(&client, RyanMqttTrue);
  228. char *subscribeArr[] = {
  229. "testlinux/pub",
  230. "testlinux/pub2",
  231. "testlinux/pub3",
  232. "testlinux/pub4",
  233. "testlinux/pub5",
  234. };
  235. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  236. RyanMqttSubscribe(client, subscribeArr[i], qos);
  237. RyanMqttMsgHandler_t msgHandles[10] = {0};
  238. int32_t subscribeNum = 0;
  239. int32_t result = RyanMqttSuccessError;
  240. delay(100);
  241. for (int32_t i = 0; i < 600; i++)
  242. {
  243. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  244. if (result == RyanMqttNoRescourceError)
  245. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  246. if (subscribeNum == getArraySize(subscribeArr))
  247. break;
  248. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
  249. for (int32_t i = 0; i < subscribeNum; i++)
  250. rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  251. if (i > 500)
  252. return RyanMqttFailedError;
  253. delay(100);
  254. }
  255. for (int32_t i = 0; i < subscribeNum; i++)
  256. {
  257. uint8_t flag = 0;
  258. for (uint8_t j = 0; j < getArraySize(subscribeArr); j++)
  259. {
  260. if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
  261. flag = 1;
  262. }
  263. if (flag != 1)
  264. {
  265. rlog_i("主题不匹配: %d", msgHandles[i].topic);
  266. return RyanMqttFailedError;
  267. }
  268. }
  269. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  270. RyanMqttUnSubscribe(client, subscribeArr[i]);
  271. for (int32_t i = 0; i < 600; i++)
  272. {
  273. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  274. if (result == RyanMqttNoRescourceError)
  275. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  276. if (0 == subscribeNum)
  277. break;
  278. if (i > 500)
  279. return RyanMqttFailedError;
  280. delay(100);
  281. }
  282. RyanMqttDestorySync(client);
  283. return RyanMqttSuccessError;
  284. }
  285. static RyanMqttError_e RyanMqttUnSubscribeTest(RyanMqttQos_e qos)
  286. {
  287. int count = 2;
  288. #define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
  289. RyanMqttClient_t *client;
  290. RyanMqttInitSync(&client, RyanMqttTrue);
  291. char *subscribeArr[] = {
  292. "testlinux/pub",
  293. "testlinux/pub2",
  294. "testlinux/pub3",
  295. "testlinux/pub4",
  296. "testlinux/pub5",
  297. };
  298. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  299. RyanMqttSubscribe(client, subscribeArr[i], qos);
  300. RyanMqttMsgHandler_t msgHandles[10] = {0};
  301. int32_t subscribeNum = 0;
  302. int32_t result = RyanMqttSuccessError;
  303. delay(100);
  304. for (int32_t i = 0; i < 600; i++)
  305. {
  306. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  307. if (result == RyanMqttNoRescourceError)
  308. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  309. if (subscribeNum == getArraySize(subscribeArr))
  310. break;
  311. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
  312. if (i > 500)
  313. return RyanMqttFailedError;
  314. delay(100);
  315. }
  316. // 取消订阅指定的数值
  317. for (uint8_t i = 0; i < getArraySize(subscribeArr) - count - 1; i++)
  318. RyanMqttUnSubscribe(client, subscribeArr[i]);
  319. delay(100);
  320. for (int32_t i = 0; i < 600; i++)
  321. {
  322. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  323. if (result == RyanMqttNoRescourceError)
  324. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  325. if (subscribeNum == getArraySize(subscribeArr) - count)
  326. break;
  327. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr) - count);
  328. if (i > 500)
  329. return RyanMqttFailedError;
  330. delay(100);
  331. }
  332. for (int32_t i = 0; i < subscribeNum; i++)
  333. {
  334. uint8_t flag = 0;
  335. for (uint8_t j = count - 1; j < getArraySize(subscribeArr); j++)
  336. {
  337. if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
  338. flag = 1;
  339. }
  340. if (flag != 1)
  341. {
  342. rlog_i("主题不匹配: %d", msgHandles[i].topic);
  343. return RyanMqttFailedError;
  344. }
  345. }
  346. RyanMqttDestorySync(client);
  347. return RyanMqttSuccessError;
  348. }
  349. static int RyanMqttPublishTest(RyanMqttQos_e qos, uint32_t count, uint32_t delayms)
  350. {
  351. RyanMqttClient_t *client;
  352. RyanMqttInitSync(&client, RyanMqttTrue);
  353. RyanMqttSubscribe(client, "testlinux/pub", qos);
  354. mqttTest[PublishedEventCount] = 0;
  355. mqttTest[dataEventCount] = 0;
  356. for (uint32_t i = 0; i < count; i++)
  357. {
  358. RyanMqttError_e result = RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), qos, RyanMqttFalse);
  359. if (RyanMqttSuccessError != result)
  360. {
  361. rlog_e("QOS发布错误 Qos: %d, result: %d", qos, result);
  362. return -1;
  363. }
  364. if (delayms)
  365. delay(delayms);
  366. }
  367. for (uint32_t i = 0; i < 60; i++)
  368. {
  369. delay(1000);
  370. uint8_t result = 0;
  371. if (RyanMqttQos0 == qos)
  372. {
  373. if (count == mqttTest[dataEventCount])
  374. result = 1;
  375. }
  376. else if (mqttTest[PublishedEventCount] == count && mqttTest[PublishedEventCount] == mqttTest[dataEventCount])
  377. result = 1;
  378. if (!result)
  379. {
  380. rlog_e("QOS测试失败 Qos: %d, PublishedEventCount: %d, dataEventCount: %d", qos, mqttTest[PublishedEventCount], mqttTest[dataEventCount]);
  381. return -1;
  382. }
  383. else
  384. {
  385. rlog_i("QOS测试成功 Qos: %d", qos);
  386. break;
  387. }
  388. }
  389. RyanMqttUnSubscribe(client, "testlinux/pub");
  390. RyanMqttDestorySync(client);
  391. return 0;
  392. }
  393. static void RyanMqttConnectDestory(uint32_t count, uint32_t delayms)
  394. {
  395. for (uint32_t i = 0; i < count; i++)
  396. {
  397. RyanMqttClient_t *client;
  398. RyanMqttInitSync(&client, i == count - 1 ? RyanMqttTrue : RyanMqttFalse);
  399. RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), RyanMqttQos0, RyanMqttFalse);
  400. if (delayms)
  401. delay(delayms);
  402. if (i == count - 1) // 最后一次同步释放
  403. {
  404. RyanMqttDestorySync(client);
  405. delay(1000);
  406. }
  407. else
  408. RyanMqttDestroy(client);
  409. }
  410. }
  411. static void RyanMqttReconnectTest(uint32_t count, uint32_t delayms)
  412. {
  413. RyanMqttClient_t *client;
  414. RyanMqttInitSync(&client, RyanMqttTrue);
  415. for (uint32_t i = 0; i < count; i++)
  416. {
  417. RyanMqttDisconnect(client, i % 2 == 0);
  418. while (RyanMqttConnectState != RyanMqttGetState(client))
  419. {
  420. delay(1);
  421. }
  422. if (delayms)
  423. delay(delayms);
  424. }
  425. RyanMqttDestorySync(client);
  426. }
  427. static RyanMqttError_e RyanMqttKeepAliveTest()
  428. {
  429. RyanMqttClient_t *client;
  430. RyanMqttError_e result = RyanMqttSuccessError;
  431. sem_t *sem = (sem_t *)malloc(sizeof(sem_t));
  432. sem_init(sem, 0, 0);
  433. RyanMqttClientConfig_t mqttConfig = {
  434. .clientId = "dfawerwdfgaeruyfku",
  435. .userName = RyanMqttUserName,
  436. .password = RyanMqttPassword,
  437. .host = RyanMqttHost,
  438. .port = RyanMqttPort,
  439. .taskName = "mqttThread",
  440. .taskPrio = 16,
  441. .taskStack = 4096,
  442. .recvBufferSize = 1024,
  443. .sendBufferSize = 1024,
  444. .recvBuffer = malloc(1024),
  445. .sendBuffer = malloc(1024),
  446. .mqttVersion = 4,
  447. .ackHandlerRepeatCountWarning = 6,
  448. .ackHandlerCountWarning = 20,
  449. .autoReconnectFlag = RyanMqttTrue,
  450. .cleanSessionFlag = RyanMqttTrue,
  451. .reconnectTimeout = 3000,
  452. .recvTimeout = 5000,
  453. .sendTimeout = 2000,
  454. .ackTimeout = 10000,
  455. .keepaliveTimeoutS = 30,
  456. .mqttEventHandle = mqttEventHandle,
  457. .userData = sem};
  458. // 初始化mqtt客户端
  459. result = RyanMqttInit(&client);
  460. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  461. // 注册需要的事件回调
  462. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  463. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  464. // 设置mqtt客户端config
  465. result = RyanMqttSetConfig(client, &mqttConfig);
  466. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  467. // 启动mqtt客户端线程
  468. result = RyanMqttStart(client);
  469. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  470. while (RyanMqttConnectState != RyanMqttGetState(client))
  471. {
  472. delay(100);
  473. }
  474. // recvTimeout = 5000,每过 5000 ms检查一次心跳周期,如果超过 3 / 4 时间就会进行心跳保活
  475. for (uint32_t i = 0; i < 90; i++)
  476. {
  477. if (RyanMqttConnectState != RyanMqttGetState(client))
  478. {
  479. rlog_e("mqtt断连了");
  480. return RyanMqttFailedError;
  481. }
  482. rlog_w("心跳倒计时: %d", platformTimerRemain(&client->keepaliveTimer));
  483. delay(1000);
  484. }
  485. RyanMqttDestorySync(client);
  486. return RyanMqttSuccessError;
  487. }
  488. // !当测试程序出错时,并不会回收内存。交由父进程进行回收
  489. int main()
  490. {
  491. vallocInit();
  492. int result = 0;
  493. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_e, { goto __exit; });
  494. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_e, { goto __exit; });
  495. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_e, { goto __exit; });
  496. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_e, { goto __exit; });
  497. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_e, { goto __exit; });
  498. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_e, { goto __exit; });
  499. // 发布 & 订阅 qos 测试
  500. result = RyanMqttPublishTest(RyanMqttQos0, 1000, 0);
  501. if (result != 0)
  502. goto __exit;
  503. checkMemory;
  504. result = RyanMqttPublishTest(RyanMqttQos1, 1000, 1);
  505. if (result != 0)
  506. goto __exit;
  507. checkMemory;
  508. result = RyanMqttPublishTest(RyanMqttQos2, 1000, 1);
  509. if (result != 0)
  510. goto __exit;
  511. checkMemory;
  512. RyanMqttConnectDestory(100, 0);
  513. checkMemory;
  514. RyanMqttReconnectTest(3, 0);
  515. checkMemory;
  516. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttKeepAliveTest(), RyanMqttFailedError, rlog_e, { goto __exit; });
  517. __exit:
  518. while (1)
  519. {
  520. displayMem();
  521. delay(10 * 1000);
  522. }
  523. return 0;
  524. }