// RyanMqttExample.c - msh command example for the RyanMqtt client on RT-Thread
  1. // NOLINTBEGIN(cert-err34-c,llvm-include-order)
  2. #ifdef PKG_USING_RYANMQTT_EXAMPLE
  3. #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  4. #include <stdint.h>
  5. #include <string.h>
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include "rtconfig.h"
  9. #include <board.h>
  10. #include <rtthread.h>
  11. #include <rtdevice.h>
  12. #include <rtdbg.h>
  13. #include "RyanMqttLog.h"
  14. #include "RyanMqttClient.h"
  15. #define RyanMqttClientId ("RyanMqttTessdfwrt") // 填写mqtt客户端id,要求唯一
  16. #define RyanMqttHost ("broker.emqx.io") // 填写你的mqtt服务器ip
  17. #define RyanMqttPort (1883) // mqtt服务器端口
  18. #define RyanMqttUserName (NULL) // 填写你的用户名,没有填NULL
  19. #define RyanMqttPassword (NULL) // 填写你的密码,没有填NULL
  20. #define delay(ms) rt_thread_mdelay(ms)
  21. static RyanMqttClient_t *client = NULL;
  22. static int32_t pubTestPublishedEventCount = 0; // qos1和qos2发布成功的次数统计
  23. static int32_t pubTestDataEventCount = 0; // 接收到数据次数统计
  /**
   * @brief Print a label followed by a hex dump of a byte buffer and a trailing CRLF.
   *
   * Each byte is printed as exactly two lowercase hex digits with no separator.
   *
   * @param buf      bytes to dump
   * @param len      number of bytes to read from buf
   * @param userData NUL-terminated label printed in front of the dump
   */
  static void printfArrStr(char *buf, uint32_t len, char *userData)
  {
      RyanMqttLog_raw("%s", userData);

      for (uint32_t i = 0; i < len; i++)
      {
          // Convert through unsigned char first: where plain char is signed, a byte >= 0x80 would
          // otherwise be sign-extended by the default argument promotion and print as ffffffXX.
          // The 02 width keeps every byte two digits wide so the dump can be split back into bytes.
          RyanMqttLog_raw("%02x", (unsigned char)buf[i]);
      }

      RyanMqttLog_raw("\r\n");
  }
  33. /**
  34. * @brief mqtt事件回调处理函数
  35. * 事件的详细定义可以查看枚举定义
  36. *
  37. * @param pclient
  38. * @param event
  39. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  40. */
  41. static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  42. {
  43. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  44. switch (event)
  45. {
  46. case RyanMqttEventError: break;
  47. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  48. RyanMqttLog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  49. break;
  50. case RyanMqttEventDisconnected: RyanMqttLog_w("mqtt断开连接回调 %d", *(int32_t *)eventData); break;
  51. case RyanMqttEventSubscribed: {
  52. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  53. RyanMqttLog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  54. break;
  55. }
  56. case RyanMqttEventSubscribedFailed: {
  57. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  58. RyanMqttLog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  59. break;
  60. }
  61. case RyanMqttEventUnSubscribed: {
  62. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  63. RyanMqttLog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  64. break;
  65. }
  66. case RyanMqttEventUnSubscribedFailed: {
  67. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  68. RyanMqttLog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  69. break;
  70. }
  71. case RyanMqttEventPublished: {
  72. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  73. RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  74. pubTestPublishedEventCount++;
  75. break;
  76. }
  77. case RyanMqttEventData: {
  78. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  79. RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d", msgData->topicLen,
  80. msgData->topic, msgData->packetId, msgData->payloadLen);
  81. RyanMqttLog_i("%.*s", msgData->payloadLen, msgData->payload);
  82. pubTestDataEventCount++;
  83. break;
  84. }
  85. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  86. {
  87. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  88. RyanMqttLog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  89. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic,
  90. ackHandler->msgHandler->qos);
  91. printfArrStr((char *)ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  92. break;
  93. }
  94. case RyanMqttEventReconnectBefore:
  95. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  96. RyanMqttLog_i("重连前事件回调");
  97. break;
  98. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  99. {
  100. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  101. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  102. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  103. RyanMqttLog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  104. break;
  105. }
  106. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  107. {
  108. // 这里选择直接丢弃该消息
  109. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  110. RyanMqttLog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d",
  111. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic,
  112. ackHandler->msgHandler->qos);
  113. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  114. break;
  115. }
  116. case RyanMqttEventAckHandlerDiscard: {
  117. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  118. RyanMqttLog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType,
  119. ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  120. break;
  121. }
  122. case RyanMqttEventDestroyBefore: RyanMqttLog_i("销毁mqtt客户端前回调"); break;
  123. default: break;
  124. }
  125. }
  /**
   * @brief One row of the `mqtt` msh sub-command table.
   *
   * The field order is relied upon by the table initializer; do not reorder.
   */
  struct RyanMqttCmdDes
  {
      const char *cmd;                    // sub-command keyword, compared against argv[1]
      const char *explain;                // one-line description shown by the help listing
      int (*fun)(int argc, char *argv[]); // handler, called with the unmodified msh argc / argv
  };

  // Prototype only: the help handler is listed in the command table and also iterates over it,
  // so it has to be declared before the table and defined after it.
  static int MqttHelp(int argc, char *argv[]);
  137. /**
  138. * @brief 获取mqtt状态
  139. *
  140. * @param argc
  141. * @param argv
  142. * @return int
  143. */
  144. static int MqttState(int argc, char *argv[])
  145. {
  146. char *str = NULL;
  147. RyanMqttState_e clientState = RyanMqttGetState(client);
  148. switch (clientState)
  149. {
  150. case RyanMqttInvalidState: str = "无效状态"; break;
  151. case RyanMqttInitState: str = "初始化状态"; break;
  152. case RyanMqttStartState: str = "mqtt开始状态"; break;
  153. case RyanMqttConnectState: str = "连接状态"; break;
  154. case RyanMqttDisconnectState: str = "断开连接状态"; break;
  155. case RyanMqttReconnectState: str = "重新连接状态"; break;
  156. default:
  157. RyanMqttLog_e("Invalid client state: %d", clientState);
  158. str = "未知状态";
  159. break;
  160. }
  161. RyanMqttLog_i("client state: %s", str);
  162. return 0;
  163. }
  164. /**
  165. * @brief mqtt 连接服务器
  166. *
  167. * @param argc
  168. * @param argv
  169. * @return int
  170. */
  171. static int MqttConnect(int argc, char *argv[])
  172. {
  173. if (RyanMqttConnectState == RyanMqttGetState(client))
  174. {
  175. RyanMqttLog_w("mqtt客户端已经连接,请不要重复连接");
  176. return 0;
  177. }
  178. RyanMqttLog_i("mqtt开始连接");
  179. RyanMqttError_e result = RyanMqttSuccessError;
  180. RyanMqttClientConfig_t mqttConfig = {.clientId = RyanMqttClientId,
  181. .userName = RyanMqttUserName,
  182. .password = RyanMqttPassword,
  183. .host = RyanMqttHost,
  184. .port = RyanMqttPort,
  185. .taskName = "mqttThread",
  186. .taskPrio = 16,
  187. .taskStack = 2048,
  188. .mqttVersion = 4,
  189. .ackHandlerRepeatCountWarning = 6,
  190. .ackHandlerCountWarning = 20,
  191. .autoReconnectFlag = RyanMqttTrue,
  192. .cleanSessionFlag = RyanMqttFalse,
  193. .reconnectTimeout = 3000,
  194. .recvTimeout = 5000,
  195. .sendTimeout = 2000,
  196. .ackTimeout = 10000,
  197. .keepaliveTimeoutS = 120,
  198. .mqttEventHandle = mqttEventHandle,
  199. .userData = NULL};
  200. // 初始化mqtt客户端
  201. result = RyanMqttInit(&client);
  202. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  203. // 注册需要的事件回调
  204. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  205. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  206. // 设置mqtt客户端config
  207. result = RyanMqttSetConfig(client, &mqttConfig);
  208. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  209. // 设置遗嘱消息
  210. result = RyanMqttSetLwt(client, "pub/test", "this is will", RyanMqttStrlen("this is will"), RyanMqttQos0, 0);
  211. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  212. // 启动mqtt客户端线程
  213. result = RyanMqttStart(client);
  214. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  215. RyanMqttLog_i("mqtt已开启自动重连,断连后会每3秒尝试重连一次");
  216. return 0;
  217. }
  218. /**
  219. * @brief mqtt 重连函数,注意事项请看函数简介
  220. *
  221. * @param argc
  222. * @param argv
  223. * @return int
  224. */
  225. static int MqttReconnect(int argc, char *argv[])
  226. {
  227. RyanMqttReconnect(client);
  228. return 0;
  229. }
  230. /**
  231. * @brief mqtt销毁客户端
  232. *
  233. * @param argc
  234. * @param argv
  235. * @return int
  236. */
  237. static int MqttDestroy(int argc, char *argv[])
  238. {
  239. RyanMqttDestroy(client);
  240. client = NULL;
  241. return 0;
  242. }
  243. /**
  244. * @brief 断开mqtt客户端连接
  245. *
  246. * @param argc
  247. * @param argv
  248. * @return int
  249. */
  250. static int MqttDisconnect(int argc, char *argv[])
  251. {
  252. if (RyanMqttConnectState != RyanMqttGetState(client))
  253. {
  254. RyanMqttLog_w("mqtt客户端没有连接");
  255. return 0;
  256. }
  257. RyanMqttDisconnect(client, RyanMqttTrue);
  258. return 0;
  259. }
  260. /**
  261. * @brief mqtt发布消息
  262. *
  263. * @param argc
  264. * @param argv
  265. * @return int
  266. */
  267. static int Mqttpublish(int argc, char *argv[])
  268. {
  269. if (argc < 7)
  270. {
  271. RyanMqttLog_w("参数不完整! 请输入 topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0) ");
  272. return 0;
  273. }
  274. if (RyanMqttConnectState != RyanMqttGetState(client))
  275. {
  276. RyanMqttLog_w("mqtt客户端没有连接");
  277. return 0;
  278. }
  279. char *topic = argv[2];
  280. RyanMqttQos_e qos = atoi(argv[3]);
  281. char *payload = argv[4];
  282. uint16_t count = atoi(argv[5]);
  283. uint16_t delayTime = atoi(argv[6]);
  284. uint16_t pubCount = 0;
  285. RyanMqttLog_i("qos: %d, count: %d, delayTime: %d, payload: %s", qos, count, delayTime, payload);
  286. for (uint16_t i = 0; i < count; i++)
  287. {
  288. if (RyanMqttSuccessError == RyanMqttPublish(client, topic, payload, RyanMqttStrlen(payload), qos, 0))
  289. {
  290. pubCount++;
  291. }
  292. delay(delayTime);
  293. }
  294. RyanMqttLog_w("pubCount: %d", pubCount);
  295. return 0;
  296. }
  297. /**
  298. * @brief mqtt订阅主题,支持通配符
  299. *
  300. * @param argc
  301. * @param argv
  302. * @return int
  303. */
  304. static int Mqttsubscribe(int argc, char *argv[])
  305. {
  306. if (argc < 4)
  307. {
  308. RyanMqttLog_w("参数不完整! 请输入 topic、 qos ");
  309. return 0;
  310. }
  311. if (RyanMqttConnectState != RyanMqttGetState(client))
  312. {
  313. RyanMqttLog_w("mqtt客户端没有连接");
  314. return 0;
  315. }
  316. int qos = atoi(argv[3]);
  317. if (qos < 0 || qos > 2)
  318. {
  319. RyanMqttLog_w("无效的QoS值: %d,必须为0-2", qos);
  320. return 0;
  321. }
  322. RyanMqttSubscribe(client, argv[2], qos);
  323. return 0;
  324. }
  325. /**
  326. * @brief mqtt取消订阅主题
  327. *
  328. * @param argc
  329. * @param argv
  330. * @return int
  331. */
  332. static int MqttUnSubscribe(int argc, char *argv[])
  333. {
  334. if (argc < 3)
  335. {
  336. RyanMqttLog_w("参数不完整! 请输入 取消订阅主题");
  337. return 0;
  338. }
  339. if (RyanMqttConnectState != RyanMqttGetState(client))
  340. {
  341. RyanMqttLog_w("mqtt客户端没有连接");
  342. return 0;
  343. }
  344. RyanMqttUnSubscribe(client, argv[2]);
  345. return 0;
  346. }
  347. /**
  348. * @brief mqtt获取已订阅主题
  349. *
  350. * @param argc
  351. * @param argv
  352. * @return int
  353. */
  354. static int MqttListSubscribe(int argc, char *argv[])
  355. {
  356. if (RyanMqttConnectState != RyanMqttGetState(client))
  357. {
  358. RyanMqttLog_w("mqtt客户端没有连接");
  359. return 0;
  360. }
  361. RyanMqttMsgHandler_t *msgHandles = NULL;
  362. int32_t subscribeNum = 0;
  363. int32_t result = RyanMqttSuccessError;
  364. result = RyanMqttGetSubscribeSafe(client, &msgHandles, &subscribeNum);
  365. if (RyanMqttSuccessError != result)
  366. {
  367. RyanMqttLog_e("获取订阅主题数失败可能是内存不足");
  368. }
  369. RyanMqttLog_i("mqtt客户端已订阅的主题数: %d", subscribeNum);
  370. for (int32_t i = 0; i < subscribeNum; i++)
  371. {
  372. RyanMqttLog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  373. }
  374. if (subscribeNum > 0)
  375. {
  376. RyanMqttSafeFreeSubscribeResources(msgHandles, subscribeNum);
  377. }
  378. return 0;
  379. }
  380. /**
  381. * @brief 打印接收到的数据计数
  382. *
  383. * @param argc
  384. * @param argv
  385. * @return int
  386. */
  387. static int Mqttdata(int argc, char *argv[])
  388. {
  389. RyanMqttLog_i("接收到数据次数统计: %d, qos1和qos2发布成功的次数统计: %d", pubTestDataEventCount,
  390. pubTestPublishedEventCount);
  391. return 0;
  392. }
  393. static const struct RyanMqttCmdDes cmdTab[] = {
  394. // {"help", "打印帮助信息 params: null", MqttHelp},
  395. // {"state", "打印mqtt客户端状态 params: null", MqttState},
  396. // {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  397. // {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  398. // {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  399. // {"destroy", "mqtt销毁客户端 params: null", MqttDestroy},
  400. // {"pub", "mqtt发布消息 params: topic、qos、payload内容、发送条数、间隔时间(可以为0)",
  401. // Mqttpublish},
  402. // {"sub", "mqtt订阅主题 params: topic、qos", Mqttsubscribe},
  403. // {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  404. // {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  405. // {"listmsg", "打印msg链表 params: null", MqttListMsg},
  406. // {"data", "打印测试信息 params: null", Mqttdata},
  407. {"help", "打印帮助信息 params: null", MqttHelp},
  408. {"state", "打印mqtt客户端状态 params: null", MqttState},
  409. {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  410. {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  411. {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  412. {"destroy", "mqtt销毁客户端 params: null", MqttDestroy},
  413. {"pub", "mqtt发布消息 params: topic、qos、payload内容、发送条数、间隔时间(可以为0)", Mqttpublish},
  414. {"sub", "mqtt订阅主题 params: topic、qos", Mqttsubscribe},
  415. {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  416. {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  417. {"data", "打印测试信息 params: null", Mqttdata},
  418. };
  419. static int MqttHelp(int argc, char *argv[])
  420. {
  421. for (int32_t i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  422. {
  423. RyanMqttLog_raw("mqtt %-16s %s\r\n", cmdTab[i].cmd, cmdTab[i].explain);
  424. }
  425. return 0;
  426. }
  427. static int RyanMqttMsh(int argc, char *argv[])
  428. {
  429. int32_t i = 0, result = 0;
  430. const struct RyanMqttCmdDes *runCmd = NULL;
  431. if (argc == 1)
  432. {
  433. MqttHelp(argc, argv);
  434. return 0;
  435. }
  436. for (i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  437. {
  438. if (rt_strcmp(cmdTab[i].cmd, argv[1]) == 0)
  439. {
  440. runCmd = &cmdTab[i];
  441. break;
  442. }
  443. }
  444. if (runCmd == NULL)
  445. {
  446. MqttHelp(argc, argv);
  447. return 0;
  448. }
  449. if (runCmd->fun != NULL)
  450. {
  451. result = runCmd->fun(argc, argv);
  452. }
  453. return result;
  454. }
  455. #if defined(RT_USING_MSH)
  456. MSH_CMD_EXPORT_ALIAS(RyanMqttMsh, mqtt, RyanMqtt示例程序);
  457. #endif
  458. #endif
  459. // NOLINTEND(cert-err34-c,llvm-include-order)