// RyanMqttTestLinux.c
  /* MQTT broker connection settings used by every test in this file. */
  #define RyanMqttClientId ("RyanMqttTest888") // MQTT client id, must be unique
  #define RyanMqttHost ("127.0.0.1")           // IP address of the MQTT broker
  #define RyanMqttPort (1883)                  // TCP port of the MQTT broker
  #define RyanMqttUserName ("test")            // user name, use NULL when there is none
  #define RyanMqttPassword ("test")            // password, use NULL when there is none

  /* Logging switches; these are defined ahead of the #include of "RyanMqttLog.h". */
  #define rlogEnable                 // enable logging
  #define rlogColorEnable            // enable coloured log output
  #define rlogLevel (rlogLvlDebug)   // log level to print
  #define rlogTag "RyanMqttTest"     // log tag
  10. #include <stdio.h>
  11. #include <stdint.h>
  12. #include <string.h>
  13. #include <stdlib.h>
  14. #include <unistd.h>
  15. #include <semaphore.h>
  16. #include "RyanMqttLog.h"
  17. #include "RyanMqttClient.h"
  /*
   * Sleep for `ms` milliseconds via usleep().
   * The argument is parenthesised so that expressions such as delay(a + b)
   * are scaled as a whole instead of expanding to usleep(a + b * 1000).
   */
  #define delay(ms) usleep((ms) * 1000)
  /*
   * Leak check used between test cases: query v_mcheck() for the current
   * area/use figures. If either is non-zero, log an error and then loop
   * forever, re-querying and printing the figures every 3000 ms.
   */
  #define checkMemory                                                                  \
      do                                                                               \
      {                                                                                \
          int area = 0, use = 0;                                                       \
          v_mcheck(&area, &use);                                                       \
          if (area != 0 || use != 0)                                                   \
          {                                                                            \
              rlog_e("内存泄漏");                                                      \
              for (;;)                                                                 \
              {                                                                        \
                  area = 0;                                                            \
                  use = 0;                                                             \
                  v_mcheck(&area, &use);                                               \
                  rlog_w("|||----------->>> area = %d, size = %d", area, use);         \
                  delay(3000);                                                         \
              }                                                                        \
          }                                                                            \
      } while (0)
  /* Counters shared between the event callback and the test cases. */
  static uint32_t mqttTest[10] = {0};

  /* Slot indices into mqttTest. */
  enum
  {
      dataEventCount = 0,      // number of received data events
      PublishedEventCount = 1, // number of successfully published qos1 / qos2 messages
  };
  /**
   * @brief Print a label followed by a hex dump of a buffer, then "\r\n".
   *
   * @param buf      bytes to dump
   * @param len      number of bytes in buf
   * @param userData NUL-terminated label printed before the dump
   */
  static void printfArrStr(const char *buf, uint32_t len, const char *userData)
  {
      rlog_raw("%s", userData);
      for (uint32_t i = 0; i < len; i++)
      {
          // Convert through unsigned char so bytes >= 0x80 are not sign-extended
          // (which would print as ffffffXX), and pad to two digits so the dump
          // is unambiguous.
          rlog_raw("%02x", (unsigned char)buf[i]);
      }
      rlog_raw("\r\n");
  }
  46. /**
  47. * @brief mqtt事件回调处理函数
  48. * 事件的详细定义可以查看枚举定义
  49. *
  50. * @param pclient
  51. * @param event
  52. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  53. */
  54. static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void const *eventData)
  55. {
  56. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  57. switch (event)
  58. {
  59. case RyanMqttEventError:
  60. break;
  61. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  62. rlog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  63. break;
  64. case RyanMqttEventDisconnected:
  65. rlog_w("mqtt断开连接回调 %d", *(int32_t *)eventData);
  66. break;
  67. case RyanMqttEventSubscribed:
  68. {
  69. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  70. rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  71. break;
  72. }
  73. case RyanMqttEventSubscribedFaile:
  74. {
  75. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  76. rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  77. break;
  78. }
  79. case RyanMqttEventUnSubscribed:
  80. {
  81. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  82. rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  83. break;
  84. }
  85. case RyanMqttEventUnSubscribedFaile:
  86. {
  87. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  88. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  89. break;
  90. }
  91. case RyanMqttEventPublished:
  92. {
  93. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  94. rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  95. mqttTest[PublishedEventCount]++;
  96. break;
  97. }
  98. case RyanMqttEventData:
  99. {
  100. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  101. rlog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d",
  102. msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen);
  103. rlog_i("%.*s", msgData->payloadLen, msgData->payload);
  104. mqttTest[dataEventCount]++;
  105. break;
  106. }
  107. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  108. {
  109. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  110. rlog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  111. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  112. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  113. break;
  114. }
  115. case RyanMqttEventReconnectBefore:
  116. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  117. rlog_i("重连前事件回调");
  118. break;
  119. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  120. {
  121. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  122. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  123. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  124. rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  125. break;
  126. }
  127. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  128. {
  129. // 这里选择直接丢弃该消息
  130. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  131. rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  132. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  133. break;
  134. }
  135. case RyanMqttEventAckHandlerdiscard:
  136. {
  137. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  138. rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  139. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  140. break;
  141. }
  142. case RyanMqttEventDestoryBefore:
  143. rlog_i("销毁mqtt客户端前回调");
  144. free(client->config.sendBuffer);
  145. free(client->config.recvBuffer);
  146. if (client->config.userData)
  147. sem_post((sem_t *)client->config.userData);
  148. break;
  149. default:
  150. break;
  151. }
  152. }
  153. static int32_t RyanMqttInitSync(RyanMqttClient_t **client, RyanMqttBool_e syncFlag)
  154. {
  155. char aaa[64];
  156. // 手动避免count的资源竞争了
  157. static uint32_t count = 0;
  158. snprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
  159. count++;
  160. sem_t *sem = NULL;
  161. if (syncFlag == RyanMqttTrue)
  162. {
  163. sem = (sem_t *)malloc(sizeof(sem_t));
  164. sem_init(sem, 0, 0);
  165. }
  166. RyanMqttError_e result = RyanMqttSuccessError;
  167. RyanMqttClientConfig_t mqttConfig = {
  168. .clientId = aaa,
  169. .userName = RyanMqttUserName,
  170. .password = RyanMqttPassword,
  171. .host = RyanMqttHost,
  172. .port = RyanMqttPort,
  173. .taskName = "mqttThread",
  174. .taskPrio = 16,
  175. .taskStack = 4096,
  176. .recvBufferSize = 1024,
  177. .sendBufferSize = 1024,
  178. .recvBuffer = malloc(1024),
  179. .sendBuffer = malloc(1024),
  180. .mqttVersion = 4,
  181. .ackHandlerRepeatCountWarning = 6,
  182. .ackHandlerCountWarning = 20,
  183. .autoReconnectFlag = RyanMqttTrue,
  184. .cleanSessionFlag = RyanMqttTrue,
  185. .reconnectTimeout = 3000,
  186. .recvTimeout = 5000,
  187. .sendTimeout = 2000,
  188. .ackTimeout = 10000,
  189. .keepaliveTimeoutS = 120,
  190. .mqttEventHandle = mqttEventHandle,
  191. .userData = sem};
  192. // 初始化mqtt客户端
  193. result = RyanMqttInit(client);
  194. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  195. // 注册需要的事件回调
  196. result = RyanMqttRegisterEventId(*client, RyanMqttEventAnyId);
  197. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  198. // 设置mqtt客户端config
  199. result = RyanMqttSetConfig(*client, &mqttConfig);
  200. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  201. // 设置遗嘱消息
  202. result = RyanMqttSetLwt(*client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
  203. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  204. // 启动mqtt客户端线程
  205. result = RyanMqttStart(*client);
  206. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  207. while (RyanMqttConnectState != RyanMqttGetState(*client))
  208. {
  209. delay(100);
  210. }
  211. }
  212. static void RyanMqttDestorySync(RyanMqttClient_t *client)
  213. {
  214. sem_t *sem = (sem_t *)client->config.userData;
  215. // 启动mqtt客户端线程
  216. RyanMqttDestroy(client);
  217. sem_wait(sem);
  218. sem_destroy(sem);
  219. free(sem);
  220. delay(3);
  221. }
  222. static RyanMqttError_e RyanMqttSubscribeTest(RyanMqttQos_e qos)
  223. {
  224. #define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
  225. RyanMqttClient_t *client;
  226. RyanMqttInitSync(&client, RyanMqttTrue);
  227. char *subscribeArr[] = {
  228. "testlinux/pub",
  229. "testlinux/pub2",
  230. "testlinux/pub3",
  231. "testlinux/pub4",
  232. "testlinux/pub5",
  233. };
  234. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  235. RyanMqttSubscribe(client, subscribeArr[i], qos);
  236. RyanMqttMsgHandler_t msgHandles[10] = {0};
  237. int32_t subscribeNum = 0;
  238. int32_t result = RyanMqttSuccessError;
  239. delay(100);
  240. for (int32_t i = 0; i < 600; i++)
  241. {
  242. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  243. if (result == RyanMqttNoRescourceError)
  244. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  245. if (subscribeNum == getArraySize(subscribeArr))
  246. break;
  247. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
  248. for (int32_t i = 0; i < subscribeNum; i++)
  249. rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  250. if (i > 500)
  251. return RyanMqttFailedError;
  252. delay(100);
  253. }
  254. for (int32_t i = 0; i < subscribeNum; i++)
  255. {
  256. uint8_t flag = 0;
  257. for (uint8_t j = 0; j < getArraySize(subscribeArr); j++)
  258. {
  259. if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
  260. flag = 1;
  261. }
  262. if (flag != 1)
  263. {
  264. rlog_i("主题不匹配: %d", msgHandles[i].topic);
  265. return RyanMqttFailedError;
  266. }
  267. }
  268. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  269. RyanMqttUnSubscribe(client, subscribeArr[i]);
  270. RyanMqttDestorySync(client);
  271. return RyanMqttSuccessError;
  272. }
  273. static RyanMqttError_e RyanMqttUnSubscribeTest(RyanMqttQos_e qos)
  274. {
  275. int count = 2;
  276. #define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
  277. RyanMqttClient_t *client;
  278. RyanMqttInitSync(&client, RyanMqttTrue);
  279. char *subscribeArr[] = {
  280. "testlinux/pub",
  281. "testlinux/pub2",
  282. "testlinux/pub3",
  283. "testlinux/pub4",
  284. "testlinux/pub5",
  285. };
  286. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  287. RyanMqttSubscribe(client, subscribeArr[i], qos);
  288. RyanMqttMsgHandler_t msgHandles[10] = {0};
  289. int32_t subscribeNum = 0;
  290. int32_t result = RyanMqttSuccessError;
  291. delay(100);
  292. for (int32_t i = 0; i < 600; i++)
  293. {
  294. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  295. if (result == RyanMqttNoRescourceError)
  296. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  297. if (subscribeNum == getArraySize(subscribeArr))
  298. break;
  299. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
  300. if (i > 500)
  301. return RyanMqttFailedError;
  302. delay(100);
  303. }
  304. for (uint8_t i = 0; i < getArraySize(subscribeArr) - count - 1; i++)
  305. RyanMqttUnSubscribe(client, subscribeArr[i]);
  306. delay(100);
  307. for (int32_t i = 0; i < 600; i++)
  308. {
  309. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  310. if (result == RyanMqttNoRescourceError)
  311. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  312. if (subscribeNum == getArraySize(subscribeArr) - count)
  313. break;
  314. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr) - count);
  315. if (i > 500)
  316. return RyanMqttFailedError;
  317. delay(100);
  318. }
  319. for (int32_t i = 0; i < subscribeNum; i++)
  320. {
  321. uint8_t flag = 0;
  322. for (uint8_t j = count - 1; j < getArraySize(subscribeArr); j++)
  323. {
  324. if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
  325. flag = 1;
  326. }
  327. if (flag != 1)
  328. {
  329. rlog_i("主题不匹配: %d", msgHandles[i].topic);
  330. return RyanMqttFailedError;
  331. }
  332. }
  333. RyanMqttDestorySync(client);
  334. return RyanMqttSuccessError;
  335. }
  336. static int RyanMqttPublishTest(RyanMqttQos_e qos, uint32_t count, uint32_t delayms)
  337. {
  338. RyanMqttClient_t *client;
  339. RyanMqttInitSync(&client, RyanMqttTrue);
  340. RyanMqttSubscribe(client, "testlinux/pub", qos);
  341. mqttTest[PublishedEventCount] = 0;
  342. mqttTest[dataEventCount] = 0;
  343. for (uint32_t i = 0; i < count; i++)
  344. {
  345. RyanMqttError_e result = RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), qos, RyanMqttFalse);
  346. if (RyanMqttSuccessError != result)
  347. {
  348. rlog_e("QOS发布错误 Qos: %d, result: %d", qos, result);
  349. return -1;
  350. }
  351. if (delayms)
  352. delay(delayms);
  353. }
  354. for (uint32_t i = 0; i < 60; i++)
  355. {
  356. delay(1000);
  357. uint8_t result = 0;
  358. if (RyanMqttQos0 == qos)
  359. {
  360. if (count == mqttTest[dataEventCount])
  361. result = 1;
  362. }
  363. else if (mqttTest[PublishedEventCount] == count && mqttTest[PublishedEventCount] == mqttTest[dataEventCount])
  364. result = 1;
  365. if (!result)
  366. {
  367. rlog_e("QOS测试失败 Qos: %d, PublishedEventCount: %d, dataEventCount: %d", qos, mqttTest[PublishedEventCount], mqttTest[dataEventCount]);
  368. return -1;
  369. }
  370. else
  371. {
  372. rlog_i("QOS测试成功 Qos: %d", qos);
  373. break;
  374. }
  375. }
  376. RyanMqttUnSubscribe(client, "testlinux/pub");
  377. RyanMqttDestorySync(client);
  378. return 0;
  379. }
  380. static void RyanMqttConnectDestory(uint32_t count, uint32_t delayms)
  381. {
  382. for (uint32_t i = 0; i < count; i++)
  383. {
  384. RyanMqttClient_t *client;
  385. RyanMqttInitSync(&client, i == count - 1 ? RyanMqttTrue : RyanMqttFalse);
  386. RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), RyanMqttQos0, RyanMqttFalse);
  387. if (delayms)
  388. delay(delayms);
  389. if (i == count - 1) // 最后一次同步释放
  390. {
  391. RyanMqttDestorySync(client);
  392. delay(1000);
  393. }
  394. else
  395. RyanMqttDestroy(client);
  396. }
  397. }
  398. static void RyanMqttReconnectTest(uint32_t count, uint32_t delayms)
  399. {
  400. RyanMqttClient_t *client;
  401. RyanMqttInitSync(&client, RyanMqttTrue);
  402. for (uint32_t i = 0; i < count; i++)
  403. {
  404. RyanMqttDisconnect(client, i % 2 == 0);
  405. while (RyanMqttConnectState != RyanMqttGetState(client))
  406. {
  407. delay(1);
  408. }
  409. if (delayms)
  410. delay(delayms);
  411. }
  412. RyanMqttDestorySync(client);
  413. }
  414. static RyanMqttError_e RyanMqttKeepAliveTest()
  415. {
  416. RyanMqttClient_t *client;
  417. RyanMqttError_e result = RyanMqttSuccessError;
  418. sem_t *sem = (sem_t *)malloc(sizeof(sem_t));
  419. sem_init(sem, 0, 0);
  420. RyanMqttClientConfig_t mqttConfig = {
  421. .clientId = "dfawerwdfgaeruyfku",
  422. .userName = RyanMqttUserName,
  423. .password = RyanMqttPassword,
  424. .host = RyanMqttHost,
  425. .port = RyanMqttPort,
  426. .taskName = "mqttThread",
  427. .taskPrio = 16,
  428. .taskStack = 4096,
  429. .recvBufferSize = 1024,
  430. .sendBufferSize = 1024,
  431. .recvBuffer = malloc(1024),
  432. .sendBuffer = malloc(1024),
  433. .mqttVersion = 4,
  434. .ackHandlerRepeatCountWarning = 6,
  435. .ackHandlerCountWarning = 20,
  436. .autoReconnectFlag = RyanMqttTrue,
  437. .cleanSessionFlag = RyanMqttTrue,
  438. .reconnectTimeout = 3000,
  439. .recvTimeout = 5000,
  440. .sendTimeout = 2000,
  441. .ackTimeout = 10000,
  442. .keepaliveTimeoutS = 30,
  443. .mqttEventHandle = mqttEventHandle,
  444. .userData = sem};
  445. // 初始化mqtt客户端
  446. result = RyanMqttInit(&client);
  447. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  448. // 注册需要的事件回调
  449. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  450. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  451. // 设置mqtt客户端config
  452. result = RyanMqttSetConfig(client, &mqttConfig);
  453. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  454. // 启动mqtt客户端线程
  455. result = RyanMqttStart(client);
  456. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  457. while (RyanMqttConnectState != RyanMqttGetState(client))
  458. {
  459. delay(100);
  460. }
  461. // recvTimeout = 5000,每过 5000 ms检查一次心跳周期,如果超过 3 / 4 时间就会进行心跳保活
  462. for (uint32_t i = 0; i < 90; i++)
  463. {
  464. if (RyanMqttConnectState != RyanMqttGetState(client))
  465. {
  466. rlog_e("mqtt断连了");
  467. return RyanMqttFailedError;
  468. }
  469. rlog_w("心跳倒计时: %d", platformTimerRemain(&client->keepaliveTimer));
  470. delay(1000);
  471. }
  472. RyanMqttDestorySync(client);
  473. }
  474. // !当测试程序出错时,并不会回收内存。交由父进程进行回收
  475. int main()
  476. {
  477. vallocInit();
  478. int result = 0;
  479. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_d, { goto __exit; });
  480. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_d, { goto __exit; });
  481. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_d, { goto __exit; });
  482. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_d, { goto __exit; });
  483. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_d, { goto __exit; });
  484. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_d, { goto __exit; });
  485. // 发布 & 订阅 qos 测试
  486. result = RyanMqttPublishTest(RyanMqttQos0, 1000, 0);
  487. if (result != 0)
  488. goto __exit;
  489. checkMemory;
  490. result = RyanMqttPublishTest(RyanMqttQos1, 1000, 1);
  491. if (result != 0)
  492. goto __exit;
  493. checkMemory;
  494. result = RyanMqttPublishTest(RyanMqttQos2, 1000, 1);
  495. if (result != 0)
  496. goto __exit;
  497. checkMemory;
  498. RyanMqttConnectDestory(100, 0);
  499. checkMemory;
  500. RyanMqttReconnectTest(3, 0);
  501. checkMemory;
  502. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttKeepAliveTest(), RyanMqttFailedError, rlog_d, { goto __exit; });
  503. __exit:
  504. while (1)
  505. {
  506. displayMem();
  507. delay(10 * 1000);
  508. }
  509. return 0;
  510. }