RyanMqtt.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. #include "rtconfig.h"
  2. #define RyanMqttHostName ("broker.emqx.io") // MQTT server address; replace with your own broker
  3. #define RyanMqttPort ("1883") // MQTT server port
  4. #define RyanMqttUserName ("") // use "" when no user name is needed
  5. #define RyanMqttPassword ("") // use "" when no password is needed
  6. #ifdef PKG_USING_RYANMQTT_EXAMPLE
  7. #include <stdio.h>
  8. #include <stdint.h>
  9. #include <string.h>
  10. #include <board.h>
  11. #include <rtthread.h>
  12. #include <rtdevice.h>
  13. #include <rtdbg.h>
  14. #include "ulog.h"
  15. #include "RyanMqttClient.h"
  16. // CCM memory
  17. // Section used for uninitialized data
  18. #define ccmBss __attribute__((section(".ccmbss")))
  19. #define delay(ms) rt_thread_mdelay(ms)
  20. static const char *TAG = "mqtt";
  21. RyanMqttClient_t *client = NULL;
  22. ccmBss static char mqttRecvBuffer[2048];
  23. ccmBss static char mqttSendBuffer[2048];
  24. // Test counters; see the event callback for where each one is incremented
  25. static uint32_t mqttTest[10] = {0};
  26. #define dataEventCount (0) // index into mqttTest: number of data events received
  27. #define PublishedEventCount (1) // index into mqttTest: number of successful publishes
  28. void printfArrStr(char *buf, uint32_t len, char *userData)
  29. {
  30. rt_kprintf("%s", userData);
  31. for (uint32_t i = 0; i < len; i++)
  32. rt_kprintf("%x", buf[i]);
  33. rt_kprintf("\r\n");
  34. }
  35. /**
  36. * @brief mqtt事件回调处理函数
  37. * 事件的详细定义可以查看枚举定义
  38. *
  39. * @param pclient
  40. * @param event
  41. * @param eventData
  42. */
  43. void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void const *eventData)
  44. {
  45. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  46. switch (event)
  47. {
  48. case RyanMqttEventError:
  49. break;
  50. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  51. ulog_i(TAG, "mqtt连接成功回调");
  52. break;
  53. case RyanMqttEventDisconnected:
  54. ulog_w(TAG, "mqtt断开连接回调 %d", *(int32_t *)eventData);
  55. break;
  56. case RyanMqttEventSubscribed:
  57. {
  58. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  59. ulog_w(TAG, "mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  60. break;
  61. }
  62. case RyanMqttEventSubscribedFaile:
  63. {
  64. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  65. ulog_w(TAG, "mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  66. break;
  67. }
  68. case RyanMqttEventUnSubscribed:
  69. {
  70. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  71. ulog_w(TAG, "mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  72. break;
  73. }
  74. case RyanMqttEventUnSubscribedFaile:
  75. {
  76. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  77. ulog_w(TAG, "mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  78. break;
  79. }
  80. case RyanMqttEventPublished:
  81. {
  82. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  83. ulog_w(TAG, "qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  84. mqttTest[PublishedEventCount]++;
  85. break;
  86. }
  87. case RyanMqttEventData:
  88. {
  89. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  90. ulog_i(TAG, " RyanMqtt topic recv callback! topic: %s, packetId: %d, payload len: %d \r\n \\
  91. data: %.*s",
  92. msgData->topic, msgData->packetId, msgData->payloadLen,
  93. msgData->payloadLen, msgData->payload);
  94. mqttTest[dataEventCount]++;
  95. break;
  96. }
  97. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  98. {
  99. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  100. ulog_w(TAG, "%s:%d %s()... packetType: %d, packetId: %d, topic: %s, qos: %d",
  101. __FILE__, __LINE__, __FUNCTION__,
  102. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  103. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  104. break;
  105. }
  106. case RyanMqttEventReconnectBefore:
  107. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  108. ulog_i(TAG, "重连前事件回调");
  109. RyanMqttClientConfig_t mqttConfig = {
  110. .clientId = "RyanMqttTest", // 这里只修改了客户端名字
  111. .userName = RyanMqttHostName,
  112. .password = RyanMqttPassword,
  113. .host = RyanMqttHostName,
  114. .port = RyanMqttPort,
  115. .taskName = "mqttThread",
  116. .taskPrio = 16,
  117. .taskStack = 4096,
  118. .recvBufferSize = sizeof(mqttRecvBuffer),
  119. .sendBufferSize = sizeof(mqttSendBuffer),
  120. .recvBuffer = mqttRecvBuffer,
  121. .sendBuffer = mqttSendBuffer,
  122. .recvBufferStaticFlag = RyanTrue,
  123. .sendBufferStaticFlag = RyanTrue,
  124. .mqttVersion = 4,
  125. .ackHandlerRepeatCountWarning = 6,
  126. .ackHandlerCountWarning = 20,
  127. .autoReconnectFlag = RyanTrue,
  128. .cleanSessionFlag = 0,
  129. .reconnectTimeout = 3000,
  130. .recvTimeout = 11000,
  131. .sendTimeout = 2000,
  132. .ackTimeout = 10000,
  133. .keepaliveTimeoutS = 60,
  134. .mqttEventHandle = mqttEventHandle,
  135. .userData = NULL};
  136. RyanMqttSetConfig(client, &mqttConfig);
  137. break;
  138. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  139. {
  140. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  141. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  142. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  143. ulog_i(TAG, "ack记数值超过警戒值回调: %d", ackHandlerCount);
  144. break;
  145. }
  146. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  147. {
  148. // 这里选择直接丢弃该消息
  149. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  150. ulog_i(TAG, "ack重发次数超过警戒值回调");
  151. ulog_w(TAG, "%s:%d %s()... packetType: %d, packetId: %d, topic: %s, qos: %d",
  152. __FILE__, __LINE__, __FUNCTION__,
  153. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  154. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  155. break;
  156. }
  157. case RyanMqttEventAckHandlerdiscard:
  158. {
  159. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  160. ulog_i(TAG, "ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  161. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  162. break;
  163. }
  164. case RyanMqttEventDestoryBefore:
  165. ulog_i(TAG, "销毁mqtt客户端前回调");
  166. break;
  167. default:
  168. break;
  169. }
  170. }
  171. void mqttConnectFun()
  172. {
  173. RyanMqttError_e result = RyanMqttSuccessError;
  174. RyanMqttClientConfig_t mqttConfig = {
  175. .clientId = "RyanMqttTessdfwrt",
  176. .userName = RyanMqttHostName,
  177. .password = RyanMqttPassword,
  178. .host = RyanMqttHostName,
  179. .port = RyanMqttPort,
  180. .taskName = "mqttThread",
  181. .taskPrio = 16,
  182. .taskStack = 4096,
  183. .recvBufferSize = sizeof(mqttRecvBuffer),
  184. .sendBufferSize = sizeof(mqttSendBuffer),
  185. .recvBuffer = mqttRecvBuffer,
  186. .sendBuffer = mqttSendBuffer,
  187. .recvBufferStaticFlag = RyanTrue,
  188. .sendBufferStaticFlag = RyanTrue,
  189. .mqttVersion = 4,
  190. .ackHandlerRepeatCountWarning = 6,
  191. .ackHandlerCountWarning = 20,
  192. .autoReconnectFlag = RyanTrue,
  193. .cleanSessionFlag = 0,
  194. .reconnectTimeout = 3000,
  195. .recvTimeout = 11000,
  196. .sendTimeout = 2000,
  197. .ackTimeout = 10000,
  198. .keepaliveTimeoutS = 60,
  199. .mqttEventHandle = mqttEventHandle,
  200. .userData = NULL};
  201. // 初始化mqtt客户端
  202. result = RyanMqttInit(&client);
  203. RyanMqttCheck(RyanMqttSuccessError == result, result);
  204. // 注册需要的事件回调
  205. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  206. RyanMqttCheck(RyanMqttSuccessError == result, result);
  207. // 设置mqtt客户端config
  208. result = RyanMqttSetConfig(client, &mqttConfig);
  209. RyanMqttCheck(RyanMqttSuccessError == result, result);
  210. // 设置遗嘱消息
  211. result = RyanMqttSetLwt(client, "pub/test", "this is will", strlen("this is will"), QOS0, 0);
  212. RyanMqttCheck(RyanMqttSuccessError == result, result);
  213. // 启动mqtt客户端线程
  214. result = RyanMqttStart(client);
  215. RyanMqttCheck(RyanMqttSuccessError == result, result);
  216. }
  217. /**
  218. * @brief mqtt msh命令
  219. *
  220. */
  221. struct RyanMqttCmdDes
  222. {
  223. const char *cmd;
  224. const char *explain;
  225. int (*fun)(int argc, char *argv[]);
  226. };
  227. static int MqttHelp(int argc, char *argv[]);
  228. /**
  229. * @brief 获取mqtt状态
  230. *
  231. * @param argc
  232. * @param argv
  233. * @return int
  234. */
  235. static int MqttState(int argc, char *argv[])
  236. {
  237. char *str = NULL;
  238. RyanMqttState_e clientState = RyanMqttGetState(client);
  239. switch (clientState)
  240. {
  241. case mqttInvalidState:
  242. str = "无效状态";
  243. break;
  244. case mqttInitState:
  245. str = "初始化状态";
  246. break;
  247. case mqttStartState:
  248. str = "mqtt开始状态";
  249. break;
  250. case mqttConnectState:
  251. str = "连接状态";
  252. break;
  253. case mqttDisconnectState:
  254. str = "断开连接状态";
  255. break;
  256. case mqttReconnectState:
  257. str = "重新连接状态";
  258. break;
  259. default:
  260. RyanMqttCheck(NULL, RyanMqttFailedError);
  261. break;
  262. }
  263. ulog_i(TAG, "client state: %s", clientState);
  264. return 0;
  265. }
  266. /**
  267. * @brief mqtt 连接服务器
  268. *
  269. * @param argc
  270. * @param argv
  271. * @return int
  272. */
  273. static int MqttConnect(int argc, char *argv[])
  274. {
  275. if (mqttConnectState == RyanMqttGetState(client))
  276. {
  277. ulog_w(TAG, "mqtt客户端没有连接");
  278. return 0;
  279. }
  280. mqttConnectFun();
  281. return 0;
  282. }
  283. /**
  284. * @brief mqtt 重连函数,注意事项请看函数简介
  285. *
  286. * @param argc
  287. * @param argv
  288. * @return int
  289. */
  290. static int MqttReconnect(int argc, char *argv[])
  291. {
  292. RyanMqttReconnect(client);
  293. return 0;
  294. }
  295. /**
  296. * @brief mqtt销毁客户端
  297. *
  298. * @param argc
  299. * @param argv
  300. * @return int
  301. */
  302. static int MqttDestroy(int argc, char *argv[])
  303. {
  304. RyanMqttDestroy(client);
  305. return 0;
  306. }
  307. /**
  308. * @brief 断开mqtt客户端连接
  309. *
  310. * @param argc
  311. * @param argv
  312. * @return int
  313. */
  314. static int MqttDisconnect(int argc, char *argv[])
  315. {
  316. if (mqttConnectState != RyanMqttGetState(client))
  317. {
  318. ulog_w(TAG, "mqtt客户端没有连接");
  319. return 0;
  320. }
  321. RyanMqttDisconnect(client, RyanTrue);
  322. return 0;
  323. }
  324. /**
  325. * @brief mqtt发布消息
  326. *
  327. * @param argc
  328. * @param argv
  329. * @return int
  330. */
  331. static int Mqttpublish(int argc, char *argv[])
  332. {
  333. if (argc < 7)
  334. {
  335. ulog_i(TAG, "请输入 topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0) ");
  336. return 0;
  337. }
  338. if (mqttConnectState != RyanMqttGetState(client))
  339. {
  340. ulog_w(TAG, "mqtt客户端没有连接");
  341. return 0;
  342. }
  343. char *topic = argv[2];
  344. RyanMqttQos_e qos = atoi(argv[3]);
  345. char *payload = argv[4];
  346. uint16_t count = atoi(argv[5]);
  347. uint16_t delayTime = atoi(argv[6]);
  348. uint16_t pubCount = 0;
  349. ulog_i(TAG, "qos: %d, count: %d, delayTime: %d, payload: %s", qos, count, delayTime, payload);
  350. for (uint16_t i = 0; i < count; i++)
  351. {
  352. if (RyanMqttSuccessError == RyanMqttPublish(client, topic, payload, strlen(payload), qos, 0))
  353. pubCount++;
  354. delay(delayTime);
  355. }
  356. delay(3000);
  357. ulog_e(TAG, "pubCount: %d", pubCount);
  358. return 0;
  359. }
  360. /**
  361. * @brief mqtt订阅主题,支持通配符
  362. *
  363. * @param argc
  364. * @param argv
  365. * @return int
  366. */
  367. static int Mqttsubscribe(int argc, char *argv[])
  368. {
  369. if (argc < 4)
  370. {
  371. ulog_i(TAG, "请输入 topic、 qos ");
  372. return 0;
  373. }
  374. if (mqttConnectState != RyanMqttGetState(client))
  375. {
  376. ulog_w(TAG, "mqtt客户端没有连接");
  377. return 0;
  378. }
  379. RyanMqttSubscribe(client, argv[2], atoi(argv[3]));
  380. return 0;
  381. }
  382. /**
  383. * @brief mqtt取消订阅主题
  384. *
  385. * @param argc
  386. * @param argv
  387. * @return int
  388. */
  389. static int MqttUnSubscribe(int argc, char *argv[])
  390. {
  391. if (argc < 3)
  392. {
  393. ulog_i(TAG, "请输入 取消订阅主题");
  394. return 0;
  395. }
  396. if (mqttConnectState != RyanMqttGetState(client))
  397. {
  398. ulog_w(TAG, "mqtt客户端没有连接");
  399. return 0;
  400. }
  401. RyanMqttUnSubscribe(client, argv[2]);
  402. return 0;
  403. }
  404. /**
  405. * @brief mqtt获取已订阅主题
  406. *
  407. * @param argc
  408. * @param argv
  409. * @return int
  410. */
  411. static int MqttListSubscribe(int argc, char *argv[])
  412. {
  413. if (mqttConnectState != RyanMqttGetState(client))
  414. {
  415. ulog_w(TAG, "mqtt客户端没有连接");
  416. return 0;
  417. }
  418. RyanMqttMsgHandler_t msgHandles[10] = {0};
  419. int32_t subscribeNum = 0;
  420. int32_t result = RyanMqttSuccessError;
  421. result = RyanMqttGetSubscribe(client, msgHandles, sizeof(msgHandles) / sizeof(msgHandles[0]), &subscribeNum);
  422. if (result == RyanMqttNoRescourceError)
  423. ulog_w(TAG, "订阅主题数超过10个,已截断");
  424. ulog_i(TAG, "mqtt客户端已订阅的主题数: %d", subscribeNum);
  425. for (int32_t i = 0; i < subscribeNum; i++)
  426. ulog_i(TAG, "订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  427. return 0;
  428. }
  429. /**
  430. * @brief 打印ack链表
  431. *
  432. * @param argc
  433. * @param argv
  434. * @return int
  435. */
  436. static int MqttListAck(int argc, char *argv[])
  437. {
  438. RyanList_t *curr = NULL,
  439. *next = NULL;
  440. RyanMqttAckHandler_t *ackHandler = NULL;
  441. if (RyanListIsEmpty(&client->ackHandlerList))
  442. {
  443. ulog_i(TAG, "ack链表为空");
  444. return 0;
  445. }
  446. // 遍历链表
  447. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  448. {
  449. // 获取此节点的结构体
  450. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  451. // 发送qos1 / qos2消息服务器ack响应超时。需要重新发送它们。
  452. ulog_w(TAG, " type: %d, packetId is %d ", ackHandler->packetType, ackHandler->packetId);
  453. if (NULL != ackHandler->msgHandler)
  454. ulog_w(TAG, "topic: %s, qos: %d", ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  455. }
  456. return 0;
  457. }
  458. /**
  459. * @brief 打印msg链表
  460. *
  461. * @param argc
  462. * @param argv
  463. * @return int
  464. */
  465. static int MqttListMsg(int argc, char *argv[])
  466. {
  467. RyanList_t *curr = NULL,
  468. *next = NULL;
  469. RyanMqttMsgHandler_t *msgHandler = NULL;
  470. if (RyanListIsEmpty(&client->msgHandlerList))
  471. {
  472. ulog_i(TAG, "msg链表为空");
  473. return 0;
  474. }
  475. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  476. {
  477. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  478. ulog_w(TAG, "topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  479. }
  480. return 0;
  481. }
  482. /**
  483. * @brief 打印接收到的数据计数
  484. *
  485. * @param argc
  486. * @param argv
  487. * @return int
  488. */
  489. static int Mqttdata(int argc, char *argv[])
  490. {
  491. // mqtt data
  492. if (argc == 3)
  493. {
  494. uint32_t num = atoi(argv[2]);
  495. if (num < sizeof(mqttTest) / sizeof(mqttTest[0]) - 1)
  496. mqttTest[num] = 0;
  497. else
  498. ulog_e(TAG, "数组越界");
  499. }
  500. ulog_i(TAG, "dataEventCount: %d, publishCount:%u",
  501. mqttTest[dataEventCount], mqttTest[PublishedEventCount]);
  502. return 0;
  503. }
  504. static const struct RyanMqttCmdDes cmdTab[] =
  505. {
  506. {"help", "打印帮助信息", MqttHelp},
  507. {"state", "打印mqtt客户端状态", MqttState},
  508. {"connect", "mqtt客户端连接服务器", MqttConnect},
  509. {"disc", "mqtt客户端断开连接", MqttDisconnect},
  510. {"reconnect", "mqtt断开连接时,重新连接mqtt服务器", MqttReconnect},
  511. {"destory", "mqtt销毁客户端", MqttDestroy},
  512. {"pub", "mqtt发布消息", Mqttpublish},
  513. {"sub", "mqtt订阅主题", Mqttsubscribe},
  514. {"unsub", "mqtt取消订阅主题", MqttUnSubscribe},
  515. {"listsub", "mqtt获取已订阅主题", MqttListSubscribe},
  516. {"listack", "打印ack链表", MqttListAck},
  517. {"listmsg", "打印msg链表", MqttListMsg},
  518. {"data", "打印测试信息,用户自定义的", Mqttdata},
  519. };
  520. static int MqttHelp(int argc, char *argv[])
  521. {
  522. for (uint8_t i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  523. rt_kprintf("mqtt %-16s %s\r\n", cmdTab[i].cmd, cmdTab[i].explain);
  524. return 0;
  525. }
  526. static int RyanMqttMsh(int argc, char *argv[])
  527. {
  528. int32_t i = 0,
  529. result = 0;
  530. const struct RyanMqttCmdDes *runCmd = NULL;
  531. if (argc == 1)
  532. {
  533. MqttHelp(argc, argv);
  534. return 0;
  535. }
  536. for (i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  537. {
  538. if (rt_strcmp(cmdTab[i].cmd, argv[1]) == 0)
  539. {
  540. runCmd = &cmdTab[i];
  541. break;
  542. }
  543. }
  544. if (runCmd == NULL)
  545. {
  546. MqttHelp(argc, argv);
  547. return 0;
  548. }
  549. if (runCmd->fun != NULL)
  550. result = runCmd->fun(argc, argv);
  551. return 0;
  552. }
  553. #if defined(RT_USING_MSH)
  554. MSH_CMD_EXPORT_ALIAS(RyanMqttMsh, mqtt, RyanMqtt command);
  555. #endif
  556. #endif