RyanMqtt.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. #include "rtconfig.h"
  2. #ifdef PKG_USING_RYANMQTT_EXAMPLE
  3. #include <stdio.h>
  4. #include <stdint.h>
  5. #include <string.h>
  6. #include <board.h>
  7. #include <rtthread.h>
  8. #include <rtdevice.h>
  9. #include <rtdbg.h>
  10. #include "ulog.h"
  11. #include "RyanMqttClient.h"
  12. // ccm内存
  13. // 存放未初始化
  14. #define ccmBss __attribute__((section(".ccmbss")))
  15. #define delay(ms) rt_thread_mdelay(ms)
  16. static const char *TAG = "mqtt";
  17. RyanMqttClient_t *client = NULL;
  18. ccmBss static char mqttRecvBuffer[2048];
  19. ccmBss static char mqttSendBuffer[2048];
  20. // 具体数值计算可以查看事件回调函数
  21. static uint32_t mqttTest[10] = {0};
  22. #define dataEventCount (0) // 接收到几次数据
  23. #define PublishedEventCount (1) // 发布成功的次数
  24. /**
  25. * @brief mqtt事件回调处理函数
  26. * 事件的详细定义可以查看枚举定义
  27. *
  28. * @param pclient
  29. * @param event
  30. * @param eventData
  31. */
  32. void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void const *eventData)
  33. {
  34. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  35. switch (event)
  36. {
  37. case RyanMqttEventError:
  38. break;
  39. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  40. ulog_i(TAG, "mqtt连接成功回调");
  41. RyanMqttSubscribe(client, "sub/#", QOS0);
  42. break;
  43. case RyanMqttEventDisconnected:
  44. ulog_w(TAG, "mqtt断开连接回调 %d", *(int32_t *)eventData);
  45. break;
  46. case RyanMqttEventSubscribed:
  47. {
  48. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  49. ulog_w(TAG, "mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  50. break;
  51. }
  52. case RyanMqttEventSubscribedFaile:
  53. {
  54. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  55. ulog_w(TAG, "mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  56. break;
  57. }
  58. case RyanMqttEventUnSubscribed:
  59. {
  60. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  61. ulog_w(TAG, "mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  62. break;
  63. }
  64. case RyanMqttEventUnSubscribedFaile:
  65. {
  66. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  67. ulog_w(TAG, "mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  68. break;
  69. }
  70. case RyanMqttEventPublished:
  71. {
  72. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  73. ulog_w(TAG, "qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  74. mqttTest[PublishedEventCount]++;
  75. break;
  76. }
  77. case RyanMqttEventData:
  78. {
  79. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  80. ulog_i(TAG, " umqtt topic recv callback! topic: %s, packetId: %d, payload len: %d \r\n \\
  81. data: %.*s",
  82. msgData->topic, msgData->packetId, msgData->payloadLen,
  83. msgData->payloadLen, msgData->payload);
  84. mqttTest[dataEventCount]++;
  85. break;
  86. }
  87. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  88. {
  89. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  90. ulog_w(TAG, "%s:%d %s()... packetType: %d, packetId: %d, topic: %s, qos: %d",
  91. __FILE__, __LINE__, __FUNCTION__,
  92. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  93. printfArrStr(ackHandler->packet, ackHandler->packetLen, "data: ");
  94. break;
  95. }
  96. case RyanMqttEventReconnectBefore:
  97. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  98. ulog_i(TAG, "重连前事件回调");
  99. RyanMqttClientConfig_t mqttConfig = {
  100. .clientId = "RyanMqttTest", // 这里只修改了客户端名字
  101. .userName = "test",
  102. .password = "test",
  103. .host = "39.164.131.143",
  104. .port = "1883",
  105. .taskName = "mqttThread",
  106. .taskPrio = 16,
  107. .taskStack = 4096,
  108. .recvBufferSize = sizeof(mqttRecvBuffer),
  109. .sendBufferSize = sizeof(mqttSendBuffer),
  110. .recvBuffer = mqttRecvBuffer,
  111. .sendBuffer = mqttSendBuffer,
  112. .recvBufferStaticFlag = RyanTrue,
  113. .sendBufferStaticFlag = RyanTrue,
  114. .mqttVersion = 4,
  115. .ackHandlerRepeatCountWarning = 6,
  116. .ackHandlerCountWarning = 20,
  117. .autoReconnectFlag = RyanTrue,
  118. .cleanSessionFlag = 0,
  119. .reconnectTimeout = 3000,
  120. .recvTimeout = 11000,
  121. .sendTimeout = 2000,
  122. .ackTimeout = 10000,
  123. .keepaliveTimeoutS = 60,
  124. .mqttEventHandle = mqttEventHandle,
  125. .userData = NULL};
  126. RyanMqttSetConfig(client, &mqttConfig);
  127. break;
  128. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  129. {
  130. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  131. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  132. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  133. ulog_i(TAG, "ack记数值超过警戒值回调: %d", ackHandlerCount);
  134. break;
  135. }
  136. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  137. {
  138. // 这里选择直接丢弃该消息
  139. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  140. ulog_i(TAG, "ack重发次数超过警戒值回调");
  141. ulog_w(TAG, "%s:%d %s()... packetType: %d, packetId: %d, topic: %s, qos: %d",
  142. __FILE__, __LINE__, __FUNCTION__,
  143. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  144. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  145. break;
  146. }
  147. case RyanMqttEventAckHandlerdiscard:
  148. {
  149. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  150. ulog_i(TAG, "ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  151. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  152. break;
  153. }
  154. case RyanMqttEventDestoryBefore:
  155. ulog_i(TAG, "销毁mqtt客户端前回调");
  156. break;
  157. default:
  158. break;
  159. }
  160. }
  161. void mqttConnectFun()
  162. {
  163. RyanMqttError_e result = RyanMqttSuccessError;
  164. RyanMqttClientConfig_t mqttConfig = {
  165. .clientId = "RyanMqttTessdfwrt",
  166. .userName = "test",
  167. .password = "test",
  168. .host = "39.164.131.143",
  169. .port = "1883",
  170. .taskName = "mqttThread",
  171. .taskPrio = 16,
  172. .taskStack = 4096,
  173. .recvBufferSize = sizeof(mqttRecvBuffer),
  174. .sendBufferSize = sizeof(mqttSendBuffer),
  175. .recvBuffer = mqttRecvBuffer,
  176. .sendBuffer = mqttSendBuffer,
  177. .recvBufferStaticFlag = RyanTrue,
  178. .sendBufferStaticFlag = RyanTrue,
  179. .mqttVersion = 4,
  180. .ackHandlerRepeatCountWarning = 6,
  181. .ackHandlerCountWarning = 20,
  182. .autoReconnectFlag = RyanTrue,
  183. .cleanSessionFlag = 0,
  184. .reconnectTimeout = 3000,
  185. .recvTimeout = 11000,
  186. .sendTimeout = 2000,
  187. .ackTimeout = 10000,
  188. .keepaliveTimeoutS = 60,
  189. .mqttEventHandle = mqttEventHandle,
  190. .userData = NULL};
  191. // 初始化mqtt客户端
  192. result = RyanMqttInit(&client);
  193. RyanMqttCheck(RyanMqttSuccessError == result, result);
  194. // 注册需要的事件回调
  195. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  196. RyanMqttCheck(RyanMqttSuccessError == result, result);
  197. // 设置mqtt客户端config
  198. result = RyanMqttSetConfig(client, &mqttConfig);
  199. RyanMqttCheck(RyanMqttSuccessError == result, result);
  200. // 设置遗嘱消息
  201. result = RyanMqttSetLwt(client, "pub/test", "this is will", strlen("this is will"), QOS0, 0);
  202. RyanMqttCheck(RyanMqttSuccessError == result, result);
  203. // 启动mqtt客户端线程
  204. result = RyanMqttStart(client);
  205. RyanMqttCheck(RyanMqttSuccessError == result, result);
  206. }
  207. /**
  208. * @brief mqtt msh命令
  209. *
  210. */
  211. struct RyanMqttCmdDes
  212. {
  213. const char *cmd;
  214. const char *explain;
  215. int (*fun)(int argc, char *argv[]);
  216. };
  217. static int MqttHelp(int argc, char *argv[]);
  218. /**
  219. * @brief 获取mqtt状态
  220. *
  221. * @param argc
  222. * @param argv
  223. * @return int
  224. */
  225. static int MqttState(int argc, char *argv[])
  226. {
  227. char *str = NULL;
  228. RyanMqttState_e clientState = RyanMqttGetState(client);
  229. switch (clientState)
  230. {
  231. case mqttInvalidState:
  232. str = "无效状态";
  233. break;
  234. case mqttInitState:
  235. str = "初始化状态";
  236. break;
  237. case mqttStartState:
  238. str = "mqtt开始状态";
  239. break;
  240. case mqttConnectState:
  241. str = "连接状态";
  242. break;
  243. case mqttDisconnectState:
  244. str = "断开连接状态";
  245. break;
  246. case mqttReconnectState:
  247. str = "重新连接状态";
  248. break;
  249. default:
  250. RyanMqttCheck(NULL, RyanMqttFailedError);
  251. break;
  252. }
  253. ulog_i(TAG, "client state: %s", clientState);
  254. return 0;
  255. }
  256. /**
  257. * @brief mqtt 连接服务器
  258. *
  259. * @param argc
  260. * @param argv
  261. * @return int
  262. */
  263. static int MqttConnect(int argc, char *argv[])
  264. {
  265. if (mqttConnectState == RyanMqttGetState(client))
  266. {
  267. ulog_w(TAG, "mqtt客户端没有连接");
  268. return 0;
  269. }
  270. mqttConnectFun();
  271. return 0;
  272. }
  273. /**
  274. * @brief mqtt 重连函数,注意事项请看函数简介
  275. *
  276. * @param argc
  277. * @param argv
  278. * @return int
  279. */
  280. static int MqttReconnect(int argc, char *argv[])
  281. {
  282. RyanMqttReconnect(client);
  283. return 0;
  284. }
  285. /**
  286. * @brief mqtt销毁客户端
  287. *
  288. * @param argc
  289. * @param argv
  290. * @return int
  291. */
  292. static int MqttDestroy(int argc, char *argv[])
  293. {
  294. RyanMqttDestroy(client);
  295. return 0;
  296. }
  297. /**
  298. * @brief 断开mqtt客户端连接
  299. *
  300. * @param argc
  301. * @param argv
  302. * @return int
  303. */
  304. static int MqttDisconnect(int argc, char *argv[])
  305. {
  306. if (mqttConnectState != RyanMqttGetState(client))
  307. {
  308. ulog_w(TAG, "mqtt客户端没有连接");
  309. return 0;
  310. }
  311. RyanMqttDisconnect(client, RyanTrue);
  312. return 0;
  313. }
  314. /**
  315. * @brief mqtt发布消息
  316. *
  317. * @param argc
  318. * @param argv
  319. * @return int
  320. */
  321. static int Mqttpublish(int argc, char *argv[])
  322. {
  323. if (argc < 7)
  324. {
  325. ulog_i(TAG, "请输入 topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0) ");
  326. return 0;
  327. }
  328. if (mqttConnectState != RyanMqttGetState(client))
  329. {
  330. ulog_w(TAG, "mqtt客户端没有连接");
  331. return 0;
  332. }
  333. char *topic = argv[2];
  334. RyanMqttQos_e qos = atoi(argv[3]);
  335. char *payload = argv[4];
  336. uint16_t count = atoi(argv[5]);
  337. uint16_t delayTime = atoi(argv[6]);
  338. uint16_t pubCount = 0;
  339. ulog_i(TAG, "qos: %d, count: %d, delayTime: %d, payload: %s", qos, count, delayTime, payload);
  340. for (uint16_t i = 0; i < count; i++)
  341. {
  342. if (RyanMqttSuccessError == RyanMqttPublish(client, topic, payload, strlen(payload), qos, 0))
  343. pubCount++;
  344. delay(delayTime);
  345. }
  346. delay(3000);
  347. ulog_e(TAG, "pubCount: %d", pubCount);
  348. return 0;
  349. }
  350. /**
  351. * @brief mqtt订阅主题,支持通配符
  352. *
  353. * @param argc
  354. * @param argv
  355. * @return int
  356. */
  357. static int Mqttsubscribe(int argc, char *argv[])
  358. {
  359. if (argc < 4)
  360. {
  361. ulog_i(TAG, "请输入 topic、 qos ");
  362. return 0;
  363. }
  364. if (mqttConnectState != RyanMqttGetState(client))
  365. {
  366. ulog_w(TAG, "mqtt客户端没有连接");
  367. return 0;
  368. }
  369. RyanMqttSubscribe(client, argv[2], atoi(argv[3]));
  370. return 0;
  371. }
  372. /**
  373. * @brief mqtt取消订阅主题
  374. *
  375. * @param argc
  376. * @param argv
  377. * @return int
  378. */
  379. static int MqttUnSubscribe(int argc, char *argv[])
  380. {
  381. if (argc < 3)
  382. {
  383. ulog_i(TAG, "请输入 取消订阅主题");
  384. return 0;
  385. }
  386. if (mqttConnectState != RyanMqttGetState(client))
  387. {
  388. ulog_w(TAG, "mqtt客户端没有连接");
  389. return 0;
  390. }
  391. RyanMqttUnSubscribe(client, argv[2]);
  392. return 0;
  393. }
  394. /**
  395. * @brief mqtt获取已订阅主题
  396. *
  397. * @param argc
  398. * @param argv
  399. * @return int
  400. */
  401. static int MqttListSubscribe(int argc, char *argv[])
  402. {
  403. if (mqttConnectState != RyanMqttGetState(client))
  404. {
  405. ulog_w(TAG, "mqtt客户端没有连接");
  406. return 0;
  407. }
  408. RyanMqttMsgHandler_t msgHandles[10] = {0};
  409. int32_t subscribeNum = 0;
  410. int32_t result = RyanMqttSuccessError;
  411. result = RyanMqttGetSubscribe(client, msgHandles, sizeof(msgHandles) / sizeof(msgHandles[0]), &subscribeNum);
  412. if (result == RyanMqttNoRescourceError)
  413. ulog_w(TAG, "订阅主题数超过10个,已截断");
  414. ulog_i(TAG, "mqtt客户端已订阅的主题数: %d", subscribeNum);
  415. for (int32_t i = 0; i < subscribeNum; i++)
  416. ulog_i(TAG, "订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  417. return 0;
  418. }
  419. /**
  420. * @brief 打印ack链表
  421. *
  422. * @param argc
  423. * @param argv
  424. * @return int
  425. */
  426. static int MqttListAck(int argc, char *argv[])
  427. {
  428. RyanList_t *curr = NULL,
  429. *next = NULL;
  430. RyanMqttAckHandler_t *ackHandler = NULL;
  431. if (RyanListIsEmpty(&client->ackHandlerList))
  432. {
  433. ulog_i(TAG, "ack链表为空");
  434. return 0;
  435. }
  436. // 遍历链表
  437. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  438. {
  439. // 获取此节点的结构体
  440. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  441. // 发送qos1 / qos2消息服务器ack响应超时。需要重新发送它们。
  442. ulog_w(TAG, " type: %d, packetId is %d ", ackHandler->packetType, ackHandler->packetId);
  443. if (NULL != ackHandler->msgHandler)
  444. ulog_w(TAG, "topic: %s, qos: %d", ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  445. }
  446. return 0;
  447. }
  448. /**
  449. * @brief 打印msg链表
  450. *
  451. * @param argc
  452. * @param argv
  453. * @return int
  454. */
  455. static int MqttListMsg(int argc, char *argv[])
  456. {
  457. RyanList_t *curr = NULL,
  458. *next = NULL;
  459. RyanMqttMsgHandler_t *msgHandler = NULL;
  460. if (RyanListIsEmpty(&client->msgHandlerList))
  461. {
  462. ulog_i(TAG, "msg链表为空");
  463. return 0;
  464. }
  465. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  466. {
  467. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  468. ulog_w(TAG, "topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  469. }
  470. return 0;
  471. }
  472. /**
  473. * @brief 打印接收到的数据计数
  474. *
  475. * @param argc
  476. * @param argv
  477. * @return int
  478. */
  479. static int Mqttdata(int argc, char *argv[])
  480. {
  481. // mqtt data
  482. if (argc == 3)
  483. {
  484. uint32_t num = atoi(argv[2]);
  485. if (num < sizeof(mqttTest) / sizeof(mqttTest[0]) - 1)
  486. mqttTest[num] = 0;
  487. else
  488. ulog_e(TAG, "数组越界");
  489. }
  490. ulog_i(TAG, "dataEventCount: %d, publishCount:%u",
  491. mqttTest[dataEventCount], mqttTest[PublishedEventCount]);
  492. return 0;
  493. }
  494. static const struct RyanMqttCmdDes cmdTab[] =
  495. {
  496. {"help", "打印帮助信息", MqttHelp},
  497. {"state", "打印mqtt客户端状态", MqttState},
  498. {"connect", "mqtt客户端连接服务器", MqttConnect},
  499. {"disc", "mqtt客户端断开连接", MqttDisconnect},
  500. {"reconnect", "mqtt断开连接时,重新连接mqtt服务器", MqttReconnect},
  501. {"destory", "mqtt销毁客户端", MqttDestroy},
  502. {"pub", "mqtt发布消息", Mqttpublish},
  503. {"sub", "mqtt订阅主题", Mqttsubscribe},
  504. {"unsub", "mqtt取消订阅主题", MqttUnSubscribe},
  505. {"listsub", "mqtt获取已订阅主题", MqttListSubscribe},
  506. {"listack", "打印ack链表", MqttListAck},
  507. {"listmsg", "打印msg链表", MqttListMsg},
  508. {"data", "打印测试信息,用户自定义的", Mqttdata},
  509. };
  510. static int MqttHelp(int argc, char *argv[])
  511. {
  512. for (uint8_t i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  513. rt_kprintf("mqtt %-16s %s\r\n", cmdTab[i].cmd, cmdTab[i].explain);
  514. return 0;
  515. }
  516. static int RyanMqttMsh(int argc, char *argv[])
  517. {
  518. int32_t i = 0,
  519. result = 0;
  520. const struct RyanMqttCmdDes *runCmd = NULL;
  521. if (argc == 1)
  522. {
  523. MqttHelp(argc, argv);
  524. return 0;
  525. }
  526. for (i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  527. {
  528. if (rt_strcmp(cmdTab[i].cmd, argv[1]) == 0)
  529. {
  530. runCmd = &cmdTab[i];
  531. break;
  532. }
  533. }
  534. if (runCmd == NULL)
  535. {
  536. MqttHelp(argc, argv);
  537. return 0;
  538. }
  539. if (runCmd->fun != NULL)
  540. result = runCmd->fun(argc, argv);
  541. return 0;
  542. }
  543. #if defined(RT_USING_MSH)
  544. MSH_CMD_EXPORT_ALIAS(RyanMqttMsh, mqtt, RyanMqtt command);
  545. #endif
  546. #endif