/* RyanMqttTest.c - RT-Thread msh test commands for the RyanMqtt client. */

  #include "rtconfig.h"

  #define RyanMqttClientId ("RyanMqttTessdfwrt") // MQTT client id; must be unique
  #define RyanMqttHost ("broker.emqx.io") // Address of your MQTT server
  #define RyanMqttPort (1883) // MQTT server port
  #define RyanMqttUserName (NULL) // Your user name, or NULL when there is none
  #define RyanMqttPassword (NULL) // Your password, or NULL when there is none

  // Everything below is compiled only when the example package is enabled;
  // the matching #endif is the last line of the file.
  #ifdef PKG_USING_RYANMQTT_EXAMPLE

  #include <stdio.h>
  #include <stdint.h>
  #include <string.h>
  #include <stdlib.h>
  #include <board.h>
  #include <rtthread.h>
  #include <rtdevice.h>
  #include <rtdbg.h>

  #define rlogLevel (rlogLvlDebug) // Log print level
  #include "RyanMqttLog.h"
  #include "RyanMqttClient.h"

  #define delay(ms) rt_thread_mdelay(ms)

  // Client handle shared by all msh sub-commands in this file
  static RyanMqttClient_t *client = NULL;
  static char mqttRecvBuffer[512];
  static char mqttSendBuffer[512];

  // Test counters; see the event callback function for how each one is updated
  static uint32_t mqttTest[10] = {0};
  #define dataEventCount (0) // Index of the "data received" counter
  #define PublishedEventCount (1) // Index of the "QoS1 / QoS2 publish succeeded" counter
  /**
   * @brief Print a label followed by the bytes of a buffer in hexadecimal
   *
   * Each byte is printed as exactly two hex digits, then the line is
   * terminated with "\r\n".
   *
   * @param buf      Buffer whose bytes are printed
   * @param len      Number of bytes of buf to print
   * @param userData Text printed in front of the hex dump
   */
  static void printfArrStr(char *buf, uint32_t len, char *userData)
  {
      rlog_raw("%s", userData);

      for (uint32_t i = 0; i < len; i++)
      {
          // Go through unsigned char so that, where plain char is signed, bytes
          // >= 0x80 are not sign-extended to ffffffXX by the integer promotion.
          // The 02 width keeps byte boundaries unambiguous (0x01 0x23 != 0x12 0x03).
          rlog_raw("%02x", (unsigned char)buf[i]);
      }

      rlog_raw("\r\n");
  }
  34. /**
  35. * @brief mqtt事件回调处理函数
  36. * 事件的详细定义可以查看枚举定义
  37. *
  38. * @param pclient
  39. * @param event
  40. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  41. */
  42. static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  43. {
  44. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  45. switch (event)
  46. {
  47. case RyanMqttEventError:
  48. break;
  49. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  50. rlog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  51. break;
  52. case RyanMqttEventDisconnected:
  53. rlog_w("mqtt断开连接回调 %d", *(int32_t *)eventData);
  54. break;
  55. case RyanMqttEventSubscribed:
  56. {
  57. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  58. rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  59. break;
  60. }
  61. case RyanMqttEventSubscribedFaile:
  62. {
  63. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  64. rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  65. break;
  66. }
  67. case RyanMqttEventUnSubscribed:
  68. {
  69. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  70. rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  71. break;
  72. }
  73. case RyanMqttEventUnSubscribedFaile:
  74. {
  75. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  76. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  77. break;
  78. }
  79. case RyanMqttEventPublished:
  80. {
  81. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  82. rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  83. pubTestPublishedEventCount++;
  84. break;
  85. }
  86. case RyanMqttEventData:
  87. {
  88. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  89. rlog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d",
  90. msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen);
  91. rlog_i("%.*s", msgData->payloadLen, msgData->payload);
  92. pubTestDataEventCount++;
  93. break;
  94. }
  95. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  96. {
  97. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  98. rlog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  99. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  100. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  101. break;
  102. }
  103. case RyanMqttEventReconnectBefore:
  104. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  105. rlog_i("重连前事件回调");
  106. break;
  107. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  108. {
  109. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  110. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  111. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  112. rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  113. break;
  114. }
  115. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  116. {
  117. // 这里选择直接丢弃该消息
  118. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  119. rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  120. RyanMqttDiscardAckHandler(client, ackHandler);
  121. break;
  122. }
  123. case RyanMqttEventAckHandlerdiscard:
  124. {
  125. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  126. rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  127. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  128. break;
  129. }
  130. case RyanMqttEventDestoryBefore:
  131. rlog_i("销毁mqtt客户端前回调");
  132. break;
  133. default:
  134. break;
  135. }
  136. }
  /**
   * @brief Descriptor of one sub-command of the "mqtt" msh command
   *
   */
  struct RyanMqttCmdDes
  {
      const char *cmd; // Sub-command name, compared against argv[1]
      const char *explain; // Help text printed next to the name by MqttHelp
      int (*fun)(int argc, char *argv[]); // Handler, called with the unmodified argc / argv
  };

  // Declared ahead of its definition because the command table refers to it
  static int MqttHelp(int argc, char *argv[]);
  148. /**
  149. * @brief 获取mqtt状态
  150. *
  151. * @param argc
  152. * @param argv
  153. * @return int
  154. */
  155. static int MqttState(int argc, char *argv[])
  156. {
  157. char *str = NULL;
  158. RyanMqttState_e clientState = RyanMqttGetState(client);
  159. switch (clientState)
  160. {
  161. case RyanMqttInvalidState:
  162. str = "无效状态";
  163. break;
  164. case RyanMqttInitState:
  165. str = "初始化状态";
  166. break;
  167. case RyanMqttStartState:
  168. str = "mqtt开始状态";
  169. break;
  170. case RyanMqttConnectState:
  171. str = "连接状态";
  172. break;
  173. case RyanMqttDisconnectState:
  174. str = "断开连接状态";
  175. break;
  176. case RyanMqttReconnectState:
  177. str = "重新连接状态";
  178. break;
  179. default:
  180. RyanMqttCheck(NULL, RyanMqttFailedError, rlog_d);
  181. break;
  182. }
  183. rlog_i("client state: %s", str);
  184. return 0;
  185. }
  186. /**
  187. * @brief mqtt 连接服务器
  188. *
  189. * @param argc
  190. * @param argv
  191. * @return int
  192. */
  193. static int MqttConnect(int argc, char *argv[])
  194. {
  195. if (RyanMqttConnectState == RyanMqttGetState(client))
  196. {
  197. rlog_w("mqtt客户端已经连接,请不要重复连接");
  198. return 0;
  199. }
  200. RyanMqttError_e result = RyanMqttSuccessError;
  201. RyanMqttClientConfig_t mqttConfig = {
  202. .clientId = RyanMqttClientId,
  203. .userName = RyanMqttUserName,
  204. .password = RyanMqttPassword,
  205. .host = RyanMqttHost,
  206. .port = RyanMqttPort,
  207. .taskName = "mqttThread",
  208. .taskPrio = 16,
  209. .taskStack = 2048,
  210. .mqttVersion = 4,
  211. .ackHandlerRepeatCountWarning = 6,
  212. .ackHandlerCountWarning = 20,
  213. .autoReconnectFlag = RyanMqttTrue,
  214. .cleanSessionFlag = RyanMqttFalse,
  215. .reconnectTimeout = 3000,
  216. .recvTimeout = 5000,
  217. .sendTimeout = 2000,
  218. .ackTimeout = 10000,
  219. .keepaliveTimeoutS = 120,
  220. .mqttEventHandle = mqttEventHandle,
  221. .userData = NULL};
  222. // 初始化mqtt客户端
  223. result = RyanMqttInit(&client);
  224. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  225. // 注册需要的事件回调
  226. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  227. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  228. // 设置mqtt客户端config
  229. result = RyanMqttSetConfig(client, &mqttConfig);
  230. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  231. // 设置遗嘱消息
  232. result = RyanMqttSetLwt(client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
  233. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  234. // 启动mqtt客户端线程
  235. result = RyanMqttStart(client);
  236. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  237. return 0;
  238. }
  239. /**
  240. * @brief mqtt 重连函数,注意事项请看函数简介
  241. *
  242. * @param argc
  243. * @param argv
  244. * @return int
  245. */
  246. static int MqttReconnect(int argc, char *argv[])
  247. {
  248. RyanMqttReconnect(client);
  249. return 0;
  250. }
  251. /**
  252. * @brief mqtt销毁客户端
  253. *
  254. * @param argc
  255. * @param argv
  256. * @return int
  257. */
  258. static int MqttDestroy(int argc, char *argv[])
  259. {
  260. RyanMqttDestroy(client);
  261. client = NULL;
  262. return 0;
  263. }
  264. /**
  265. * @brief 断开mqtt客户端连接
  266. *
  267. * @param argc
  268. * @param argv
  269. * @return int
  270. */
  271. static int MqttDisconnect(int argc, char *argv[])
  272. {
  273. if (RyanMqttConnectState != RyanMqttGetState(client))
  274. {
  275. rlog_w("mqtt客户端没有连接");
  276. return 0;
  277. }
  278. RyanMqttDisconnect(client, RyanMqttTrue);
  279. return 0;
  280. }
  281. /**
  282. * @brief mqtt发布消息
  283. *
  284. * @param argc
  285. * @param argv
  286. * @return int
  287. */
  288. static int Mqttpublish(int argc, char *argv[])
  289. {
  290. if (argc < 7)
  291. {
  292. rlog_w("参数不完整! 请输入 topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0) ");
  293. return 0;
  294. }
  295. if (RyanMqttConnectState != RyanMqttGetState(client))
  296. {
  297. rlog_w("mqtt客户端没有连接");
  298. return 0;
  299. }
  300. char *topic = argv[2];
  301. RyanMqttQos_e qos = atoi(argv[3]);
  302. char *payload = argv[4];
  303. uint16_t count = atoi(argv[5]);
  304. uint16_t delayTime = atoi(argv[6]);
  305. uint16_t pubCount = 0;
  306. rlog_i("qos: %d, count: %d, delayTime: %d, payload: %s", qos, count, delayTime, payload);
  307. for (uint16_t i = 0; i < count; i++)
  308. {
  309. if (RyanMqttSuccessError == RyanMqttPublish(client, topic, payload, strlen(payload), qos, 0))
  310. pubCount++;
  311. delay(delayTime);
  312. }
  313. rlog_w("pubCount: %d", pubCount);
  314. return 0;
  315. }
  316. /**
  317. * @brief mqtt订阅主题,支持通配符
  318. *
  319. * @param argc
  320. * @param argv
  321. * @return int
  322. */
  323. static int Mqttsubscribe(int argc, char *argv[])
  324. {
  325. if (argc < 4)
  326. {
  327. rlog_w("参数不完整! 请输入 topic、 qos ");
  328. return 0;
  329. }
  330. if (RyanMqttConnectState != RyanMqttGetState(client))
  331. {
  332. rlog_w("mqtt客户端没有连接");
  333. return 0;
  334. }
  335. RyanMqttSubscribe(client, argv[2], atoi(argv[3]));
  336. return 0;
  337. }
  338. /**
  339. * @brief mqtt取消订阅主题
  340. *
  341. * @param argc
  342. * @param argv
  343. * @return int
  344. */
  345. static int MqttUnSubscribe(int argc, char *argv[])
  346. {
  347. if (argc < 3)
  348. {
  349. rlog_w("参数不完整! 请输入 取消订阅主题");
  350. return 0;
  351. }
  352. if (RyanMqttConnectState != RyanMqttGetState(client))
  353. {
  354. rlog_w("mqtt客户端没有连接");
  355. return 0;
  356. }
  357. RyanMqttUnSubscribe(client, argv[2]);
  358. return 0;
  359. }
  360. /**
  361. * @brief mqtt获取已订阅主题
  362. *
  363. * @param argc
  364. * @param argv
  365. * @return int
  366. */
  367. static int MqttListSubscribe(int argc, char *argv[])
  368. {
  369. if (RyanMqttConnectState != RyanMqttGetState(client))
  370. {
  371. rlog_w("mqtt客户端没有连接");
  372. return 0;
  373. }
  374. RyanMqttMsgHandler_t msgHandles[100] = {0};
  375. int32_t subscribeNum = 0;
  376. int32_t result = RyanMqttSuccessError;
  377. result = RyanMqttGetSubscribe(client, msgHandles, sizeof(msgHandles) / sizeof(msgHandles[0]), &subscribeNum);
  378. if (result == RyanMqttNoRescourceError)
  379. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", sizeof(msgHandles) / sizeof(msgHandles[0]));
  380. rlog_i("mqtt客户端已订阅的主题数: %d", subscribeNum);
  381. for (int32_t i = 0; i < subscribeNum; i++)
  382. rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  383. return 0;
  384. }
  385. /**
  386. * @brief 打印ack链表
  387. *
  388. * @param argc
  389. * @param argv
  390. * @return int
  391. */
  392. static int MqttListAck(int argc, char *argv[])
  393. {
  394. RyanList_t *curr = NULL,
  395. *next = NULL;
  396. RyanMqttAckHandler_t *ackHandler = NULL;
  397. if (RyanListIsEmpty(&client->ackHandlerList))
  398. {
  399. rlog_w("ack链表为空,没有等待ack的消息");
  400. return 0;
  401. }
  402. // 遍历链表
  403. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  404. {
  405. // 获取此节点的结构体
  406. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  407. // 发送qos1 / qos2消息服务器ack响应超时。需要重新发送它们。
  408. rlog_i(" type: %d, packetId is %d ", ackHandler->packetType, ackHandler->packetId);
  409. if (NULL != ackHandler->msgHandler)
  410. rlog_i("topic: %s, qos: %d", ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  411. }
  412. return 0;
  413. }
  414. /**
  415. * @brief 打印msg链表
  416. *
  417. * @param argc
  418. * @param argv
  419. * @return int
  420. */
  421. static int MqttListMsg(int argc, char *argv[])
  422. {
  423. RyanList_t *curr = NULL,
  424. *next = NULL;
  425. RyanMqttMsgHandler_t *msgHandler = NULL;
  426. if (RyanListIsEmpty(&client->msgHandlerList))
  427. {
  428. rlog_w("msg链表为空,没有等待的msg消息");
  429. return 0;
  430. }
  431. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  432. {
  433. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  434. rlog_i("topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  435. }
  436. return 0;
  437. }
  438. /**
  439. * @brief 打印接收到的数据计数
  440. *
  441. * @param argc
  442. * @param argv
  443. * @return int
  444. */
  445. static int Mqttdata(int argc, char *argv[])
  446. {
  447. // mqtt data
  448. if (argc == 3)
  449. {
  450. uint32_t num = atoi(argv[2]);
  451. if (num < sizeof(mqttTest) / sizeof(mqttTest[0]) - 1)
  452. mqttTest[num] = 0;
  453. else
  454. rlog_e("数组越界");
  455. }
  456. rlog_i("接收到数据次数统计: %d, qos1和qos2发布成功的次数统计: %d",
  457. pubTestDataEventCount, pubTestPublishedEventCount);
  458. return 0;
  459. }
  460. static const struct RyanMqttCmdDes cmdTab[] =
  461. {
  462. // {"help", "打印帮助信息 params: null", MqttHelp},
  463. // {"state", "打印mqtt客户端状态 params: null", MqttState},
  464. // {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  465. // {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  466. // {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  467. // {"destory", "mqtt销毁客户端 params: null", MqttDestroy},
  468. // {"pub", "mqtt发布消息 params: topic、qos、payload内容、发送条数、间隔时间(可以为0)", Mqttpublish},
  469. // {"sub", "mqtt订阅主题 params: topic、qos", Mqttsubscribe},
  470. // {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  471. // {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  472. // {"listack", "打印ack链表 params: null", MqttListAck},
  473. // {"listmsg", "打印msg链表 params: null", MqttListMsg},
  474. // {"data", "打印测试信息用户自定义的 params: null", Mqttdata},
  475. {"help", "打印帮助信息 params: null", MqttHelp},
  476. {"state", "打印mqtt客户端状态 params: null", MqttState},
  477. {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  478. {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  479. {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  480. {"destory", "mqtt销毁客户端 params: null", MqttDestroy},
  481. {"pub", "mqtt发布消息 params: topic、qos、payload内容、发送条数、间隔时间(可以为0)", Mqttpublish},
  482. {"sub", "mqtt订阅主题 params: topic、qos", Mqttsubscribe},
  483. {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  484. {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  485. {"listack", "打印ack链表 params: null", MqttListAck},
  486. {"listmsg", "打印msg链表 params: null", MqttListMsg},
  487. {"data", "打印测试信息用户自定义的 params: null", Mqttdata},
  488. };
  489. static int MqttHelp(int argc, char *argv[])
  490. {
  491. for (uint8_t i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  492. rlog_raw("mqtt %-16s %s\r\n", cmdTab[i].cmd, cmdTab[i].explain);
  493. return 0;
  494. }
  495. static int RyanMqttMsh(int argc, char *argv[])
  496. {
  497. int32_t i = 0,
  498. result = 0;
  499. const struct RyanMqttCmdDes *runCmd = NULL;
  500. if (argc == 1)
  501. {
  502. MqttHelp(argc, argv);
  503. return 0;
  504. }
  505. for (i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  506. {
  507. if (rt_strcmp(cmdTab[i].cmd, argv[1]) == 0)
  508. {
  509. runCmd = &cmdTab[i];
  510. break;
  511. }
  512. }
  513. if (runCmd == NULL)
  514. {
  515. MqttHelp(argc, argv);
  516. return 0;
  517. }
  518. if (runCmd->fun != NULL)
  519. result = runCmd->fun(argc, argv);
  520. return result;
  521. }
  522. #if defined(RT_USING_MSH)
  523. MSH_CMD_EXPORT_ALIAS(RyanMqttMsh, mqtt, RyanMqtt command);
  524. #endif
  525. #endif