RyanMqttTest.c 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. #include "RyanMqttTest.h"
  2. static pthread_spinlock_t spin;
  3. /**
  4. * @brief mqtt事件回调处理函数
  5. * 事件的详细定义可以查看枚举定义
  6. *
  7. * @param pclient
  8. * @param event
  9. * @param eventData 查看事件枚举,后面有说明eventData是什么类型
  10. */
  11. void mqttEventBaseHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  12. {
  13. RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
  14. switch (event)
  15. {
  16. case RyanMqttEventError:
  17. break;
  18. case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
  19. rlog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
  20. break;
  21. case RyanMqttEventDisconnected:
  22. rlog_w("mqtt断开连接回调 %d", *(int32_t *)eventData);
  23. break;
  24. case RyanMqttEventSubscribed:
  25. {
  26. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  27. rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  28. break;
  29. }
  30. case RyanMqttEventSubscribedFaile:
  31. {
  32. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  33. rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  34. break;
  35. }
  36. case RyanMqttEventUnSubscribed:
  37. {
  38. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  39. rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  40. break;
  41. }
  42. case RyanMqttEventUnSubscribedFaile:
  43. {
  44. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  45. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  46. break;
  47. }
  48. case RyanMqttEventPublished:
  49. {
  50. RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
  51. rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  52. break;
  53. }
  54. case RyanMqttEventData:
  55. {
  56. RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
  57. rlog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
  58. msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
  59. rlog_i("%.*s", msgData->payloadLen, msgData->payload);
  60. break;
  61. }
  62. case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
  63. {
  64. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  65. rlog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
  66. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  67. printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
  68. break;
  69. }
  70. case RyanMqttEventReconnectBefore:
  71. // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
  72. rlog_i("重连前事件回调");
  73. break;
  74. case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
  75. {
  76. // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
  77. // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
  78. uint16_t ackHandlerCount = *(uint16_t *)eventData;
  79. rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
  80. break;
  81. }
  82. case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
  83. {
  84. // 这里选择直接丢弃该消息
  85. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  86. rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  87. RyanMqttDiscardAckHandler(client, ackHandler);
  88. break;
  89. }
  90. case RyanMqttEventAckHandlerdiscard:
  91. {
  92. RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
  93. rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
  94. ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
  95. break;
  96. }
  97. case RyanMqttEventDestoryBefore:
  98. rlog_i("销毁mqtt客户端前回调");
  99. if (client->config.userData)
  100. sem_post((sem_t *)client->config.userData);
  101. break;
  102. default:
  103. break;
  104. }
  105. }
  106. RyanMqttError_e RyanMqttInitSync(RyanMqttClient_t **client, RyanMqttBool_e syncFlag, RyanMqttEventHandle mqttEventCallback)
  107. {
  108. // 手动避免count的资源竞争了
  109. static uint32_t count = 0;
  110. char aaa[64];
  111. pthread_spin_lock(&spin);
  112. count++;
  113. pthread_spin_unlock(&spin);
  114. snprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
  115. sem_t *sem = NULL;
  116. if (syncFlag == RyanMqttTrue)
  117. {
  118. sem = (sem_t *)malloc(sizeof(sem_t));
  119. sem_init(sem, 0, 0);
  120. }
  121. RyanMqttError_e result = RyanMqttSuccessError;
  122. RyanMqttClientConfig_t mqttConfig = {
  123. .clientId = aaa,
  124. .userName = RyanMqttUserName,
  125. .password = RyanMqttPassword,
  126. .host = RyanMqttHost,
  127. .port = RyanMqttPort,
  128. .taskName = "mqttThread",
  129. .taskPrio = 16,
  130. .taskStack = 4096,
  131. .mqttVersion = 4,
  132. .ackHandlerRepeatCountWarning = 6,
  133. .ackHandlerCountWarning = 20,
  134. .autoReconnectFlag = RyanMqttTrue,
  135. .cleanSessionFlag = RyanMqttTrue,
  136. .reconnectTimeout = 3000,
  137. .recvTimeout = 3000,
  138. .sendTimeout = 2000,
  139. .ackTimeout = 10000,
  140. .keepaliveTimeoutS = 120,
  141. .mqttEventHandle = mqttEventCallback ? mqttEventCallback : mqttEventBaseHandle,
  142. .userData = sem};
  143. // 初始化mqtt客户端
  144. result = RyanMqttInit(client);
  145. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  146. // 注册需要的事件回调
  147. result = RyanMqttRegisterEventId(*client, RyanMqttEventAnyId);
  148. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  149. // 设置mqtt客户端config
  150. result = RyanMqttSetConfig(*client, &mqttConfig);
  151. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  152. // // 设置遗嘱消息
  153. // result = RyanMqttSetLwt(*client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
  154. // RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  155. // 启动mqtt客户端线程
  156. result = RyanMqttStart(*client);
  157. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_e);
  158. while (RyanMqttConnectState != RyanMqttGetState(*client))
  159. {
  160. delay(100);
  161. }
  162. return RyanMqttSuccessError;
  163. }
  164. RyanMqttError_e RyanMqttDestorySync(RyanMqttClient_t *client)
  165. {
  166. sem_t *sem = (sem_t *)client->config.userData;
  167. // 启动mqtt客户端线程
  168. RyanMqttDestroy(client);
  169. sem_wait(sem);
  170. sem_destroy(sem);
  171. free(sem);
  172. delay(3);
  173. return RyanMqttSuccessError;
  174. }
  175. RyanMqttError_e checkAckList(RyanMqttClient_t *client)
  176. {
  177. rlog_w("等待检查ack链表,等待 recvTime: %d", client->config.recvTimeout);
  178. delay(client->config.recvTimeout + 500);
  179. if (!RyanListIsEmpty(&client->ackHandlerList))
  180. {
  181. rlog_e("mqtt空间 ack链表不为空");
  182. return RyanMqttFailedError;
  183. }
  184. if (!RyanListIsEmpty(&client->userAckHandlerList))
  185. {
  186. rlog_e("用户空间 ack链表不为空");
  187. return RyanMqttFailedError;
  188. }
  189. if (!RyanListIsEmpty(&client->msgHandlerList))
  190. {
  191. rlog_e("mqtt空间 msg链表不为空");
  192. return RyanMqttFailedError;
  193. }
  194. return RyanMqttSuccessError;
  195. }
  196. void printfArrStr(uint8_t *buf, uint32_t len, char *userData)
  197. {
  198. rlog_raw("%s", userData);
  199. for (uint32_t i = 0; i < len; i++)
  200. rlog_raw("%x", buf[i]);
  201. rlog_raw("\r\n");
  202. }
  203. // !当测试程序出错时,并不会回收内存。交由父进程进行回收
  204. int main()
  205. {
  206. RyanMqttError_e result = RyanMqttSuccessError;
  207. vallocInit();
  208. pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
  209. result = RyanMqttSubTest();
  210. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  211. result = RyanMqttPubTest();
  212. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  213. result = RyanMqttDestoryTest();
  214. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  215. result = RyanMqttReconnectTest();
  216. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  217. result = RyanMqttKeepAliveTest();
  218. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  219. __exit:
  220. pthread_spin_destroy(&spin);
  221. while (1)
  222. {
  223. displayMem();
  224. delay(10 * 1000);
  225. }
  226. return 0;
  227. }