RyanMqttTestLinux.c 20 KB


  /* Connection parameters of the MQTT broker used by the tests in this file. */
  #define RyanMqttClientId ("RyanMqttTest1") /* MQTT client id; must be unique */
  #define RyanMqttHost ("39.164.131.143")    /* IP address of your MQTT server */
  #define RyanMqttPort ("1883")              /* MQTT server port */
  #define RyanMqttUserName ("test")          /* use "" when no user name is needed */
  #define RyanMqttPassword ("test")          /* use "" when no password is needed */
  6. #include <stdio.h>
  7. #include <stdint.h>
  8. #include <string.h>
  9. #include <stdlib.h>
  10. #include <unistd.h>
  11. #include <semaphore.h>
  /* Log settings; they are defined ahead of the RyanMqttLog.h include
   * (presumably that header reads them -- confirm against the header). */
  #define rlogEnable 1             /* enable logging */
  #define rlogColorEnable 1        /* enable colored log output */
  #define rlogLevel (rlogLvlDebug) /* log print level */
  #define rlogTag "RyanMqttTest"   /* log tag */
  16. #include "RyanMqttLog.h"
  17. #include "RyanMqttClient.h"
  /* Sleep for `ms` milliseconds by handing ms * 1000 to usleep(). The argument
   * is parenthesized so that an expression such as delay(a + b) scales the
   * whole value instead of only its last term. */
  #define delay(ms) usleep((ms) * 1000)
  /*
   * Memory check used between tests: read two figures through v_mcheck().
   * When both are zero the macro does nothing more. Otherwise it logs an
   * error and then reports the two figures forever, pausing delay(3000)
   * between reports; control never leaves the macro in that case.
   */
  #define checkMemory \
      do \
      { \
          int area = 0, use = 0; \
          v_mcheck(&area, &use); \
          if (0 == area && 0 == use) \
              break; \
          rlog_e("内存泄漏"); \
          for (;;) \
          { \
              area = 0; \
              use = 0; \
              v_mcheck(&area, &use); \
              rlog_w("|||----------->>> area = %d, size = %d", area, use); \
              delay(3000); \
          } \
      } while (0)
  /* Counters updated by the MQTT event callback and read by the publish test. */
  static uint32_t mqttTest[10] = {0};

  /* Slots of mqttTest that are in use. */
  enum
  {
      dataEventCount = 0,      /* number of received-data events */
      PublishedEventCount = 1, /* number of successful QoS1 / QoS2 publishes */
  };
  /**
   * @brief Print a label followed by a buffer rendered as hexadecimal bytes.
   *
   * Every byte is converted to unsigned char before it is printed, so values
   * >= 0x80 are not sign-extended on platforms where char is signed, and it is
   * printed as exactly two digits so byte boundaries stay unambiguous.
   *
   * @param buf      bytes to dump
   * @param len      number of bytes in buf
   * @param userData text printed in front of the dump
   */
  static void printfArrStr(char *buf, uint32_t len, char *userData)
  {
      rlog_raw("%s", userData);

      for (uint32_t i = 0; i < len; i++)
      {
          rlog_raw("%02x", (unsigned char)buf[i]);
      }

      rlog_raw("\r\n");
  }
  46. /**
  47. * @brief mqtt事件回调处理函数
  48. * 事件的详细定义可以查看枚举定义
  49. *
  50. * @param pclient
  51. * @param event
  52. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  53. */
  54. static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void const *eventData)
  55. {
  56. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  57. switch (event)
  58. {
  59. case RyanMqttEventError:
  60. break;
  61. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  62. rlog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  63. break;
  64. case RyanMqttEventDisconnected:
  65. rlog_w("mqtt断开连接回调 %d", *(int32_t *)eventData);
  66. break;
  67. case RyanMqttEventSubscribed:
  68. {
  69. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  70. rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  71. break;
  72. }
  73. case RyanMqttEventSubscribedFaile:
  74. {
  75. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  76. rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  77. break;
  78. }
  79. case RyanMqttEventUnSubscribed:
  80. {
  81. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  82. rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  83. break;
  84. }
  85. case RyanMqttEventUnSubscribedFaile:
  86. {
  87. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  88. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  89. break;
  90. }
  91. case RyanMqttEventPublished:
  92. {
  93. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  94. rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  95. mqttTest[PublishedEventCount]++;
  96. break;
  97. }
  98. case RyanMqttEventData:
  99. {
  100. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  101. rlog_i("接收到mqtt消息事件回调 topic: %s, packetId: %d, payload len: %d",
  102. msgData->topic, msgData->packetId, msgData->payloadLen);
  103. rlog_i("%.*s", msgData->payloadLen, msgData->payload);
  104. mqttTest[dataEventCount]++;
  105. break;
  106. }
  107. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  108. {
  109. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  110. rlog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  111. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  112. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  113. break;
  114. }
  115. case RyanMqttEventReconnectBefore:
  116. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  117. rlog_i("重连前事件回调");
  118. break;
  119. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  120. {
  121. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  122. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  123. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  124. rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  125. break;
  126. }
  127. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  128. {
  129. // 这里选择直接丢弃该消息
  130. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  131. rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  132. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  133. break;
  134. }
  135. case RyanMqttEventAckHandlerdiscard:
  136. {
  137. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  138. rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  139. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  140. break;
  141. }
  142. case RyanMqttEventDestoryBefore:
  143. rlog_i("销毁mqtt客户端前回调");
  144. free(client->config.sendBuffer);
  145. free(client->config.recvBuffer);
  146. if (client->config.userData)
  147. sem_post((sem_t *)client->config.userData);
  148. break;
  149. default:
  150. break;
  151. }
  152. }
  153. static void RyanMqttInitSync(RyanMqttClient_t **client, RyanMqttBool_e syncFlag)
  154. {
  155. char aaa[64];
  156. // 手动避免count的资源竞争了
  157. static uint32_t count = 0;
  158. snprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
  159. count++;
  160. sem_t *sem = NULL;
  161. if (syncFlag == RyanMqttTrue)
  162. {
  163. sem = (sem_t *)malloc(sizeof(sem_t));
  164. sem_init(sem, 0, 0);
  165. }
  166. RyanMqttError_e result = RyanMqttSuccessError;
  167. RyanMqttClientConfig_t mqttConfig = {
  168. .clientId = aaa,
  169. .userName = RyanMqttUserName,
  170. .password = RyanMqttPassword,
  171. .host = RyanMqttHost,
  172. .port = RyanMqttPort,
  173. .taskName = "mqttThread",
  174. .taskPrio = 16,
  175. .taskStack = 4096,
  176. .recvBufferSize = 1024,
  177. .sendBufferSize = 1024,
  178. .recvBuffer = malloc(1024),
  179. .sendBuffer = malloc(1024),
  180. .mqttVersion = 4,
  181. .ackHandlerRepeatCountWarning = 6,
  182. .ackHandlerCountWarning = 20,
  183. .autoReconnectFlag = RyanMqttTrue,
  184. .cleanSessionFlag = RyanMqttTrue,
  185. .reconnectTimeout = 3000,
  186. .recvTimeout = 5000,
  187. .sendTimeout = 2000,
  188. .ackTimeout = 10000,
  189. .keepaliveTimeoutS = 120,
  190. .mqttEventHandle = mqttEventHandle,
  191. .userData = sem};
  192. // 初始化mqtt客户端
  193. result = RyanMqttInit(client);
  194. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  195. // 注册需要的事件回调
  196. result = RyanMqttRegisterEventId(*client, RyanMqttEventAnyId);
  197. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  198. // 设置mqtt客户端config
  199. result = RyanMqttSetConfig(*client, &mqttConfig);
  200. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  201. // 设置遗嘱消息
  202. result = RyanMqttSetLwt(*client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
  203. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  204. // 启动mqtt客户端线程
  205. result = RyanMqttStart(*client);
  206. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  207. while (RyanMqttConnectState != RyanMqttGetState(*client))
  208. {
  209. delay(100);
  210. }
  211. }
  212. static void RyanMqttDestorySync(RyanMqttClient_t *client)
  213. {
  214. sem_t *sem = (sem_t *)client->config.userData;
  215. // 启动mqtt客户端线程
  216. RyanMqttDestroy(client);
  217. sem_wait(sem);
  218. sem_destroy(sem);
  219. free(sem);
  220. delay(3);
  221. }
  222. static RyanMqttError_e RyanMqttSubscribeTest(RyanMqttQos_e qos)
  223. {
  224. #define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
  225. RyanMqttClient_t *client;
  226. RyanMqttInitSync(&client, RyanMqttTrue);
  227. const char *subscribeArr[] = {
  228. "testlinux/pub",
  229. "testlinux/pub2",
  230. "testlinux/pub3",
  231. "testlinux/pub4",
  232. "testlinux/pub5",
  233. };
  234. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  235. RyanMqttSubscribe(client, subscribeArr[i], qos);
  236. RyanMqttMsgHandler_t msgHandles[10] = {0};
  237. int32_t subscribeNum = 0;
  238. int32_t result = RyanMqttSuccessError;
  239. for (int32_t i = 0; i < 600; i++)
  240. {
  241. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  242. if (result == RyanMqttNoRescourceError)
  243. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  244. if (subscribeNum == getArraySize(subscribeArr))
  245. break;
  246. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
  247. for (int32_t i = 0; i < subscribeNum; i++)
  248. rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  249. if (i > 500)
  250. return RyanMqttFailedError;
  251. delay(100);
  252. }
  253. for (int32_t i = 0; i < subscribeNum; i++)
  254. {
  255. uint8_t flag = 0;
  256. for (uint8_t j = 0; j < getArraySize(subscribeArr); j++)
  257. {
  258. if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
  259. flag = 1;
  260. }
  261. if (flag != 1)
  262. {
  263. rlog_i("主题不匹配: %d", msgHandles[i].topic);
  264. return RyanMqttFailedError;
  265. }
  266. }
  267. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  268. RyanMqttUnSubscribe(client, subscribeArr[i]);
  269. RyanMqttDestorySync(client);
  270. return RyanMqttSuccessError;
  271. }
  272. static RyanMqttError_e RyanMqttUnSubscribeTest(RyanMqttQos_e qos)
  273. {
  274. int count = 2;
  275. #define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
  276. RyanMqttClient_t *client;
  277. RyanMqttInitSync(&client, RyanMqttTrue);
  278. const char *subscribeArr[] = {
  279. "testlinux/pub",
  280. "testlinux/pub2",
  281. "testlinux/pub3",
  282. "testlinux/pub4",
  283. "testlinux/pub5",
  284. };
  285. for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
  286. RyanMqttSubscribe(client, subscribeArr[i], qos);
  287. RyanMqttMsgHandler_t msgHandles[10] = {0};
  288. int32_t subscribeNum = 0;
  289. int32_t result = RyanMqttSuccessError;
  290. for (int32_t i = 0; i < 600; i++)
  291. {
  292. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  293. if (result == RyanMqttNoRescourceError)
  294. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  295. if (subscribeNum == getArraySize(subscribeArr))
  296. break;
  297. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
  298. if (i > 500)
  299. return RyanMqttFailedError;
  300. delay(100);
  301. }
  302. for (uint8_t i = 0; i < getArraySize(subscribeArr) - count - 1; i++)
  303. RyanMqttUnSubscribe(client, subscribeArr[i]);
  304. for (int32_t i = 0; i < 600; i++)
  305. {
  306. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  307. if (result == RyanMqttNoRescourceError)
  308. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  309. if (subscribeNum == getArraySize(subscribeArr) - count)
  310. break;
  311. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr) - count);
  312. if (i > 500)
  313. return RyanMqttFailedError;
  314. delay(100);
  315. }
  316. for (int32_t i = 0; i < subscribeNum; i++)
  317. {
  318. uint8_t flag = 0;
  319. for (uint8_t j = count - 1; j < getArraySize(subscribeArr); j++)
  320. {
  321. if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
  322. flag = 1;
  323. }
  324. if (flag != 1)
  325. {
  326. rlog_i("主题不匹配: %d", msgHandles[i].topic);
  327. return RyanMqttFailedError;
  328. }
  329. }
  330. RyanMqttDestorySync(client);
  331. return RyanMqttSuccessError;
  332. }
  333. static void RyanMqttPublishTest(RyanMqttQos_e qos, uint32_t count, uint32_t delayms)
  334. {
  335. RyanMqttClient_t *client;
  336. RyanMqttInitSync(&client, RyanMqttTrue);
  337. RyanMqttSubscribe(client, "testlinux/pub", qos);
  338. mqttTest[PublishedEventCount] = 0;
  339. mqttTest[dataEventCount] = 0;
  340. for (uint32_t i = 0; i < count; i++)
  341. {
  342. RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), qos, RyanMqttFalse);
  343. if (delayms)
  344. delay(delayms);
  345. }
  346. for (uint32_t i = 0; i < 60; i++)
  347. {
  348. delay(1000);
  349. uint8_t result = 0;
  350. if (RyanMqttQos0 == qos)
  351. {
  352. if (count == mqttTest[dataEventCount])
  353. result = 1;
  354. }
  355. else if (mqttTest[PublishedEventCount] == count && mqttTest[PublishedEventCount] == mqttTest[dataEventCount])
  356. result = 1;
  357. if (!result)
  358. {
  359. rlog_e("QOS测试失败 Qos: %d, PublishedEventCount: %d, dataEventCount: %d", qos, mqttTest[PublishedEventCount], mqttTest[dataEventCount]);
  360. }
  361. else
  362. {
  363. rlog_i("QOS测试成功 Qos: %d", qos);
  364. break;
  365. }
  366. }
  367. RyanMqttUnSubscribe(client, "testlinux/pub");
  368. RyanMqttDestorySync(client);
  369. }
  370. static void RyanMqttConnectDestory(uint32_t count, uint32_t delayms)
  371. {
  372. for (uint32_t i = 0; i < count; i++)
  373. {
  374. RyanMqttClient_t *client;
  375. RyanMqttInitSync(&client, i == count - 1 ? RyanMqttTrue : RyanMqttFalse);
  376. RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), RyanMqttQos0, RyanMqttFalse);
  377. if (delayms)
  378. delay(delayms);
  379. if (i == count - 1) // 最后一次同步释放
  380. {
  381. RyanMqttDestorySync(client);
  382. delay(1000);
  383. }
  384. else
  385. RyanMqttDestroy(client);
  386. }
  387. }
  388. static void RyanMqttReconnectTest(uint32_t count, uint32_t delayms)
  389. {
  390. RyanMqttClient_t *client;
  391. RyanMqttInitSync(&client, RyanMqttTrue);
  392. for (uint32_t i = 0; i < count; i++)
  393. {
  394. RyanMqttDisconnect(client, i % 2 == 0);
  395. while (RyanMqttConnectState != RyanMqttGetState(client))
  396. {
  397. delay(1);
  398. }
  399. if (delayms)
  400. delay(delayms);
  401. }
  402. RyanMqttDestorySync(client);
  403. }
  404. static RyanMqttError_e RyanMqttKeepAliveTest()
  405. {
  406. RyanMqttClient_t *client;
  407. RyanMqttError_e result = RyanMqttSuccessError;
  408. sem_t *sem = (sem_t *)malloc(sizeof(sem_t));
  409. sem_init(sem, 0, 0);
  410. RyanMqttClientConfig_t mqttConfig = {
  411. .clientId = "dfawerwdfgaeruyfku",
  412. .userName = RyanMqttUserName,
  413. .password = RyanMqttPassword,
  414. .host = RyanMqttHost,
  415. .port = RyanMqttPort,
  416. .taskName = "mqttThread",
  417. .taskPrio = 16,
  418. .taskStack = 4096,
  419. .recvBufferSize = 1024,
  420. .sendBufferSize = 1024,
  421. .recvBuffer = malloc(1024),
  422. .sendBuffer = malloc(1024),
  423. .mqttVersion = 4,
  424. .ackHandlerRepeatCountWarning = 6,
  425. .ackHandlerCountWarning = 20,
  426. .autoReconnectFlag = RyanMqttTrue,
  427. .cleanSessionFlag = RyanMqttTrue,
  428. .reconnectTimeout = 3000,
  429. .recvTimeout = 5000,
  430. .sendTimeout = 2000,
  431. .ackTimeout = 10000,
  432. .keepaliveTimeoutS = 30,
  433. .mqttEventHandle = mqttEventHandle,
  434. .userData = sem};
  435. // 初始化mqtt客户端
  436. result = RyanMqttInit(&client);
  437. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  438. // 注册需要的事件回调
  439. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  440. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  441. // 设置mqtt客户端config
  442. result = RyanMqttSetConfig(client, &mqttConfig);
  443. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  444. // 启动mqtt客户端线程
  445. result = RyanMqttStart(client);
  446. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  447. while (RyanMqttConnectState != RyanMqttGetState(client))
  448. {
  449. delay(100);
  450. }
  451. for (uint32_t i = 0; i < 90; i++)
  452. {
  453. if (RyanMqttConnectState != RyanMqttGetState(client))
  454. {
  455. rlog_e("mqtt断连了");
  456. return RyanMqttFailedError;
  457. }
  458. rlog_w("心跳倒计时: %d", platformTimerRemain(&client->keepaliveTimer));
  459. delay(1000);
  460. }
  461. RyanMqttDestorySync(client);
  462. }
  463. // !当测试程序出错时,并不会回收内存。交由父进程进行回收
  464. int main()
  465. {
  466. vallocInit();
  467. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_d, { goto __exit; });
  468. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_d, { goto __exit; });
  469. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_d, { goto __exit; });
  470. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_d, { goto __exit; });
  471. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_d, { goto __exit; });
  472. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_d, { goto __exit; });
  473. // 发布 & 订阅 qos 测试
  474. RyanMqttPublishTest(RyanMqttQos0, 100, 0);
  475. checkMemory;
  476. RyanMqttPublishTest(RyanMqttQos1, 100, 1);
  477. checkMemory;
  478. RyanMqttPublishTest(RyanMqttQos2, 100, 1);
  479. checkMemory;
  480. RyanMqttConnectDestory(100, 0);
  481. checkMemory;
  482. RyanMqttReconnectTest(3, 0);
  483. checkMemory;
  484. RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttKeepAliveTest(), RyanMqttFailedError, rlog_d, { goto __exit; });
  485. __exit:
  486. while (1)
  487. {
  488. displayMem();
  489. delay(10 * 1000);
  490. }
  491. return 0;
  492. }