Bläddra i källkod

[fix] 优化mqtt_usr_api.c中的用户API

Yaochenger 7 månader sedan
förälder
incheckning
f2a0e0cc64
5 ändrade filer med 129 tillägg och 85 borttagningar
  1. 69 39
      example/demo.c
  2. 27 0
      port/config.h
  3. 22 37
      port/mqtt_usr_api.c
  4. 10 8
      port/mqtt_usr_api.h
  5. 1 1
      port/port.c

+ 69 - 39
example/demo.c

@@ -15,24 +15,38 @@ static NetworkContext_t networkContext;
 extern MQTTFixedBuffer_t mqttBuffer;
 extern MQTTContext_t mqttContext;
 
-#define MIN(a, b) ((a) < (b) ? (a) : (b))
-
 static void mqttEventCallback(MQTTContext_t *pContext, MQTTPacketInfo_t *pPacketInfo,
         MQTTDeserializedInfo_t *pDeserializedInfo)
 {
-    if (pPacketInfo->type == MQTT_PACKET_TYPE_PUBLISH)
-    {
-        MQTTPublishInfo_t *pPublishInfo = pDeserializedInfo->pPublishInfo;
-        rt_kprintf("收到主题 %.*s 的消息: %.*s\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName,
-                pPublishInfo->payloadLength, (const char *) pPublishInfo->pPayload);
-    }
-    else if (pPacketInfo->type == MQTT_PACKET_TYPE_SUBACK)
+    if (!pContext || !pPacketInfo)
     {
-        rt_kprintf("订阅确认\n");
+        rt_kprintf("Error: Invalid context or packet info\n");
+        return;
     }
-    else if (pPacketInfo->type == MQTT_PACKET_TYPE_PUBACK)
+
+    switch (pPacketInfo->type)
     {
-        rt_kprintf("发布确认\n");
+        case MQTT_PACKET_TYPE_PUBLISH:
+        {
+            if (!pDeserializedInfo || !pDeserializedInfo->pPublishInfo)
+            {
+                rt_kprintf("Error: Invalid publish info\n");
+                return;
+            }
+            MQTTPublishInfo_t *pPublishInfo = pDeserializedInfo->pPublishInfo;
+            rt_kprintf("Received message on topic '%.*s': %.*s\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName,
+                    pPublishInfo->payloadLength, (const char *) pPublishInfo->pPayload);
+            break;
+        }
+        case MQTT_PACKET_TYPE_SUBACK:
+            rt_kprintf("Subscription ACK\n");
+            break;
+        case MQTT_PACKET_TYPE_PUBACK:
+            rt_kprintf("Publish ACK\n"); // QoS0 messages do not trigger this callback.
+            break;
+        default:
+            rt_kprintf("Unhandled packet type: %d\n", pPacketInfo->type);
+            break;
     }
 }
 
