Bladeren bron

[fix] 将消息发布与订阅从掉线重连分离,优化LOG

Yaochenger 7 maanden geleden
bovenliggende
commit
776517d360
5 gewijzigde bestanden met toevoegingen van 134 en 95 verwijderingen
  1. 77 78
      example/demo.c
  2. 1 1
      port/config.h
  3. 45 14
      port/mqtt_usr_api.c
  4. 10 1
      port/mqtt_usr_api.h
  5. 1 1
      port/port.h

+ 77 - 78
example/demo.c

@@ -6,9 +6,13 @@
 #include <string.h>
 #include <stdlib.h>
 #include "port.h"
-#include <sys/select.h>  // 添加select相关定义
-#include <unistd.h>      // 添加close等系统调用定义
+#include <sys/select.h>
+#include <unistd.h>
 #include "port.h"
+
+#define DBG_TAG "MQTT"
+#define DBG_LVL DBG_LOG
+
 #include "mqtt_usr_api.h"
 
 static NetworkContext_t networkContext;
@@ -45,7 +49,7 @@ static void mqttEventCallback(MQTTContext_t *pContext, MQTTPacketInfo_t *pPacket
             rt_kprintf("Publish ACK\n"); // QoS0 messages do not trigger this callback.
             break;
         default:
-            rt_kprintf("Unhandled packet type: %d\n", pPacketInfo->type);
+            rt_kprintf("Other packet type\n");
             break;
     }
 }
@@ -55,13 +59,11 @@ static void mqttClientTask(void *parameter)
     MQTTStatus_t status;
     uint32_t retryCount = 0;
     uint32_t backoffMs = INITIAL_BACKOFF_MS;
-    uint32_t lastPublishTime = 0;
-    const uint32_t publishIntervalMs = 1000;
     bool isConnected = false;
 
     if (mqttInit(&networkContext, mqttEventCallback) != MQTTSuccess)
     {
-        rt_kprintf("MQTT initialization failed\n");
+        MQTT_PRINT("MQTT initialization failed\n");
         return;
     }
 
@@ -76,12 +78,11 @@ static void mqttClientTask(void *parameter)
         status = mqttConnect(&networkContext);
         if (status != MQTTSuccess)
         {
-            rt_kprintf("Connection failed: %d (%s), retrying in %d ms\n", status,
-                       status == MQTTServerRefused ? "Server refused" : "Other error", backoffMs);
+            MQTT_PRINT("Connection failed: %d (%s), retrying in %d ms\n", status, mqttStatus(status), backoffMs);
             if (retryCount++ >= MAX_RETRY_ATTEMPTS)
             {
-                rt_kprintf("Maximum retry attempts reached, resetting retry count after 60s\n");
-                rt_thread_mdelay(60000);
+                MQTT_PRINT("Maximum retry attempts reached, resetting retry count after 60s\n");
+                rt_thread_mdelay(30000);
                 retryCount = 0;
                 backoffMs = INITIAL_BACKOFF_MS;
             }
@@ -96,74 +97,19 @@ static void mqttClientTask(void *parameter)
         isConnected = true;
         retryCount = 0;
         backoffMs = INITIAL_BACKOFF_MS;
-        lastPublishTime = getCurrentTime();
-        rt_kprintf("Successfully connected to MQTT broker\n");
-
-        MQTTSubscribeInfo_t subscribeInfo = {
-            .qos = MQTTQoS0,
-            .pTopicFilter = MQTT_TOPIC_SUB,
-            .topicFilterLength = strlen(MQTT_TOPIC_SUB)
-        };
-
-        if (mqttSubscribe(&subscribeInfo) != MQTTSuccess)
-        {
-            rt_kprintf("Subscription failed\n");
-            if (isConnected && networkContext.socket >= 0)
-            {
-                status = MQTT_Disconnect(&mqttContext);
-                if (status != MQTTSuccess)
-                {
-                    rt_kprintf("MQTT_Disconnect failed: %d\n", status);
-                }
-                isConnected = false;
-            }
-            if (networkContext.socket >= 0)
-            {
-                closesocket(networkContext.socket);
-                networkContext.socket = -1;
-            }
-            continue;
-        }
 
         while (1)
         {
-            uint32_t currentTime = getCurrentTime();
-
             if (isSocketReadable(networkContext.socket, 100))
             {
                 status = MQTT_ProcessLoop(&mqttContext);
                 if (status != MQTTSuccess)
                 {
-                    rt_kprintf("MQTT_ProcessLoop failed: %d (%s)\n", status,
-                               status == MQTTRecvFailed ? "Receive failed" :
-                               status == MQTTBadResponse ? "Bad response" : "Other error");
-                    break;
-                }
-            }
-
-            if (currentTime - lastPublishTime >= publishIntervalMs)
-            {
-                static int counter = 0;
-                char payload[64];
-                rt_sprintf(payload, "[%d]Message from RT-Thread: Hello World", counter++);
-
-                MQTTPublishInfo_t publishInfo = {
-                    .qos = MQTTQoS0,
-                    .pTopicName = MQTT_TOPIC_PUB,
-                    .topicNameLength = strlen(MQTT_TOPIC_PUB),
-                    .pPayload = payload,
-                    .payloadLength = strlen(payload)
-                };
-
-                if (mqttPublish(&publishInfo) != MQTTSuccess)
-                {
-                    rt_kprintf("Publish failed, preparing to reconnect\n");
+                    MQTT_PRINT("MQTT_ProcessLoop failed: %d (%s)\n", status, mqttStatus(status));
                     break;
                 }
-                lastPublishTime = currentTime;
             }
-
-            rt_thread_mdelay(50);
+            rt_thread_mdelay(MQTT_LOOP_CNT);
         }
 
         if (isConnected && networkContext.socket >= 0)
@@ -171,8 +117,7 @@ static void mqttClientTask(void *parameter)
             status = MQTT_Disconnect(&mqttContext);
             if (status != MQTTSuccess)
             {
-                rt_kprintf("MQTT_Disconnect failed: %d (%s)\n", status,
-                           status == MQTTStatusNotConnected ? "Not connected" : "Other error");
+                MQTT_PRINT("MQTT_Disconnect failed: %d (%s)\n", status, mqttStatus(status));
             }
             isConnected = false;
         }
