/* RyanMqttTest.c - RT-Thread msh test commands for the RyanMqtt client. */

  1. #include "rtconfig.h"
  /* MQTT client id; must be unique on the broker. */
  #define RyanMqttClientId "RyanMqttTessdfwrt"
  /* Address (IP or host name) of the MQTT server to connect to. */
  #define RyanMqttHost "broker.emqx.io"
  /* MQTT server port, kept as a string. */
  #define RyanMqttPort "1883"
  /* User name; use "" when the server needs none. */
  #define RyanMqttUserName ""
  /* Password; use "" when the server needs none. */
  #define RyanMqttPassword ""
  7. #ifdef PKG_USING_RYANMQTT_EXAMPLE
  8. #include <stdio.h>
  9. #include <stdint.h>
  10. #include <string.h>
  11. #include <stdlib.h>
  12. #include <board.h>
  13. #include <rtthread.h>
  14. #include <rtdevice.h>
  15. #include <rtdbg.h>
  /*
   * Log settings for this file. They are defined ahead of the RyanMqttLog.h
   * include that follows; presumably that header reads them - confirm there.
   */
  #define rlogEnable 1            /* enable logging */
  #define rlogColorEnable 1       /* enable coloured log output */
  #define rlogLevel rlogLvlDebug  /* log print level */
  #define rlogTag "RyanMqttTest"  /* log tag */
  20. #include "RyanMqttLog.h"
  21. #include "RyanMqttClient.h"
  22. #define delay(ms) rt_thread_mdelay(ms)
  23. static RyanMqttClient_t *client = NULL;
  24. static char mqttRecvBuffer[1024];
  25. static char mqttSendBuffer[1024];
  26. // 具体数值计算可以查看事件回调函数
  27. static uint32_t mqttTest[10] = {0};
  28. #define dataEventCount (0) // 接收到数据次数统计
  29. #define PublishedEventCount (1) // qos1和qos2发布成功的次数统计
  /**
   * @brief Print a label followed by a buffer dumped as hexadecimal bytes.
   *
   * Every byte is converted to unsigned char before formatting, so bytes
   * >= 0x80 are not sign-extended on targets where plain char is signed, and
   * each byte is printed as exactly two hex digits so the dump is unambiguous.
   *
   * @param buf      bytes to dump
   * @param len      number of bytes in buf
   * @param userData NUL-terminated label printed before the dump
   */
  void printfArrStr(char *buf, uint32_t len, char *userData)
  {
      rlog_raw("%s", userData);

      for (uint32_t i = 0; i < len; i++)
      {
          rlog_raw("%02x", (unsigned int)(unsigned char)buf[i]);
      }

      rlog_raw("\r\n");
  }
  37. /**
  38. * @brief mqtt事件回调处理函数
  39. * 事件的详细定义可以查看枚举定义
  40. *
  41. * @param pclient
  42. * @param event
  43. * @param eventData
  44. */
  45. void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void const *eventData)
  46. {
  47. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  48. switch (event)
  49. {
  50. case RyanMqttEventError:
  51. break;
  52. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  53. rlog_i("mqtt连接成功回调");
  54. break;
  55. case RyanMqttEventDisconnected:
  56. rlog_w("mqtt断开连接回调 %d", *(RyanMqttConnectStatus_e *)eventData);
  57. break;
  58. case RyanMqttEventSubscribed:
  59. {
  60. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  61. rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  62. break;
  63. }
  64. case RyanMqttEventSubscribedFaile:
  65. {
  66. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  67. rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  68. break;
  69. }
  70. case RyanMqttEventUnSubscribed:
  71. {
  72. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  73. rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  74. break;
  75. }
  76. case RyanMqttEventUnSubscribedFaile:
  77. {
  78. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  79. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  80. break;
  81. }
  82. case RyanMqttEventPublished:
  83. {
  84. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  85. rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  86. mqttTest[PublishedEventCount]++;
  87. break;
  88. }
  89. case RyanMqttEventData:
  90. {
  91. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  92. rlog_i(" RyanMqtt topic recv callback! topic: %s, packetId: %d, payload len: %d",
  93. msgData->topic, msgData->packetId, msgData->payloadLen);
  94. rt_kprintf("%.*s\r\n", msgData->payloadLen, msgData->payload);
  95. mqttTest[dataEventCount]++;
  96. break;
  97. }
  98. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  99. {
  100. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  101. rlog_w("packetType: %d, packetId: %d, topic: %s, qos: %d",
  102. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  103. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  104. break;
  105. }
  106. case RyanMqttEventReconnectBefore:
  107. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  108. rlog_i("重连前事件回调");
  109. break;
  110. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  111. {
  112. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  113. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  114. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  115. rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  116. break;
  117. }
  118. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  119. {
  120. // 这里选择直接丢弃该消息
  121. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  122. rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  123. RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
  124. break;
  125. }
  126. case RyanMqttEventAckHandlerdiscard:
  127. {
  128. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  129. rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  130. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  131. break;
  132. }
  133. case RyanMqttEventDestoryBefore:
  134. rlog_i("销毁mqtt客户端前回调");
  135. break;
  136. default:
  137. break;
  138. }
  139. }
  /**
   * @brief One entry of the "mqtt" msh sub-command table.
   */
  struct RyanMqttCmdDes
  {
      const char *cmd;                   /* sub-command keyword, compared with argv[1] */
      const char *explain;               /* help text printed next to the keyword */
      int (*fun)(int argc, char **argv); /* handler; receives the full msh argc / argv */
  };

  /* Declared ahead of its definition because the command table refers to it. */
  static int MqttHelp(int argc, char **argv);
  151. /**
  152. * @brief 获取mqtt状态
  153. *
  154. * @param argc
  155. * @param argv
  156. * @return int
  157. */
  158. static int MqttState(int argc, char *argv[])
  159. {
  160. char *str = NULL;
  161. RyanMqttState_e clientState = RyanMqttGetState(client);
  162. switch (clientState)
  163. {
  164. case RyanMqttInvalidState:
  165. str = "无效状态";
  166. break;
  167. case RyanMqttInitState:
  168. str = "初始化状态";
  169. break;
  170. case RyanMqttStartState:
  171. str = "mqtt开始状态";
  172. break;
  173. case RyanMqttConnectState:
  174. str = "连接状态";
  175. break;
  176. case RyanMqttDisconnectState:
  177. str = "断开连接状态";
  178. break;
  179. case RyanMqttReconnectState:
  180. str = "重新连接状态";
  181. break;
  182. default:
  183. RyanMqttCheck(NULL, RyanMqttFailedError, rlog_d);
  184. break;
  185. }
  186. rlog_i("client state: %s", str);
  187. return 0;
  188. }
  189. /**
  190. * @brief mqtt 连接服务器
  191. *
  192. * @param argc
  193. * @param argv
  194. * @return int
  195. */
  196. static int MqttConnect(int argc, char *argv[])
  197. {
  198. if (RyanMqttConnectState == RyanMqttGetState(client))
  199. {
  200. rlog_w("mqtt客户端没有连接");
  201. return 0;
  202. }
  203. RyanMqttError_e result = RyanMqttSuccessError;
  204. RyanMqttClientConfig_t mqttConfig = {
  205. .clientId = RyanMqttClientId,
  206. .userName = RyanMqttUserName,
  207. .password = RyanMqttPassword,
  208. .host = RyanMqttHost,
  209. .port = RyanMqttPort,
  210. .taskName = "mqttThread",
  211. .taskPrio = 16,
  212. .taskStack = 2048,
  213. .recvBufferSize = sizeof(mqttRecvBuffer),
  214. .sendBufferSize = sizeof(mqttSendBuffer),
  215. .recvBuffer = mqttRecvBuffer,
  216. .sendBuffer = mqttSendBuffer,
  217. .mqttVersion = 4,
  218. .ackHandlerRepeatCountWarning = 6,
  219. .ackHandlerCountWarning = 20,
  220. .autoReconnectFlag = RyanMqttTrue,
  221. .cleanSessionFlag = RyanMqttFalse,
  222. .reconnectTimeout = 3000,
  223. .recvTimeout = 5000,
  224. .sendTimeout = 2000,
  225. .ackTimeout = 10000,
  226. .keepaliveTimeoutS = 120,
  227. .mqttEventHandle = mqttEventHandle,
  228. .userData = NULL};
  229. // 初始化mqtt客户端
  230. result = RyanMqttInit(&client);
  231. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  232. // 注册需要的事件回调
  233. result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
  234. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  235. // 设置mqtt客户端config
  236. result = RyanMqttSetConfig(client, &mqttConfig);
  237. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  238. // 设置遗嘱消息
  239. result = RyanMqttSetLwt(client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
  240. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  241. // 启动mqtt客户端线程
  242. result = RyanMqttStart(client);
  243. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  244. return 0;
  245. }
  246. /**
  247. * @brief mqtt 重连函数,注意事项请看函数简介
  248. *
  249. * @param argc
  250. * @param argv
  251. * @return int
  252. */
  253. static int MqttReconnect(int argc, char *argv[])
  254. {
  255. RyanMqttReconnect(client);
  256. return 0;
  257. }
  258. /**
  259. * @brief mqtt销毁客户端
  260. *
  261. * @param argc
  262. * @param argv
  263. * @return int
  264. */
  265. static int MqttDestroy(int argc, char *argv[])
  266. {
  267. RyanMqttDestroy(client);
  268. return 0;
  269. }
  270. /**
  271. * @brief 断开mqtt客户端连接
  272. *
  273. * @param argc
  274. * @param argv
  275. * @return int
  276. */
  277. static int MqttDisconnect(int argc, char *argv[])
  278. {
  279. if (RyanMqttConnectState != RyanMqttGetState(client))
  280. {
  281. rlog_w("mqtt客户端没有连接");
  282. return 0;
  283. }
  284. RyanMqttDisconnect(client, RyanMqttTrue);
  285. return 0;
  286. }
  287. /**
  288. * @brief mqtt发布消息
  289. *
  290. * @param argc
  291. * @param argv
  292. * @return int
  293. */
  294. static int Mqttpublish(int argc, char *argv[])
  295. {
  296. if (argc < 7)
  297. {
  298. rlog_w("参数不完整! 请输入 topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0) ");
  299. return 0;
  300. }
  301. if (RyanMqttConnectState != RyanMqttGetState(client))
  302. {
  303. rlog_w("mqtt客户端没有连接");
  304. return 0;
  305. }
  306. char *topic = argv[2];
  307. RyanMqttQos_e qos = atoi(argv[3]);
  308. char *payload = argv[4];
  309. uint16_t count = atoi(argv[5]);
  310. uint16_t delayTime = atoi(argv[6]);
  311. uint16_t pubCount = 0;
  312. rlog_i("qos: %d, count: %d, delayTime: %d, payload: %s", qos, count, delayTime, payload);
  313. for (uint16_t i = 0; i < count; i++)
  314. {
  315. if (RyanMqttSuccessError == RyanMqttPublish(client, topic, payload, strlen(payload), qos, 0))
  316. pubCount++;
  317. delay(delayTime);
  318. }
  319. delay(3000);
  320. rlog_w("pubCount: %d", pubCount);
  321. return 0;
  322. }
  323. /**
  324. * @brief mqtt订阅主题,支持通配符
  325. *
  326. * @param argc
  327. * @param argv
  328. * @return int
  329. */
  330. static int Mqttsubscribe(int argc, char *argv[])
  331. {
  332. if (argc < 4)
  333. {
  334. rlog_w("参数不完整! 请输入 topic、 qos ");
  335. return 0;
  336. }
  337. if (RyanMqttConnectState != RyanMqttGetState(client))
  338. {
  339. rlog_w("mqtt客户端没有连接");
  340. return 0;
  341. }
  342. RyanMqttSubscribe(client, argv[2], atoi(argv[3]));
  343. return 0;
  344. }
  345. /**
  346. * @brief mqtt取消订阅主题
  347. *
  348. * @param argc
  349. * @param argv
  350. * @return int
  351. */
  352. static int MqttUnSubscribe(int argc, char *argv[])
  353. {
  354. if (argc < 3)
  355. {
  356. rlog_w("参数不完整! 请输入 取消订阅主题");
  357. return 0;
  358. }
  359. if (RyanMqttConnectState != RyanMqttGetState(client))
  360. {
  361. rlog_w("mqtt客户端没有连接");
  362. return 0;
  363. }
  364. RyanMqttUnSubscribe(client, argv[2]);
  365. return 0;
  366. }
  367. /**
  368. * @brief mqtt获取已订阅主题
  369. *
  370. * @param argc
  371. * @param argv
  372. * @return int
  373. */
  374. static int MqttListSubscribe(int argc, char *argv[])
  375. {
  376. if (RyanMqttConnectState != RyanMqttGetState(client))
  377. {
  378. rlog_w("mqtt客户端没有连接");
  379. return 0;
  380. }
  381. RyanMqttMsgHandler_t msgHandles[100] = {0};
  382. int32_t subscribeNum = 0;
  383. int32_t result = RyanMqttSuccessError;
  384. result = RyanMqttGetSubscribe(client, msgHandles, sizeof(msgHandles) / sizeof(msgHandles[0]), &subscribeNum);
  385. if (result == RyanMqttNoRescourceError)
  386. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", sizeof(msgHandles) / sizeof(msgHandles[0]));
  387. rlog_i("mqtt客户端已订阅的主题数: %d", subscribeNum);
  388. for (int32_t i = 0; i < subscribeNum; i++)
  389. rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  390. return 0;
  391. }
  392. /**
  393. * @brief 打印ack链表
  394. *
  395. * @param argc
  396. * @param argv
  397. * @return int
  398. */
  399. static int MqttListAck(int argc, char *argv[])
  400. {
  401. RyanList_t *curr = NULL,
  402. *next = NULL;
  403. RyanMqttAckHandler_t *ackHandler = NULL;
  404. if (RyanListIsEmpty(&client->ackHandlerList))
  405. {
  406. rlog_w("ack链表为空,没有等待ack的消息");
  407. return 0;
  408. }
  409. // 遍历链表
  410. RyanListForEachSafe(curr, next, &client->ackHandlerList)
  411. {
  412. // 获取此节点的结构体
  413. ackHandler = RyanListEntry(curr, RyanMqttAckHandler_t, list);
  414. // 发送qos1 / qos2消息服务器ack响应超时。需要重新发送它们。
  415. rlog_i(" type: %d, packetId is %d ", ackHandler->packetType, ackHandler->packetId);
  416. if (NULL != ackHandler->msgHandler)
  417. rlog_i("topic: %s, qos: %d", ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  418. }
  419. return 0;
  420. }
  421. /**
  422. * @brief 打印msg链表
  423. *
  424. * @param argc
  425. * @param argv
  426. * @return int
  427. */
  428. static int MqttListMsg(int argc, char *argv[])
  429. {
  430. RyanList_t *curr = NULL,
  431. *next = NULL;
  432. RyanMqttMsgHandler_t *msgHandler = NULL;
  433. if (RyanListIsEmpty(&client->msgHandlerList))
  434. {
  435. rlog_w("msg链表为空,没有等待的msg消息");
  436. return 0;
  437. }
  438. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  439. {
  440. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  441. rlog_i("topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  442. }
  443. return 0;
  444. }
  445. /**
  446. * @brief 打印接收到的数据计数
  447. *
  448. * @param argc
  449. * @param argv
  450. * @return int
  451. */
  452. static int Mqttdata(int argc, char *argv[])
  453. {
  454. // mqtt data
  455. if (argc == 3)
  456. {
  457. uint32_t num = atoi(argv[2]);
  458. if (num < sizeof(mqttTest) / sizeof(mqttTest[0]) - 1)
  459. mqttTest[num] = 0;
  460. else
  461. rlog_e("数组越界");
  462. }
  463. rlog_i("接收到数据次数统计: %d, qos1和qos2发布成功的次数统计: %d",
  464. mqttTest[dataEventCount], mqttTest[PublishedEventCount]);
  465. return 0;
  466. }
  467. static const struct RyanMqttCmdDes cmdTab[] =
  468. {
  469. // {"help", "打印帮助信息 params: null", MqttHelp},
  470. // {"state", "打印mqtt客户端状态 params: null", MqttState},
  471. // {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  472. // {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  473. // {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  474. // {"destory", "mqtt销毁客户端 params: null", MqttDestroy},
  475. // {"pub", "mqtt发布消息 params: topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0)", Mqttpublish},
  476. // {"sub", "mqtt订阅主题 params: topic、 qos", Mqttsubscribe},
  477. // {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  478. // {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  479. // {"listack", "打印ack链表 params: null", MqttListAck},
  480. // {"listmsg", "打印msg链表 params: null", MqttListMsg},
  481. // {"data", "打印测试信息用户自定义的 params: null", Mqttdata},
  482. {"help", "打印帮助信息", MqttHelp},
  483. {"state", "打印mqtt客户端状态 params: null", MqttState},
  484. {"connect", "mqtt客户端连接服务器 params: null", MqttConnect},
  485. {"disc", "mqtt客户端断开连接 params: null", MqttDisconnect},
  486. {"reconnect", "mqtt断开连接时重新连接 params: null", MqttReconnect},
  487. {"destory", "mqtt销毁客户端 params: null", MqttDestroy},
  488. {"pub", "mqtt发布消息 params: topic、 qos、 payload内容、 发送条数、 间隔时间(可以为0)", Mqttpublish},
  489. {"sub", "mqtt订阅主题 params: topic、 qos", Mqttsubscribe},
  490. {"unsub", "mqtt取消订阅主题 params: 取消订阅主题", MqttUnSubscribe},
  491. {"listsub", "mqtt获取已订阅主题 params: null", MqttListSubscribe},
  492. {"listack", "打印ack链表 params: null", MqttListAck},
  493. {"listmsg", "打印msg链表 params: null", MqttListMsg},
  494. {"data", "打印测试信息用户自定义的 params: null", Mqttdata},
  495. };
  496. static int MqttHelp(int argc, char *argv[])
  497. {
  498. for (uint8_t i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  499. rlog_raw("mqtt %-16s %s\r\n", cmdTab[i].cmd, cmdTab[i].explain);
  500. return 0;
  501. }
  502. static int RyanMqttMsh(int argc, char *argv[])
  503. {
  504. int32_t i = 0,
  505. result = 0;
  506. const struct RyanMqttCmdDes *runCmd = NULL;
  507. if (argc == 1)
  508. {
  509. MqttHelp(argc, argv);
  510. return 0;
  511. }
  512. for (i = 0; i < sizeof(cmdTab) / sizeof(cmdTab[0]); i++)
  513. {
  514. if (rt_strcmp(cmdTab[i].cmd, argv[1]) == 0)
  515. {
  516. runCmd = &cmdTab[i];
  517. break;
  518. }
  519. }
  520. if (runCmd == NULL)
  521. {
  522. MqttHelp(argc, argv);
  523. return 0;
  524. }
  525. if (runCmd->fun != NULL)
  526. result = runCmd->fun(argc, argv);
  527. return result;
  528. }
  /* Export RyanMqttMsh under the alias "mqtt" when RT_USING_MSH is defined. */
  #ifdef RT_USING_MSH
  MSH_CMD_EXPORT_ALIAS(RyanMqttMsh, mqtt, RyanMqtt command);
  #endif
  532. #endif