@@ -46,109 +60,125 @@ static void mqttClientTask(void *parameter)
 
     if (mqttInit(&networkContext, mqttEventCallback) != MQTTSuccess)
     {
-        rt_kprintf("MQTT 初始化失败\n");
+        rt_kprintf("MQTT initialization failed\n");
         return;
     }
 
     while (1)
     {
+        if (networkContext.socket >= 0)
+        {
+            closesocket(networkContext.socket);
+            networkContext.socket = -1;
+        }
+
         if (mqttConnect(&networkContext) != MQTTSuccess)
         {
             if (retryCount++ >= MAX_RETRY_ATTEMPTS)
             {
-                rt_kprintf("达到最大重试次数,放弃重连\n");
-                break;
+                rt_kprintf("Maximum retry attempts reached, resetting retry count after 60s\n");
+                rt_thread_mdelay(60000);
+                retryCount = 0;
+                backoffMs = INITIAL_BACKOFF_MS;
+                continue;
             }
-            rt_kprintf("将在 %d 毫秒后重试连接\n", backoffMs);
+            rt_kprintf("Connection failed (MQTTServerRefused), retrying in %d ms\n", backoffMs);
             rt_thread_mdelay(backoffMs);
             backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
             continue;
         }
 
-        /* 重置重连参数 */
         retryCount = 0;
         backoffMs = INITIAL_BACKOFF_MS;
         lastPublishTime = getCurrentTime();
 
-        /* 订阅主题 */
-        if (mqttSubscribe() != MQTTSuccess)
+        MQTTSubscribeInfo_t subscribeInfo = {
+            .qos = MQTTQoS0,
+            .pTopicFilter = MQTT_TOPIC_SUB,
+            .topicFilterLength = strlen(MQTT_TOPIC_SUB)
+        };
+
+        if (mqttSubscribe(&subscribeInfo) != MQTTSuccess)
         {
+            rt_kprintf("Subscription failed\n");
             closesocket(networkContext.socket);
             continue;
         }
 
-        /* 主循环:处理消息和发布 */
         while (1)
         {
             uint32_t currentTime = getCurrentTime();
 
-            /* 检查是否有数据可读 */
             if (isSocketReadable(networkContext.socket, 100))
             {
-                /* 处理传入的 MQTT 消息 */
                 status = MQTT_ProcessLoop(&mqttContext);
                 if (status != MQTTSuccess)
                 {
-                    rt_kprintf("MQTT_ProcessLoop 失败: %d\n", status);
+                    rt_kprintf("MQTT_ProcessLoop failed: %d (%s)\n", status,
+                               status == MQTTRecvFailed ? "Receive failed" :
+                               status == MQTTBadResponse ? "Bad response" : "Other error");
                     break;
                 }
             }
 
-            /* 定期发布消息 */
             if (currentTime - lastPublishTime >= publishIntervalMs)
             {
                 static int counter = 0;
                 char payload[64];
-                rt_sprintf(payload, "来自 RT-Thread 的消息: %d", counter++);
-                if (mqttPublish(payload) != MQTTSuccess)
+                rt_sprintf(payload, "[%d]Message from RT-Thread: Hello World", counter++);
+
+                MQTTPublishInfo_t publishInfo = {
+                    .qos = MQTTQoS0,
+                    .pTopicName = MQTT_TOPIC_PUB,
+                    .topicNameLength = strlen(MQTT_TOPIC_PUB),
+                    .pPayload = payload,
+                    .payloadLength = strlen(payload)
+                };
+
+                if (mqttPublish(&publishInfo) != MQTTSuccess)
                 {
-                    rt_kprintf("发布失败,准备重连\n");
+                    rt_kprintf("Publish failed, preparing to reconnect\n");
                     break;
                 }
                 lastPublishTime = currentTime;
             }
 
-            /* 短暂延时,避免CPU占用过高 */
             rt_thread_mdelay(50);
         }
 
-        /* 关闭套接字 */
         if (networkContext.socket >= 0)
         {
             closesocket(networkContext.socket);
             networkContext.socket = -1;
         }
 
-        /* 准备重连 */
-        rt_kprintf("MQTT 连接断开,准备重连\n");
+        rt_kprintf("MQTT connection lost, preparing to reconnect in %d ms\n", backoffMs);
         rt_thread_mdelay(backoffMs);
         backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
     }
 
-    /* 清理资源 */
     if (mqttBuffer.pBuffer != RT_NULL)
     {
         rt_free(mqttBuffer.pBuffer);
         mqttBuffer.pBuffer = RT_NULL;
     }
-    rt_kprintf("MQTT 客户端退出\n");
+    rt_kprintf("MQTT client exited\n");
 }
 
-/* 启动 MQTT 客户端线程 */
 void mqtt_client_start(void)
 {
     rt_thread_t tid = rt_thread_create("mqtt", mqttClientTask,
-    RT_NULL, 4096, /* 栈大小 */
-    10, /* 优先级 */
-    20); /* 时间片 */
+                                               RT_NULL, 4096,
+                                               10,
+                                               20);
     if (tid != RT_NULL)
     {
         rt_thread_startup(tid);
-        rt_kprintf("MQTT 客户端线程启动\n");
+        rt_kprintf("MQTT client thread started\n");
     }
     else
     {
-        rt_kprintf("创建 MQTT 客户端线程失败\n");
+        rt_kprintf("Failed to create MQTT client thread\n");
     }
 }
 

+ 27 - 0
port/config.h

@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2006-2021, RT-Thread Development Team
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2025-06-10     RTT       the first version
+ */
+#ifndef APPLICATIONS_FIREMQTT_PORT_CONFIG_H_
+#define APPLICATIONS_FIREMQTT_PORT_CONFIG_H_
+#include "mqtt_usr_api.h"
+#define MQTT_BROKER_ADDRESS  "broker.emqx.io" // MQTT 代理地址
+#define MQTT_BROKER_PORT     1883                 // MQTT 代理端口
+#define MQTT_CLIENT_ID       "rtthread_mqtt_client" // 客户端 ID
+#define MQTT_TOPIC_SUB       "rtthread/test/sub"   // 订阅主题
+#define MQTT_TOPIC_PUB       "rtthread/test/pub"   // 发布主题
+#define MQTT_KEEP_ALIVE      60
+#define MQTT_BUF_SIZE        60
+
+#define MAX_RETRY_ATTEMPTS   5                     // 最大重试次数
+#define INITIAL_BACKOFF_MS   1000                  // 初始重连退避时间(毫秒)
+#define MAX_BACKOFF_MS       60000
+
+#define MQTT_USERCALLBACK mqttEventCallback
+
+#endif /* APPLICATIONS_FIREMQTT_PORT_CONFIG_H_ */