@@ -183,7 +128,7 @@ static void mqttClientTask(void *parameter)
             networkContext.socket = -1;
         }
 
-        rt_kprintf("MQTT connection lost, preparing to reconnect in %d ms\n", backoffMs);
+        MQTT_PRINT("MQTT connection lost, preparing to reconnect in %d ms\n", backoffMs);
         rt_thread_mdelay(backoffMs);
         backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
     }
@@ -193,29 +138,83 @@ static void mqttClientTask(void *parameter)
         rt_free(mqttBuffer.pBuffer);
         mqttBuffer.pBuffer = RT_NULL;
     }
-    rt_kprintf("MQTT client exited\n");
+    MQTT_PRINT("MQTT client exited\n");
 }
+
 #include <wlan_mgnt.h>
 void mqtt_client_start(void)
 {
     rt_wlan_unregister_event_handler(RT_WLAN_EVT_READY);
 
     rt_thread_t tid = rt_thread_create("mqtt", mqttClientTask,
-                                               RT_NULL, 4096,
-                                               10,
-                                               20);
+    RT_NULL, 4096, 10, 20);
     if (tid != RT_NULL)
     {
         rt_thread_startup(tid);
-        rt_kprintf("MQTT client thread started\n");
+        MQTT_PRINT("MQTT client thread started\n");
     }
     else
     {
-        rt_kprintf("Failed to create MQTT client thread\n");
+        MQTT_PRINT("Failed to create MQTT client thread\n");
+    }
+}
+
+static int mqtt_pub(int argc, char **argv)
+{
+    MQTTStatus_t status;
+    MQTTPublishInfo_t publishInfo;
+
+    if (argc != 2)
+    {
+        rt_kprintf("Usage: mqtt_pub <message>\n");
+        return -RT_ERROR;
     }
+
+    publishInfo.qos = MQTTQoS0;
+    publishInfo.pTopicName = MQTT_TOPIC_PUB;
+    publishInfo.topicNameLength = strlen(MQTT_TOPIC_PUB);
+    publishInfo.pPayload = argv[1];
+    publishInfo.payloadLength = strlen(argv[1]);
+
+    status = mqttPublish(&publishInfo);
+    if (status != MQTTSuccess)
+    {
+        rt_kprintf("MQTT publish failed: %d (%s)\n", status, mqttStatus(status));
+        return -RT_ERROR;
+    }
+
+    rt_kprintf("Published message: %s to topic: %s\n", argv[1], MQTT_TOPIC_PUB);
+    return RT_EOK;
 }
+#ifdef RT_USING_FINSH
+MSH_CMD_EXPORT_ALIAS(mqtt_pub, mqtt_pub, Send MQTT message);
+#endif
+
+static int mqtt_sub(int argc, char **argv)
+{
+    MQTTStatus_t status;
+    MQTTSubscribeInfo_t subscribeInfo;
+
+    if (argc != 2)
+    {
+        rt_kprintf("Usage: mqtt_sub <topic>\n");
+        return -RT_ERROR;
+    }
+
+    subscribeInfo.qos = MQTTQoS0;
+    subscribeInfo.pTopicFilter = argv[1];
+    subscribeInfo.topicFilterLength = strlen(argv[1]);
+
+    status = mqttSubscribe(&subscribeInfo);
+    if (status != MQTTSuccess)
+    {
+        rt_kprintf("MQTT subscribe failed: %d (%s)\n", status, mqttStatus(status));
+        return -RT_ERROR;
+    }
 
+    rt_kprintf("Subscribed to topic: %s\n", argv[1]);
+    return RT_EOK;
+}
 #ifdef RT_USING_FINSH
