RyanMqttExample.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. // NOLINTBEGIN(cert-err34-c,llvm-include-order)
  2. #ifdef PKG_USING_RYANMQTT_EXAMPLE
  3. #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  4. #include <stdint.h>
  5. #include <string.h>
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include "rtconfig.h"
  9. #include <board.h>
  10. #include <rtthread.h>
  11. #include <rtdevice.h>
  12. #include <rtdbg.h>
  13. #include "RyanMqttLog.h"
  14. #include "RyanMqttClient.h"
  /* Connection parameters used by the example client. */
  #define RyanMqttClientId ("RyanMqttTessdfwrt") /* MQTT client id; must be unique */
  #define RyanMqttHost     ("broker.emqx.io")    /* address of the MQTT server */
  #define RyanMqttPort     (1883)                /* port of the MQTT server */
  #define RyanMqttUserName (NULL)                /* user name, NULL when there is none */
  #define RyanMqttPassword (NULL)                /* password, NULL when there is none */

  /* Millisecond delay, forwarded to rt_thread_mdelay(). */
  #define delay(waitMs) rt_thread_mdelay(waitMs)
  21. static RyanMqttClient_t *client = NULL;
  22. static int32_t pubTestPublishedEventCount = 0; // qos1和qos2发布成功的次数统计
  23. static int32_t pubTestDataEventCount = 0; // 接收到数据次数统计
  /**
   * @brief Log a label followed by a buffer rendered as hexadecimal.
   *
   * Every byte is written as exactly two lowercase hex digits so that the
   * output can be split back into bytes unambiguously.
   *
   * @param buf      bytes to dump
   * @param len      number of bytes in buf
   * @param userData NUL-terminated label printed in front of the dump
   */
  static void printfArrStr(const char *buf, uint32_t len, const char *userData)
  {
      RyanMqttLog_raw("%s", userData);

      for (uint32_t i = 0; i < len; i++)
      {
          // Convert through unsigned char first: where plain char is signed, a byte
          // >= 0x80 would be promoted to a negative int and printed as "ffffff80".
          RyanMqttLog_raw("%02x", (unsigned int)(unsigned char)buf[i]);
      }

      RyanMqttLog_raw("\r\n");
  }
  33. /**
  34. * @brief mqtt事件回调处理函数
  35. * 事件的详细定义可以查看枚举定义
  36. *
  37. * @param pclient
  38. * @param event
  39. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  40. */
  41. static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  42. {
  43. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  44. switch (event)
  45. {
  46. case RyanMqttEventError: break;
  47. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  48. RyanMqttLog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  49. break;
  50. case RyanMqttEventDisconnected: RyanMqttLog_w("mqtt断开连接回调 %d", *(int32_t *)eventData); break;
  51. case RyanMqttEventSubscribed: {
  52. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  53. RyanMqttLog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  54. break;
  55. }
  56. case RyanMqttEventSubscribedFailed: {
  57. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  58. RyanMqttLog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  59. break;
  60. }
  61. case RyanMqttEventUnSubscribed: {
  62. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  63. RyanMqttLog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  64. break;
  65. }
  66. case RyanMqttEventUnSubscribedFailed: {
  67. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  68. RyanMqttLog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  69. break;
  70. }
  71. case RyanMqttEventPublished: {
  72. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  73. RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  74. pubTestPublishedEventCount++;
  75. break;
  76. }
  77. case RyanMqttEventData: {
  78. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  79. RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d", msgData->topicLen,
  80. msgData->topic, msgData->packetId, msgData->payloadLen);
  81. RyanMqttLog_i("%.*s", msgData->payloadLen, msgData->payload);
  82. pubTestDataEventCount++;
  83. break;
  84. }
  85. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  86. {
  87. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  88. RyanMqttLog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  89. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic,
  90. ackHandler->msgHandler->qos);
  91. printfArrStr((char *)ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  92. break;
  93. }
  94. case RyanMqttEventReconnectBefore:
  95. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  96. RyanMqttLog_i("重连前事件回调");
  97. break;
  98. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  99. {
  100. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  101. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  102. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  103. RyanMqttLog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  104. break;
  105. }
  106. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  107. {
  108. // 这里选择直接丢弃该消息
  109. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  110. RyanMqttLog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d",
  111. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic,
  112. ackHandler->msgHandler->qos);
  113. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  114. break;
  115. }
  116. case RyanMqttEventAckHandlerDiscard: {
  117. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  118. RyanMqttLog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType,
  119. ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  120. break;
  121. }
  122. case RyanMqttEventDestroyBefore: RyanMqttLog_i("销毁mqtt客户端前回调"); break;
  123. case RyanMqttEventUnsubscribedData: {
  124. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  125. RyanMqttLog_i("接收到未匹配任何订阅主题的报文事件 topic: %.*s, packetId: %d, payload len: %d",
  126. msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen);
  127. break;
  128. }
  129. default: break;
  130. }
  131. }
  /**
   * @brief One entry of the `mqtt` msh command table.
   */
  struct RyanMqttCmdDes
  {
      const char *cmd;                    /* sub-command name, compared with argv[1] */
      const char *explain;                /* help text printed next to the name */
      int (*fun)(int argc, char *argv[]); /* handler, called with the shell's argc / argv */
  };

  /* Defined after the command table, because it iterates over that table. */
  static int MqttHelp(int argc, char *argv[]);
  143. /**
  144. * @brief 获取mqtt状态
  145. *
  146. * @param argc
  147. * @param argv
  148. * @return int
  149. */
  150. static int MqttState(int argc, char *argv[])
  151. {
  152. char *str = NULL;
  153. RyanMqttState_e clientState = RyanMqttGetState(client);
  154. switch (clientState)
  155. {
  156. case RyanMqttInvalidState: str = "无效状态"; break;
  157. case RyanMqttInitState: str = "初始化状态"; break;
  158. case RyanMqttStartState: str = "mqtt开始状态"; break;
  159. case RyanMqttConnectState: str = "连接状态"; break;
  160. case RyanMqttDisconnectState: str = "断开连接状态"; break;
  161. case RyanMqttReconnectState: str = "重新连接状态"; break;
  162. default:
  163. RyanMqttLog_e("Invalid client state: %d", clientState);
  164. str = "未知状态";
  165. break;
  166. }
  167. RyanMqttLog_i("client state: %s", str);
  168. return 0;
  169. }
  170. /**
  171. * @brief mqtt 连接服务器
  172. *
  173. * @param argc
  174. * @param argv
  175. * @return int
  176. */
  177. static int MqttConnect(int argc, char *argv[])
  178. {
  179. if (RyanMqttConnectState == RyanMqttGetState(client))
  180. {
  181. RyanMqttLog_w("mqtt客户端已经连接,请不要重复连接");
  182. return 0;
  183. }
  184. RyanMqttLog_i("mqtt开始连接");
  185. RyanMqttError_e result = RyanMqttSuccessError;
  186. RyanMqttClientConfig_t mqttConfig = {.clientId = RyanMqttClientId,
  187. .userName = RyanMqttUserName,
  188. .password = RyanMqttPassword,
  189. .host = RyanMqttHost,
  190. .port = RyanMqttPort,
  191. .taskName = "mqttThread",
  192. .taskPrio = 16,
  193. .taskStack = 2048,
  194. .mqttVersion = 4,
  195. .ackHandlerRepeatCountWarning = 6,
  196. .ackHandlerCountWarning = 20,
  197. .autoReconnectFlag = RyanMqttTrue,
  198. .cleanSessionFlag = RyanMqttFalse,
  199. .reconnectTimeout = 3000,
  200. .recvTimeout = 5000,
  201. .sendTimeout = 2000,
  202. .ackTimeout = 10000,
  203. .keepaliveTimeoutS = 120,
  204. .mqttEventHandle = mqttEventHandle,
  205. .userData = NULL};
  206. // 初始化mqtt客户端
  207. result = RyanMqttInit(&client);
  208. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  209. // 注册需要的事件回调
  210. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  211. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  212. // 设置mqtt客户端config
  213. result = RyanMqttSetConfig(client, &mqttConfig);
  214. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  215. // 设置遗嘱消息
  216. result = RyanMqttSetLwt(client, "pub/test", "this is will", RyanMqttStrlen("this is will"), RyanMqttQos0, 0);
  217. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  218. // 启动mqtt客户端线程
  219. result = RyanMqttStart(client);
  220. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  221. RyanMqttLog_i("mqtt已开启自动重连,断连后会每3秒尝试重连一次");
  222. return 0;
  223. }
  224. /**
  225. * @brief mqtt 重连函数,注意事项请看函数简介
  226. *
  227. * @param argc
  228. * @param argv
  229. * @return int
  230. */
  231. static int MqttReconnect(int argc, char *argv[])
  232. {
  233. RyanMqttReconnect(client);
  234. return 0;
  235. }
  236. /**
  237. * @brief mqtt销毁客户端
  238. *
  239. * @param argc
  240. * @param argv
  241. * @return int
  242. */
  243. static int MqttDestroy(int argc, char *argv[])
  244. {
  245. RyanMqttDestroy(client);
  246. client = NULL;
  247. return 0;
  248. }
  249. /**
  250. * @brief 断开mqtt客户端连接
  251. *
  252. * @param argc
  253. * @param argv
  254. * @return int
  255. */
  256. static int MqttDisconnect(int argc, char *argv[])
  257. {
  258. if (RyanMqttConnectState != RyanMqttGetState(client))
  259. {
  260. RyanMqttLog_w("mqtt客户端没有连接");
  261. return 0;
  262. }
  263. RyanMqttDisconnect(client, RyanMqttTrue);
  264. return 0;
  265. }
  266. /**
  267. * @brief mqtt发布消息
  268. *
  269. * @param argc
  270. * @param argv
  271. * @return int
  272. */
  273. static int Mqttpublish(int argc, char *argv[])
  274. {
  275. if (argc < 7)
  276. {
  277. RyanMqttLog_w("参数不完整! 请输入 topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0) ");
  278. return 0;
  279. }
  280. if (RyanMqttConnectState != RyanMqttGetState(client))
  281. {
  282. RyanMqttLog_w("mqtt客户端没有连接");
  283. return 0;
  284. }
  285. char *topic = argv[2];
  286. RyanMqttQos_e qos = atoi(argv[3]);
  287. char *payload = argv[4];
  288. uint16_t count = atoi(argv[5]);
  289. uint16_t delayTime = atoi(argv[6]);
  290. uint16_t pubCount = 0;
  291. RyanMqttLog_i("qos: %d, count: %d, delayTime: %d, payload: %s", qos, count, delayTime, payload);
  292. for (uint16_t i = 0; i < count; i++)
  293. {
  294. if (RyanMqttSuccessError == RyanMqttPublish(client, topic, payload, RyanMqttStrlen(payload), qos, 0))
  295. {
  296. pubCount++;
  297. }
  298. delay(delayTime);
  299. }
  300. RyanMqttLog_w("pubCount: %d", pubCount);
  301. return 0;
  302. }
  303. /**
  304. * @brief mqtt订阅主题,支持通配符
  305. *
  306. * @param argc
  307. * @param argv
  308. * @return int
  309. */
  310. static int Mqttsubscribe(int argc, char *argv[])
  311. {
  312. if (argc < 4)
  313. {
  314. RyanMqttLog_w("参数不完整! 请输入 topic、 qos ");
  315. return 0;
  316. }
  317. if (RyanMqttConnectState != RyanMqttGetState(client))
  318. {
  319. RyanMqttLog_w("mqtt客户端没有连接");
  320. return 0;
  321. }
  322. int qos = atoi(argv[3]);
  323. if (qos < 0 || qos > 2)
  324. {
  325. RyanMqttLog_w("无效的QoS值: %d,必须为0-2", qos);
  326. return 0;
  327. }
  328. RyanMqttSubscribe(client, argv[2], qos);
  329. return 0;
  330. }
  331. /**
  332. * @brief mqtt取消订阅主题
  333. *
  334. * @param argc
  335. * @param argv
  336. * @return int
  337. */
  338. static int MqttUnSubscribe(int argc, char *argv[])
  339. {
  340. if (argc < 3)
  341. {
  342. RyanMqttLog_w("参数不完整! 请输入 取消订阅主题");
  343. return 0;
  344. }
  345. if (RyanMqttConnectState != RyanMqttGetState(client))
  346. {
  347. RyanMqttLog_w("mqtt客户端没有连接");
  348. return 0;
  349. }
  350. RyanMqttUnSubscribe(client, argv[2]);
  351. return 0;
  352. }
  353. /**
  354. * @brief mqtt获取已订阅主题
  355. *
  356. * @param argc
  357. * @param argv
  358. * @return int
  359. */
  360. static int MqttListSubscribe(int argc, char *argv[])
  361. {
  362. if (RyanMqttConnectState != RyanMqttGetState(client))
  363. {
  364. RyanMqttLog_w("mqtt客户端没有连接");
  365. return 0;
  366. }
  367. RyanMqttMsgHandler_t *msgHandles = NULL;
  368. int32_t subscribeNum = 0;
  369. int32_t result = RyanMqttSuccessError;
  370. result = RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
  371. if (RyanMqttSuccessError != result)
  372. {
  373. RyanMqttLog_e("获取订阅主题数失败可能是内存不足");
  374. }
  375. RyanMqttLog_i("mqtt客户端已订阅的主题数: %d", subscribeNum);
  376. for (int32_t i = 0; i < subscribeNum; i++)
  377. {
  378. RyanMqttLog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  379. }
  380. if (subscribeNum > 0)
  381. {
  382. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  383. }
  384. return 0;
  385. }
  386. /**
  387. * @brief 打印接收到的数据计数
  388. *
  389. * @param argc
  390. * @param argv
  391. * @return int
  392. */
  393. static int Mqttdata(int argc, char *argv[])
  394. {
  395. RyanMqttLog_i("接收到数据次数统计: %d, qos1和qos2发布成功的次数统计: %d", pubTestDataEventCount,
  396. pubTestPublishedEventCount);
  397. return 0;
  398. }
  399. static const struct RyanMqttCmdDes cmdTab[] = {
  400. // {"help", "打印帮助信息 params: null", MqttHelp},
  401. // {"state", "打印mqtt客户端状态 params: null", MqttState},
  402. // {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  403. // {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  404. // {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  405. // {"destroy", "mqtt销毁客户端 params: null", MqttDestroy},
  406. // {"pub", "mqtt发布消息 params: topic、qos、payload内容、发送条数、间隔时间(可以为0)",
  407. // Mqttpublish},
  408. // {"sub", "mqtt订阅主题 params: topic、qos", Mqttsubscribe},
  409. // {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  410. // {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  411. // {"listmsg", "打印msg链表 params: null", MqttListMsg},
  412. // {"data", "打印测试信息 params: null", Mqttdata},
  413. {"help", "打印帮助信息 params: null", MqttHelp},
  414. {"state", "打印mqtt客户端状态 params: null", MqttState},
  415. {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  416. {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  417. {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  418. {"destroy", "mqtt销毁客户端 params: null", MqttDestroy},
  419. {"pub", "mqtt发布消息 params: topic、qos、payload内容、发送条数、间隔时间(可以为0)", Mqttpublish},
  420. {"sub", "mqtt订阅主题 params: topic、qos", Mqttsubscribe},
  421. {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  422. {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  423. {"data", "打印测试信息 params: null", Mqttdata},
  424. };
  425. static int MqttHelp(int argc, char *argv[])
  426. {
  427. for (int32_t i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  428. {
  429. RyanMqttLog_raw("mqtt %-16s %s\r\n", cmdTab[i].cmd, cmdTab[i].explain);
  430. }
  431. return 0;
  432. }
  433. static int RyanMqttMsh(int argc, char *argv[])
  434. {
  435. int32_t i = 0, result = 0;
  436. const struct RyanMqttCmdDes *runCmd = NULL;
  437. if (argc == 1)
  438. {
  439. MqttHelp(argc, argv);
  440. return 0;
  441. }
  442. for (i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  443. {
  444. if (rt_strcmp(cmdTab[i].cmd, argv[1]) == 0)
  445. {
  446. runCmd = &cmdTab[i];
  447. break;
  448. }
  449. }
  450. if (runCmd == NULL)
  451. {
  452. MqttHelp(argc, argv);
  453. return 0;
  454. }
  455. if (runCmd->fun != NULL)
  456. {
  457. result = runCmd->fun(argc, argv);
  458. }
  459. return result;
  460. }
  /* Export RyanMqttMsh under the alias `mqtt` when RT_USING_MSH is defined. */
  #ifdef RT_USING_MSH
  MSH_CMD_EXPORT_ALIAS(RyanMqttMsh, mqtt, RyanMqtt示例程序);
  #endif
  464. #endif
  465. // NOLINTEND(cert-err34-c,llvm-include-order)