+ 22 - 37
port/mqtt_usr_api.c

@@ -11,36 +11,33 @@
 #include "mqtt_usr_api.h"
 
 MQTTFixedBuffer_t mqttBuffer = { .pBuffer = RT_NULL, .size = 1024 };
-static TransportInterface_t transportInterface;
 MQTTContext_t mqttContext;
+TransportInterface_t transportInterface;
 
 MQTTStatus_t mqttInit(NetworkContext_t *networkContext, MQTTEventCallback_t userCallback)
 {
     MQTTStatus_t status;
 
-    /* 配置传输接口 */
     transportInterface.pNetworkContext = networkContext;
     transportInterface.send = transportSend;
     transportInterface.recv = transportRecv;
 
-    /* 初始化 MQTT 缓冲区 */
-    mqttBuffer.pBuffer = rt_malloc(mqttBuffer.size);
+    mqttBuffer.pBuffer = rt_malloc(mqttBuffer.size); // 缓存
     if (mqttBuffer.pBuffer == RT_NULL)
     {
-        rt_kprintf("分配 MQTT 缓冲区失败\n");
-        return MQTTSendFailed;
+        rt_kprintf("Failed to allocate MQTT buffer\n");
+        return MQTTNoMemory;
     }
 
-    /* 初始化 MQTT 上下文 */
     status = MQTT_Init(&mqttContext, &transportInterface, getCurrentTime, userCallback, &mqttBuffer);
     if (status != MQTTSuccess)
     {
-        rt_kprintf("MQTT_Init 失败: %d\n", status);
+        rt_kprintf("MQTT_Init failed: %d\n", status);
         rt_free(mqttBuffer.pBuffer);
         return status;
     }
 
-    rt_kprintf("MQTT 客户端初始化成功\n");
+    rt_kprintf("MQTT client initialized successfully\n");
     return MQTTSuccess;
 }
 
@@ -50,17 +47,17 @@ MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
     MQTTConnectInfo_t connectInfo = { 0 };
     bool sessionPresent;
 
-    /* 配置连接信息 */
+    /* Configure connection information */
     connectInfo.clientIdentifierLength = strlen(MQTT_CLIENT_ID);
     connectInfo.pClientIdentifier = MQTT_CLIENT_ID;
     connectInfo.keepAliveSeconds = MQTT_KEEP_ALIVE;
     connectInfo.cleanSession = true;
 
-    /* 建立 TCP 连接 */
+    /* Establish TCP connection */
     networkContext->socket = socket(AF_INET, SOCK_STREAM, 0);
     if (networkContext->socket < 0)
     {
-        rt_kprintf("创建 socket 失败\n");
+        rt_kprintf("Failed to create socket\n");
         return MQTTSendFailed;
     }
 
@@ -70,7 +67,7 @@ MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
     struct hostent *host = gethostbyname(MQTT_BROKER_ADDRESS);
     if (host == NULL || host->h_addr_list[0] == NULL)
     {
-        rt_kprintf("解析代理地址失败\n");
+        rt_kprintf("Failed to resolve broker address\n");
         closesocket(networkContext->socket);
         return MQTTSendFailed;
     }
@@ -78,65 +75,53 @@ MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
 
     if (connect(networkContext->socket, (struct sockaddr *) &serverAddr, sizeof(serverAddr)) < 0)
     {
-        rt_kprintf("连接代理失败\n");
+        rt_kprintf("Failed to connect to broker\n");
         closesocket(networkContext->socket);
         return MQTTSendFailed;
     }
 
-    /* MQTT 连接 */
+    /* MQTT connection */
     status = MQTT_Connect(&mqttContext, &connectInfo, NULL, 10000, &sessionPresent);
     if (status != MQTTSuccess)
     {
-        rt_kprintf("MQTT_Connect 失败: %d\n", status);
+        rt_kprintf("MQTT_Connect failed: %d\n", status);
         closesocket(networkContext->socket);
         return status;
     }
 
-    rt_kprintf("成功连接到 MQTT 代理\n");
+    rt_kprintf("Successfully connected to MQTT broker\n");
     return MQTTSuccess;
 }
 
