mqtt_api.c 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. /*
  2. * Copyright (c) 2006-2021, RT-Thread Development Team
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. *
  6. * Change Logs:
  7. * Date Author Notes
  8. * 2025-06-03 RV the first version
  9. */
  10. #define DBG_TAG "MQTT"
  11. #define DBG_LVL DBG_LOG
  12. #include "mqtt_api.h"
  13. static MQTTFixedBuffer_t mqttBuffer = { .pBuffer = RT_NULL, .size = MQTT_BUF_SIZE };
  14. static MQTTContext_t mqttContext;
  15. static TransportInterface_t transportInterface;
  16. static NetworkContext_t networkContext;
  17. static MQTTPubAckInfo_t outgoingPublishes[MQTT_OUTGOING_PUBLISH_COUNT];
  18. MQTTStatus_t mqttInit(NetworkContext_t *networkContext, MQTTEventCallback_t userCallback)
  19. {
  20. MQTTStatus_t status;
  21. transportInterface.pNetworkContext = networkContext;
  22. transportInterface.send = transportSend;
  23. transportInterface.recv = transportRecv;
  24. mqttBuffer.pBuffer = rt_malloc(mqttBuffer.size);
  25. if (mqttBuffer.pBuffer == RT_NULL)
  26. {
  27. MQTT_PRINT("Failed to allocate MQTT buffer\n");
  28. return MQTTNoMemory;
  29. }
  30. status = MQTT_Init(&mqttContext, &transportInterface, getCurrentTime, userCallback, &mqttBuffer);
  31. if (status != MQTTSuccess)
  32. {
  33. MQTT_PRINT("MQTT_Init failed: %d\n", status);
  34. rt_free(mqttBuffer.pBuffer);
  35. return status;
  36. }
  37. else
  38. {
  39. status = MQTT_InitStatefulQoS(&mqttContext, outgoingPublishes, MQTT_OUTGOING_PUBLISH_COUNT, NULL, 0);
  40. MQTT_PRINT("MQTT client initialized successfully\n");
  41. }
  42. return MQTTSuccess;
  43. }
  44. MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
  45. {
  46. MQTTStatus_t status;
  47. MQTTConnectInfo_t connectInfo = { 0 };
  48. bool sessionPresent;
  49. /* Configure connection information */
  50. connectInfo.clientIdentifierLength = strlen(MQTT_CLIENT_ID);
  51. connectInfo.pClientIdentifier = MQTT_CLIENT_ID;
  52. connectInfo.keepAliveSeconds = MQTT_KEEP_ALIVE;
  53. connectInfo.cleanSession = true;
  54. /* Establish TCP connection */
  55. networkContext->socket = socket(AF_INET, SOCK_STREAM, 0);
  56. if (networkContext->socket < 0)
  57. {
  58. MQTT_PRINT("Failed to create socket\n");
  59. return MQTTSendFailed;
  60. }
  61. struct sockaddr_in serverAddr = { 0 };
  62. serverAddr.sin_family = AF_INET;
  63. serverAddr.sin_port = htons(MQTT_BROKER_PORT);
  64. struct hostent *host = gethostbyname(MQTT_BROKER_ADDRESS);
  65. if (host == NULL || host->h_addr_list[0] == NULL)
  66. {
  67. MQTT_PRINT("Failed to resolve broker address\n");
  68. closesocket(networkContext->socket);
  69. return MQTTSendFailed;
  70. }
  71. serverAddr.sin_addr.s_addr = *(uint32_t *) host->h_addr_list[0];
  72. if (connect(networkContext->socket, (struct sockaddr *) &serverAddr, sizeof(serverAddr)) < 0)
  73. {
  74. MQTT_PRINT("Failed to connect to broker\n");
  75. closesocket(networkContext->socket);
  76. return MQTTSendFailed;
  77. }
  78. /* MQTT connection */
  79. status = MQTT_Connect(&mqttContext, &connectInfo, NULL, 10000, &sessionPresent);
  80. if ((status != MQTTSuccess) && (status != MQTTStatusConnected))
  81. {
  82. MQTT_PRINT("MQTT_Connect failed: %d\n", status);
  83. closesocket(networkContext->socket);
  84. return status;
  85. }
  86. rt_kprintf("[%d] MQTT broker connected\n", getCurrentTime());
  87. return MQTTSuccess;
  88. }
  89. MQTTStatus_t mqttSubscribe(MQTTSubscribeInfo_t *subscribeInfo)
  90. {
  91. MQTTStatus_t status;
  92. uint16_t packetId = MQTT_GetPacketId(&mqttContext);
  93. status = MQTT_Subscribe(&mqttContext, subscribeInfo, 1, packetId);
  94. if (status != MQTTSuccess)
  95. {
  96. MQTT_PRINT("MQTT_Subscribe failed: %d\n", status);
  97. return status;
  98. }
  99. MQTT_PRINT("Subscribed to topic: %s\n", MQTT_TOPIC_SUB);
  100. return MQTTSuccess;
  101. }
  102. MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo)
  103. {
  104. MQTTStatus_t status;
  105. uint16_t packetId = MQTT_GetPacketId(&mqttContext);
  106. status = MQTT_Publish(&mqttContext, publishInfo, packetId);
  107. if (status != MQTTSuccess)
  108. {
  109. MQTT_PRINT("MQTT_Publish failed: %d\n", status);
  110. return status;
  111. }
  112. MQTT_PRINT("Published message: %s\n", publishInfo->pPayload);
  113. return MQTTSuccess;
  114. }
  115. const char *mqttStatus(MQTTStatus_t status)
  116. {
  117. const char *const statusStrings[] = {
  118. "Success",
  119. "BadParameter",
  120. "NoMemory",
  121. "SendFailed",
  122. "RecvFailed",
  123. "BadResponse",
  124. "ServerRefused",
  125. "NoDataAvailable",
  126. "IllegalState",
  127. "StateCollision",
  128. "KeepAliveTimeout",
  129. "NeedMoreBytes",
  130. "Connected",
  131. "NotConnected",
  132. "DisconnectPending",
  133. "PublishStoreFailed",
  134. "PublishRetrieveFailed"
  135. };
  136. if (status >= 0 && status < sizeof(statusStrings) / sizeof(statusStrings[0]))
  137. {
  138. return statusStrings[status];
  139. }
  140. return "Unknown";
  141. }
  142. static void mqttEventCallback(MQTTContext_t *pContext, MQTTPacketInfo_t *pPacketInfo,
  143. MQTTDeserializedInfo_t *pDeserializedInfo)
  144. {
  145. if (!pContext || !pPacketInfo)
  146. {
  147. rt_kprintf("Error: Invalid context or packet info\n");
  148. return;
  149. }
  150. switch (pPacketInfo->type)
  151. {
  152. case MQTT_PACKET_TYPE_PUBLISH:
  153. {
  154. if (!pDeserializedInfo || !pDeserializedInfo->pPublishInfo)
  155. {
  156. rt_kprintf("Error: Invalid publish info\n");
  157. return;
  158. }
  159. MQTTPublishInfo_t *pPublishInfo = pDeserializedInfo->pPublishInfo;
  160. rt_kprintf("Received message on topic '%.*s': %.*s\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName,
  161. pPublishInfo->payloadLength, (const char *) pPublishInfo->pPayload);
  162. break;
  163. }
  164. case MQTT_PACKET_TYPE_SUBACK:
  165. rt_kprintf("Subscription ACK\n");
  166. break;
  167. case MQTT_PACKET_TYPE_PUBACK:
  168. rt_kprintf("Publish ACK\n"); // QoS0 messages do not trigger this callback.
  169. break;
  170. default:
  171. break;
  172. }
  173. }
  174. void mqttClientTask(void *parameter)
  175. {
  176. MQTTStatus_t status;
  177. uint32_t retryCount = 0;
  178. uint32_t backoffMs = INITIAL_BACKOFF_MS;
  179. bool isConnected = false;
  180. if (mqttInit(&networkContext, MQTT_USER_CALLBACK) != MQTTSuccess)
  181. {
  182. MQTT_PRINT("MQTT initialization failed\n");
  183. return;
  184. }
  185. while (1)
  186. {
  187. if (networkContext.socket >= 0)
  188. {
  189. closesocket(networkContext.socket);
  190. networkContext.socket = -1;
  191. }
  192. status = mqttConnect(&networkContext);
  193. if (status != MQTTSuccess)
  194. {
  195. MQTT_PRINT("Connection failed: %d (%s), retrying in %d ms\n", status, mqttStatus(status), backoffMs);
  196. if (retryCount++ >= MAX_RETRY_ATTEMPTS)
  197. {
  198. MQTT_PRINT("Maximum retry attempts reached, resetting retry count after 60s\n");
  199. rt_thread_mdelay(30000);
  200. retryCount = 0;
  201. backoffMs = INITIAL_BACKOFF_MS;
  202. }
  203. else
  204. {
  205. rt_thread_mdelay(backoffMs);
  206. backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
  207. }
  208. continue;
  209. }
  210. isConnected = true;
  211. retryCount = 0;
  212. backoffMs = INITIAL_BACKOFF_MS;
  213. while (1)
  214. {
  215. status = MQTT_ProcessLoop(&mqttContext);
  216. if (status != MQTTSuccess && status != MQTTNeedMoreBytes)
  217. {
  218. MQTT_PRINT("MQTT_ProcessLoop failed: %d (%s)\n", status, mqttStatus(status));
  219. status = MQTT_Disconnect(&mqttContext);
  220. break;
  221. }
  222. rt_thread_mdelay(MQTT_LOOP_CNT);
  223. }
  224. if (isConnected && networkContext.socket >= 0)
  225. {
  226. status = MQTT_Disconnect(&mqttContext);
  227. if (status != MQTTSuccess)
  228. {
  229. MQTT_PRINT("MQTT_Disconnect failed: %d (%s)\n", status, mqttStatus(status));
  230. }
  231. isConnected = false;
  232. }
  233. if (networkContext.socket >= 0)
  234. {
  235. closesocket(networkContext.socket);
  236. networkContext.socket = -1;
  237. }
  238. MQTT_PRINT("MQTT connection lost, preparing to reconnect in %d ms\n", backoffMs);
  239. rt_thread_mdelay(backoffMs);
  240. backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
  241. }
  242. if (mqttBuffer.pBuffer != RT_NULL)
  243. {
  244. rt_free(mqttBuffer.pBuffer);
  245. mqttBuffer.pBuffer = RT_NULL;
  246. }
  247. MQTT_PRINT("MQTT client exited\n");
  248. }