/* RyanMqttTest.c - RyanMqtt example client and msh test commands */

  1. #include "rtconfig.h"
#define RyanMqttHost ("broker.emqx.io") // fill in your mqtt server address
#define RyanMqttPort ("1883") // mqtt server port
#define RyanMqttUserName ("") // use "" when there is no user name
#define RyanMqttPassword ("") // use "" when there is no password
  6. #ifdef PKG_USING_RYANMQTT_EXAMPLE
  7. #include <stdio.h>
  8. #include <stdint.h>
  9. #include <string.h>
  10. #include <stdlib.h>
  11. #include <board.h>
  12. #include <rtthread.h>
  13. #include <rtdevice.h>
  14. #include <rtdbg.h>
#define rlogEnable 1 // whether logging is enabled
#define rlogColorEnable 1 // whether colored log output is enabled
#define rlogLevel (rlogLvlDebug) // log print level
#define rlogTag "RyanMqttTest" // log tag
  19. #include "RyanMqttLog.h"
  20. #include "RyanMqttClient.h"
// Millisecond delay helper used by the test commands
#define delay(ms) rt_thread_mdelay(ms)
// Client handle shared by all msh commands; NULL until mqttConnectFun() initializes it
static RyanMqttClient_t *client = NULL;
// Static buffers handed to the client config as recvBuffer / sendBuffer
static char mqttRecvBuffer[2048];
static char mqttSendBuffer[2048];
// See the event callback for how these counters are updated
static uint32_t mqttTest[10] = {0};
#define dataEventCount (0) // index of the counter for received data events
#define PublishedEventCount (1) // index of the counter for successful publishes
  29. void printfArrStr(char *buf, uint32_t len, char *userData)
  30. {
  31. rt_kprintf("%s", userData);
  32. for (uint32_t i = 0; i < len; i++)
  33. rt_kprintf("%x", buf[i]);
  34. rt_kprintf("\r\n");
  35. }
  36. /**
  37. * @brief mqtt事件回调处理函数
  38. * 事件的详细定义可以查看枚举定义
  39. *
  40. * @param pclient
  41. * @param event
  42. * @param eventData
  43. */
  44. void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void const *eventData)
  45. {
  46. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  47. switch (event)
  48. {
  49. case RyanMqttEventError:
  50. break;
  51. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  52. rlog_i("mqtt连接成功回调");
  53. break;
  54. case RyanMqttEventDisconnected:
  55. rlog_w("mqtt断开连接回调 %d", *(RyanMqttConnectAccepted *)eventData);
  56. break;
  57. case RyanMqttEventSubscribed:
  58. {
  59. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  60. rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  61. break;
  62. }
  63. case RyanMqttEventSubscribedFaile:
  64. {
  65. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  66. rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  67. break;
  68. }
  69. case RyanMqttEventUnSubscribed:
  70. {
  71. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  72. rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  73. break;
  74. }
  75. case RyanMqttEventUnSubscribedFaile:
  76. {
  77. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  78. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  79. break;
  80. }
  81. case RyanMqttEventPublished:
  82. {
  83. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  84. rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  85. mqttTest[PublishedEventCount]++;
  86. break;
  87. }
  88. case RyanMqttEventData:
  89. {
  90. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  91. rlog_i(" RyanMqtt topic recv callback! topic: %s, packetId: %d, payload len: %d",
  92. msgData->topic, msgData->packetId, msgData->payloadLen);
  93. rt_kprintf("%.*s\r\n", msgData->payloadLen, msgData->payload);
  94. mqttTest[dataEventCount]++;
  95. break;
  96. }
  97. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  98. {
  99. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  100. rlog_w("packetType: %d, packetId: %d, topic: %s, qos: %d",
  101. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  102. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  103. break;
  104. }
  105. case RyanMqttEventReconnectBefore:
  106. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  107. rlog_i("重连前事件回调");
  108. RyanMqttClientConfig_t mqttConfig = {
  109. .clientId = "RyanMqttTest", // 这里只修改了客户端名字
  110. .userName = RyanMqttUserName,
  111. .password = RyanMqttPassword,
  112. .host = RyanMqttHost,
  113. .port = RyanMqttPort,
  114. .taskName = "mqttThread",
  115. .taskPrio = 16,
  116. .taskStack = 3072,
  117. .recvBufferSize = sizeof(mqttRecvBuffer),
  118. .sendBufferSize = sizeof(mqttSendBuffer),
  119. .recvBuffer = mqttRecvBuffer,
  120. .sendBuffer = mqttSendBuffer,
  121. .recvBufferStaticFlag = RyanMqttTrue,
  122. .sendBufferStaticFlag = RyanMqttTrue,
  123. .mqttVersion = 4,
  124. .ackHandlerRepeatCountWarning = 6,
  125. .ackHandlerCountWarning = 20,
  126. .autoReconnectFlag = RyanMqttTrue,
  127. .cleanSessionFlag = 0,
  128. .reconnectTimeout = 3000,
  129. .recvTimeout = 11000,
  130. .sendTimeout = 2000,
  131. .ackTimeout = 10000,
  132. .keepaliveTimeoutS = 60,
  133. .mqttEventHandle = mqttEventHandle,
  134. .userData = NULL};
  135. RyanMqttSetConfig(client, &mqttConfig);
  136. break;
  137. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  138. {
  139. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  140. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  141. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  142. rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  143. break;
  144. }
  145. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  146. {
  147. // 这里选择直接丢弃该消息
  148. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  149. rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d" ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  150. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  151. break;
  152. }
  153. case RyanMqttEventAckHandlerdiscard:
  154. {
  155. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  156. rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  157. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  158. break;
  159. }
  160. case RyanMqttEventDestoryBefore:
  161. rlog_i("销毁mqtt客户端前回调");
  162. break;
  163. default:
  164. break;
  165. }
  166. }
  167. int mqttConnectFun()
  168. {
  169. RyanMqttError_e result = RyanMqttSuccessError;
  170. RyanMqttClientConfig_t mqttConfig = {
  171. .clientId = "RyanMqttTessdfwrt",
  172. .userName = RyanMqttUserName,
  173. .password = RyanMqttPassword,
  174. .host = RyanMqttHost,
  175. .port = RyanMqttPort,
  176. .taskName = "mqttThread",
  177. .taskPrio = 16,
  178. .taskStack = 3072,
  179. .recvBufferSize = sizeof(mqttRecvBuffer),
  180. .sendBufferSize = sizeof(mqttSendBuffer),
  181. .recvBuffer = mqttRecvBuffer,
  182. .sendBuffer = mqttSendBuffer,
  183. .recvBufferStaticFlag = RyanMqttTrue,
  184. .sendBufferStaticFlag = RyanMqttTrue,
  185. .mqttVersion = 4,
  186. .ackHandlerRepeatCountWarning = 6,
  187. .ackHandlerCountWarning = 20,
  188. .autoReconnectFlag = RyanMqttTrue,
  189. .cleanSessionFlag = RyanMqttFalse,
  190. .reconnectTimeout = 3000,
  191. .recvTimeout = 11000,
  192. .sendTimeout = 2000,
  193. .ackTimeout = 10000,
  194. .keepaliveTimeoutS = 60,
  195. .mqttEventHandle = mqttEventHandle,
  196. .userData = NULL};
  197. // 初始化mqtt客户端
  198. result = RyanMqttInit(&client);
  199. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  200. // 注册需要的事件回调
  201. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  202. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  203. // 设置mqtt客户端config
  204. result = RyanMqttSetConfig(client, &mqttConfig);
  205. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  206. // 设置遗嘱消息
  207. result = RyanMqttSetLwt(client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
  208. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  209. // 启动mqtt客户端线程
  210. result = RyanMqttStart(client);
  211. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  212. return 0;
  213. }
  214. /**
  215. * @brief mqtt msh命令
  216. *
  217. */
  218. struct RyanMqttCmdDes
  219. {
  220. const char *cmd;
  221. const char *explain;
  222. int (*fun)(int argc, char *argv[]);
  223. };
  224. static int MqttHelp(int argc, char *argv[]);
  225. /**
  226. * @brief 获取mqtt状态
  227. *
  228. * @param argc
  229. * @param argv
  230. * @return int
  231. */
  232. static int MqttState(int argc, char *argv[])
  233. {
  234. char *str = NULL;
  235. RyanMqttState_e clientState = RyanMqttGetState(client);
  236. switch (clientState)
  237. {
  238. case RyanMqttInvalidState:
  239. str = "无效状态";
  240. break;
  241. case RyanMqttInitState:
  242. str = "初始化状态";
  243. break;
  244. case RyanMqttStartState:
  245. str = "mqtt开始状态";
  246. break;
  247. case RyanMqttConnectState:
  248. str = "连接状态";
  249. break;
  250. case RyanMqttDisconnectState:
  251. str = "断开连接状态";
  252. break;
  253. case RyanMqttReconnectState:
  254. str = "重新连接状态";
  255. break;
  256. default:
  257. RyanMqttCheck(NULL, RyanMqttFailedError, rlog_d);
  258. break;
  259. }
  260. rlog_i("client state: %s", str);
  261. return 0;
  262. }
  263. /**
  264. * @brief mqtt 连接服务器
  265. *
  266. * @param argc
  267. * @param argv
  268. * @return int
  269. */
  270. static int MqttConnect(int argc, char *argv[])
  271. {
  272. if (RyanMqttConnectState == RyanMqttGetState(client))
  273. {
  274. rlog_w("mqtt客户端没有连接");
  275. return 0;
  276. }
  277. mqttConnectFun();
  278. return 0;
  279. }
  280. /**
  281. * @brief mqtt 重连函数,注意事项请看函数简介
  282. *
  283. * @param argc
  284. * @param argv
  285. * @return int
  286. */
  287. static int MqttReconnect(int argc, char *argv[])
  288. {
  289. RyanMqttReconnect(client);
  290. return 0;
  291. }
  292. /**
  293. * @brief mqtt销毁客户端
  294. *
  295. * @param argc
  296. * @param argv
  297. * @return int
  298. */
  299. static int MqttDestroy(int argc, char *argv[])
  300. {
  301. RyanMqttDestroy(client);
  302. return 0;
  303. }
  304. /**
  305. * @brief 断开mqtt客户端连接
  306. *
  307. * @param argc
  308. * @param argv
  309. * @return int
  310. */
  311. static int MqttDisconnect(int argc, char *argv[])
  312. {
  313. if (RyanMqttConnectState != RyanMqttGetState(client))
  314. {
  315. rlog_w("mqtt客户端没有连接");
  316. return 0;
  317. }
  318. RyanMqttDisconnect(client, RyanMqttTrue);
  319. return 0;
  320. }
  321. /**
  322. * @brief mqtt发布消息
  323. *
  324. * @param argc
  325. * @param argv
  326. * @return int
  327. */
  328. static int Mqttpublish(int argc, char *argv[])
  329. {
  330. if (argc < 7)
  331. {
  332. rlog_i("请输入 topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0) ");
  333. return 0;
  334. }
  335. if (RyanMqttConnectState != RyanMqttGetState(client))
  336. {
  337. rlog_w("mqtt客户端没有连接");
  338. return 0;
  339. }
  340. char *topic = argv[2];
  341. RyanMqttQos_e qos = atoi(argv[3]);
  342. char *payload = argv[4];
  343. uint16_t count = atoi(argv[5]);
  344. uint16_t delayTime = atoi(argv[6]);
  345. uint16_t pubCount = 0;
  346. rlog_i("qos: %d, count: %d, delayTime: %d, payload: %s", qos, count, delayTime, payload);
  347. for (uint16_t i = 0; i < count; i++)
  348. {
  349. if (RyanMqttSuccessError == RyanMqttPublish(client, topic, payload, strlen(payload), qos, 0))
  350. pubCount++;
  351. delay(delayTime);
  352. }
  353. delay(3000);
  354. LOG_E("pubCount: %d", pubCount);
  355. return 0;
  356. }
  357. /**
  358. * @brief mqtt订阅主题,支持通配符
  359. *
  360. * @param argc
  361. * @param argv
  362. * @return int
  363. */
  364. static int Mqttsubscribe(int argc, char *argv[])
  365. {
  366. if (argc < 4)
  367. {
  368. rlog_i("请输入 topic、 qos ");
  369. return 0;
  370. }
  371. if (RyanMqttConnectState != RyanMqttGetState(client))
  372. {
  373. rlog_w("mqtt客户端没有连接");
  374. return 0;
  375. }
  376. RyanMqttSubscribe(client, argv[2], atoi(argv[3]));
  377. return 0;
  378. }
  379. /**
  380. * @brief mqtt取消订阅主题
  381. *
  382. * @param argc
  383. * @param argv
  384. * @return int
  385. */
  386. static int MqttUnSubscribe(int argc, char *argv[])
  387. {
  388. if (argc < 3)
  389. {
  390. rlog_i("请输入 取消订阅主题");
  391. return 0;
  392. }
  393. if (RyanMqttConnectState != RyanMqttGetState(client))
  394. {
  395. rlog_w("mqtt客户端没有连接");
  396. return 0;
  397. }
  398. RyanMqttUnSubscribe(client, argv[2]);
  399. return 0;
  400. }
  401. /**
  402. * @brief mqtt获取已订阅主题
  403. *
  404. * @param argc
  405. * @param argv
  406. * @return int
  407. */
  408. static int MqttListSubscribe(int argc, char *argv[])
  409. {
  410. if (RyanMqttConnectState != RyanMqttGetState(client))
  411. {
  412. rlog_w("mqtt客户端没有连接");
  413. return 0;
  414. }
  415. RyanMqttMsgHandler_t msgHandles[10] = {0};
  416. int32_t subscribeNum = 0;
  417. int32_t result = RyanMqttSuccessError;
  418. result = RyanMqttGetSubscribe(client, msgHandles, sizeof(msgHandles) / sizeof(msgHandles[0]), &subscribeNum);
  419. if (result == RyanMqttNoRescourceError)
  420. rlog_w("订阅主题数超过10个,已截断");
  421. rlog_i("mqtt客户端已订阅的主题数: %d", subscribeNum);
  422. for (int32_t i = 0; i < subscribeNum; i++)
  423. rlog_i("订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  424. return 0;
  425. }
  426. /**
  427. * @brief 打印ack链表
  428. *
  429. * @param argc
  430. * @param argv
  431. * @return int
  432. */
  433. static int MqttListAck(int argc, char *argv[])
  434. {
  435. RyanList_t *curr = NULL,
  436. *next = NULL;
  437. RyanMqttAckHandler_t *ackHandler = NULL;
  438. if (RyanListIsEmpty(&client->ackHandlerList))
  439. {
  440. rlog_i("ack链表为空");
  441. return 0;
  442. }
  443. // 遍历链表
  444. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  445. {
  446. // 获取此节点的结构体
  447. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  448. // 发送qos1 / qos2消息服务器ack响应超时。需要重新发送它们。
  449. rlog_w(" type: %d, packetId is %d ", ackHandler->packetType, ackHandler->packetId);
  450. if (NULL != ackHandler->msgHandler)
  451. rlog_w("topic: %s, qos: %d", ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  452. }
  453. return 0;
  454. }
  455. /**
  456. * @brief 打印msg链表
  457. *
  458. * @param argc
  459. * @param argv
  460. * @return int
  461. */
  462. static int MqttListMsg(int argc, char *argv[])
  463. {
  464. RyanList_t *curr = NULL,
  465. *next = NULL;
  466. RyanMqttMsgHandler_t *msgHandler = NULL;
  467. if (RyanListIsEmpty(&client->msgHandlerList))
  468. {
  469. rlog_i("msg链表为空");
  470. return 0;
  471. }
  472. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  473. {
  474. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  475. rlog_w("topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  476. }
  477. return 0;
  478. }
  479. /**
  480. * @brief 打印接收到的数据计数
  481. *
  482. * @param argc
  483. * @param argv
  484. * @return int
  485. */
  486. static int Mqttdata(int argc, char *argv[])
  487. {
  488. // mqtt data
  489. if (argc == 3)
  490. {
  491. uint32_t num = atoi(argv[2]);
  492. if (num < sizeof(mqttTest) / sizeof(mqttTest[0]) - 1)
  493. mqttTest[num] = 0;
  494. else
  495. LOG_E("数组越界");
  496. }
  497. rlog_i("dataEventCount: %d, publishCount:%u",
  498. mqttTest[dataEventCount], mqttTest[PublishedEventCount]);
  499. return 0;
  500. }
  501. static const struct RyanMqttCmdDes cmdTab[] =
  502. {
  503. {"help", "打印帮助信息", MqttHelp},
  504. {"state", "打印mqtt客户端状态", MqttState},
  505. {"connect", "mqtt客户端连接服务器", MqttConnect},
  506. {"disc", "mqtt客户端断开连接", MqttDisconnect},
  507. {"reconnect", "mqtt断开连接时,重新连接mqtt服务器", MqttReconnect},
  508. {"destory", "mqtt销毁客户端", MqttDestroy},
  509. {"pub", "mqtt发布消息", Mqttpublish},
  510. {"sub", "mqtt订阅主题", Mqttsubscribe},
  511. {"unsub", "mqtt取消订阅主题", MqttUnSubscribe},
  512. {"listsub", "mqtt获取已订阅主题", MqttListSubscribe},
  513. {"listack", "打印ack链表", MqttListAck},
  514. {"listmsg", "打印msg链表", MqttListMsg},
  515. {"data", "打印测试信息,用户自定义的", Mqttdata},
  516. };
  517. static int MqttHelp(int argc, char *argv[])
  518. {
  519. for (uint8_t i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  520. rt_kprintf("mqtt %-16s %s\r\n", cmdTab[i].cmd, cmdTab[i].explain);
  521. return 0;
  522. }
  523. static int RyanMqttMsh(int argc, char *argv[])
  524. {
  525. int32_t i = 0,
  526. result = 0;
  527. const struct RyanMqttCmdDes *runCmd = NULL;
  528. if (argc == 1)
  529. {
  530. MqttHelp(argc, argv);
  531. return 0;
  532. }
  533. for (i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  534. {
  535. if (rt_strcmp(cmdTab[i].cmd, argv[1]) == 0)
  536. {
  537. runCmd = &cmdTab[i];
  538. break;
  539. }
  540. }
  541. if (runCmd == NULL)
  542. {
  543. MqttHelp(argc, argv);
  544. return 0;
  545. }
  546. if (runCmd->fun != NULL)
  547. result = runCmd->fun(argc, argv);
  548. return result;
  549. }
  550. #if defined(RT_USING_MSH)
  551. MSH_CMD_EXPORT_ALIAS(RyanMqttMsh, mqtt, RyanMqtt command);
  552. #endif
  553. #endif