-MQTTStatus_t mqttSubscribe(void)
+MQTTStatus_t mqttSubscribe(MQTTSubscribeInfo_t *subscribeInfo)
 {
     MQTTStatus_t status;
-    MQTTSubscribeInfo_t subscribeInfo = { 0 };
-
-    subscribeInfo.qos = MQTTQoS0;
-    subscribeInfo.pTopicFilter = MQTT_TOPIC_SUB;
-    subscribeInfo.topicFilterLength = strlen(MQTT_TOPIC_SUB);
 
     uint16_t packetId = MQTT_GetPacketId(&mqttContext);
-    status = MQTT_Subscribe(&mqttContext, &subscribeInfo, 1, packetId);
+    status = MQTT_Subscribe(&mqttContext, subscribeInfo, 1, packetId);
     if (status != MQTTSuccess)
     {
-        rt_kprintf("MQTT_Subscribe 失败: %d\n", status);
+        rt_kprintf("MQTT_Subscribe failed: %d\n", status);
         return status;
     }
 
-    rt_kprintf("订阅主题: %s\n", MQTT_TOPIC_SUB);
+    rt_kprintf("Subscribed to topic: %s\n", MQTT_TOPIC_SUB);
     return MQTTSuccess;
 }
 
-MQTTStatus_t mqttPublish(const char *payload)
+MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo)
 {
     MQTTStatus_t status;
-    MQTTPublishInfo_t publishInfo = { 0 };
-
-    publishInfo.qos = MQTTQoS0;
-    publishInfo.pTopicName = MQTT_TOPIC_PUB;
-    publishInfo.topicNameLength = strlen(MQTT_TOPIC_PUB);
-    publishInfo.pPayload = payload;
-    publishInfo.payloadLength = strlen(payload);
 
     uint16_t packetId = MQTT_GetPacketId(&mqttContext);
-    status = MQTT_Publish(&mqttContext, &publishInfo, packetId);
+    status = MQTT_Publish(&mqttContext, publishInfo, packetId);
     if (status != MQTTSuccess)
     {
-        rt_kprintf("MQTT_Publish 失败: %d\n", status);
+        rt_kprintf("MQTT_Publish failed: %d\n", status);
         return status;
     }
 
-    rt_kprintf("发布消息: %s\n", payload);
+    rt_kprintf("Published message: %s\n", publishInfo->pPayload);
     return MQTTSuccess;
 }
 

+ 10 - 8
port/mqtt_usr_api.h

@@ -21,21 +21,23 @@
 #include <unistd.h>      // 添加close等系统调用定义
 #include "port.h"
 
-#define MQTT_BROKER_ADDRESS  "broker.emqx.io" // MQTT 代理地址
-#define MQTT_BROKER_PORT     1883                 // MQTT 代理端口
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
+#define MQTT_BROKER_ADDRESS  "broker.emqx.io"       // MQTT 代理地址
+#define MQTT_BROKER_PORT     1883                   // MQTT 代理端口
 #define MQTT_CLIENT_ID       "rtthread_mqtt_client" // 客户端 ID
-#define MQTT_TOPIC_SUB       "rtthread/test/sub"   // 订阅主题
-#define MQTT_TOPIC_PUB       "rtthread/test/pub"   // 发布主题
+#define MQTT_TOPIC_SUB       "rtthread/test/sub"    // 订阅主题
+#define MQTT_TOPIC_PUB       "rtthread/test/pub"    // 发布主题
 #define MQTT_KEEP_ALIVE      60
 
-#define MAX_RETRY_ATTEMPTS   5                     // 最大重试次数
-#define INITIAL_BACKOFF_MS   1000                  // 初始重连退避时间(毫秒)
+#define MAX_RETRY_ATTEMPTS   5                      // 最大重试次数
+#define INITIAL_BACKOFF_MS   1000                   // 初始重连退避时间(毫秒)
 #define MAX_BACKOFF_MS       60000
 
 MQTTStatus_t mqttInit(NetworkContext_t *networkContext, MQTTEventCallback_t userCallback);
 MQTTStatus_t mqttConnect(NetworkContext_t *networkContext);
-MQTTStatus_t mqttSubscribe(void);
-MQTTStatus_t mqttPublish(const char *payload);
+MQTTStatus_t mqttSubscribe(MQTTSubscribeInfo_t *subscribeInfo);
+MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo);
 bool isSocketReadable(int socket, int timeout_ms);
 
 #endif /* APPLICATIONS_FIREMQTT_PORT_MQTT_USR_API_H_ */

+ 1 - 1
port/port.c

@@ -10,7 +10,7 @@
 
 uint32_t getCurrentTime(void)
 {
-    return rt_tick_get() / (1000 / RT_TICK_PER_SECOND);
+    return rt_tick_get() / (1000 / RT_TICK_PER_SECOND); //ms
 }
 
 int32_t transportSend(NetworkContext_t *pNetworkContext, const void *pBuffer, size_t bytesToSend)