-#include <finsh.h>
-FINSH_FUNCTION_EXPORT(mqtt_client_start, 启动 MQTT 客户端);
+MSH_CMD_EXPORT_ALIAS(mqtt_sub, mqtt_sub, Subscribe MQTT message);
 #endif

+ 1 - 1
port/config.h

@@ -5,7 +5,7 @@
  *
  * Change Logs:
  * Date           Author       Notes
- * 2025-06-10     RTT       the first version
+ * 2025-06-10     RV          the first version
  */
 #ifndef APPLICATIONS_FIREMQTT_PORT_CONFIG_H_
 #define APPLICATIONS_FIREMQTT_PORT_CONFIG_H_

+ 45 - 14
port/mqtt_usr_api.c

@@ -5,9 +5,12 @@
  *
  * Change Logs:
  * Date           Author       Notes
- * 2025-06-03     RTT       the first version
+ * 2025-06-03     RV          the first version
  */
 
+#define DBG_TAG "MQTT"
+#define DBG_LVL DBG_LOG
+
 #include "mqtt_usr_api.h"
 
 MQTTFixedBuffer_t mqttBuffer = { .pBuffer = RT_NULL, .size = 1024 };
@@ -25,19 +28,19 @@ MQTTStatus_t mqttInit(NetworkContext_t *networkContext, MQTTEventCallback_t user
     mqttBuffer.pBuffer = rt_malloc(mqttBuffer.size); // 缓存
     if (mqttBuffer.pBuffer == RT_NULL)
     {
-        rt_kprintf("Failed to allocate MQTT buffer\n");
+        MQTT_PRINT("Failed to allocate MQTT buffer\n");
         return MQTTNoMemory;
     }
 
     status = MQTT_Init(&mqttContext, &transportInterface, getCurrentTime, userCallback, &mqttBuffer);
     if (status != MQTTSuccess)
     {
-        rt_kprintf("MQTT_Init failed: %d\n", status);
+        MQTT_PRINT("MQTT_Init failed: %d\n", status);
         rt_free(mqttBuffer.pBuffer);
         return status;
     }
 
-    rt_kprintf("MQTT client initialized successfully\n");
+    MQTT_PRINT("MQTT client initialized successfully\n");
     return MQTTSuccess;
 }
 
@@ -57,17 +60,16 @@ MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
     networkContext->socket = socket(AF_INET, SOCK_STREAM, 0);
     if (networkContext->socket < 0)
     {
-        rt_kprintf("Failed to create socket\n");
+        MQTT_PRINT("Failed to create socket\n");
         return MQTTSendFailed;
     }
-
     struct sockaddr_in serverAddr = { 0 };
     serverAddr.sin_family = AF_INET;
     serverAddr.sin_port = htons(MQTT_BROKER_PORT);
     struct hostent *host = gethostbyname(MQTT_BROKER_ADDRESS);
     if (host == NULL || host->h_addr_list[0] == NULL)
     {
-        rt_kprintf("Failed to resolve broker address\n");
+        MQTT_PRINT("Failed to resolve broker address\n");
         closesocket(networkContext->socket);
         return MQTTSendFailed;
     }
@@ -75,7 +77,7 @@ MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
 
     if (connect(networkContext->socket, (struct sockaddr *) &serverAddr, sizeof(serverAddr)) < 0)
     {
-        rt_kprintf("Failed to connect to broker\n");
+        MQTT_PRINT("Failed to connect to broker\n");
         closesocket(networkContext->socket);
         return MQTTSendFailed;
     }
@@ -84,12 +86,12 @@ MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
     status = MQTT_Connect(&mqttContext, &connectInfo, NULL, 10000, &sessionPresent);
     if ((status != MQTTSuccess) && (status != MQTTStatusConnected))
     {
-        rt_kprintf("MQTT_Connect failed: %d\n", status);
+        MQTT_PRINT("MQTT_Connect failed: %d\n", status);
         closesocket(networkContext->socket);
         return status;
     }
 
-    rt_kprintf("Successfully connected to MQTT broker\n");
+    rt_kprintf("MQTT broker connected\n");
     return MQTTSuccess;
 }
 
@@ -101,11 +103,11 @@ MQTTStatus_t mqttSubscribe(MQTTSubscribeInfo_t *subscribeInfo)
     status = MQTT_Subscribe(&mqttContext, subscribeInfo, 1, packetId);
     if (status != MQTTSuccess)
     {
-        rt_kprintf("MQTT_Subscribe failed: %d\n", status);
+        MQTT_PRINT("MQTT_Subscribe failed: %d\n", status);
         return status;
     }
 
-    rt_kprintf("Subscribed to topic: %s\n", MQTT_TOPIC_SUB);
+    MQTT_PRINT("Subscribed to topic: %s\n", MQTT_TOPIC_SUB);
     return MQTTSuccess;
 }
 
@@ -117,11 +119,11 @@ MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo)
     status = MQTT_Publish(&mqttContext, publishInfo, packetId);
     if (status != MQTTSuccess)
     {
-        rt_kprintf("MQTT_Publish failed: %d\n", status);
+        MQTT_PRINT("MQTT_Publish failed: %d\n", status);
         return status;
     }
 
-    rt_kprintf("Published message: %s\n", publishInfo->pPayload);
+    MQTT_PRINT("Published message: %s\n", publishInfo->pPayload);
     return MQTTSuccess;
 }
 
@@ -139,3 +141,32 @@ bool isSocketReadable(int socket, int timeout_ms)
     int result = select(socket + 1, &readfds, NULL, NULL, &timeout);
     return (result > 0 && FD_ISSET(socket, &readfds));
 }
+
+const char *mqttStatus(MQTTStatus_t status)
+{
+    static const char *const statusStrings[] = {
+        "Success",
+        "BadParameter",
+        "NoMemory",
+        "SendFailed",
+        "RecvFailed",
+        "BadResponse",
+        "ServerRefused",
+        "NoDataAvailable",
+        "IllegalState",
+        "StateCollision",
+        "KeepAliveTimeout",
+        "NeedMoreBytes",
+        "Connected",
+        "NotConnected",
+        "DisconnectPending",
+        "PublishStoreFailed",
+        "PublishRetrieveFailed"
+    };
+
+    if (status >= 0 && status < sizeof(statusStrings) / sizeof(statusStrings[0]))
+    {
+        return statusStrings[status];
+    }
+    return "Unknown";
+}

+ 10 - 1
port/mqtt_usr_api.h

@@ -5,7 +5,7 @@
  *
  * Change Logs:
  * Date           Author       Notes
- * 2025-06-03     RTT       the first version
+ * 2025-06-03     RV           the first version
  */
 #ifndef APPLICATIONS_FIREMQTT_PORT_MQTT_USR_API_H_
 #define APPLICATIONS_FIREMQTT_PORT_MQTT_USR_API_H_
@@ -20,6 +20,13 @@
 #include <sys/select.h>  // 添加select相关定义
 #include <unistd.h>      // 添加close等系统调用定义
 #include "port.h"
+#include <rtdbg.h>
+
+#ifdef RT_USING_ULOG
+#define MQTT_PRINT(fmt, ...) LOG_D(fmt, ##__VA_ARGS__)
+#else
+#define MQTT_PRINT(fmt, ...) ((void)0)
+#endif
 
 #define MIN(a, b) ((a) < (b) ? (a) : (b))
 
@@ -29,6 +36,7 @@
 #define MQTT_TOPIC_SUB       "rtthread/test/sub"    // 订阅主题
 #define MQTT_TOPIC_PUB       "rtthread/test/pub"    // 发布主题
 #define MQTT_KEEP_ALIVE      60
+#define MQTT_LOOP_CNT        50
 
 #define MAX_RETRY_ATTEMPTS   5                      // 最大重试次数
 #define INITIAL_BACKOFF_MS   1000                   // 初始重连退避时间(毫秒)
@@ -38,6 +46,7 @@ MQTTStatus_t mqttInit(NetworkContext_t *networkContext, MQTTEventCallback_t user
 MQTTStatus_t mqttConnect(NetworkContext_t *networkContext);
 MQTTStatus_t mqttSubscribe(MQTTSubscribeInfo_t *subscribeInfo);
 MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo);
+const char *mqttStatus(MQTTStatus_t status);
 bool isSocketReadable(int socket, int timeout_ms);
 
 #endif /* APPLICATIONS_FIREMQTT_PORT_MQTT_USR_API_H_ */

+ 1 - 1
port/port.h

@@ -5,7 +5,7 @@
  *
  * Change Logs:
  * Date           Author       Notes
- * 2025-05-26     RTT       the first version
+ * 2025-05-26     RV           the first version
  */
 #ifndef APPLICATIONS_FIREMQTT_PORT_PORT_H_
 #define APPLICATIONS_FIREMQTT_PORT_PORT_H_