Selaa lähdekoodia

[fix] 调整目录结构,优化示例

Yaochenger 7 kuukautta sitten
vanhempi
sitoutus
57282a8c31

+ 9 - 7
SConscript

@@ -7,20 +7,22 @@ Import('SDK_LIB')
 cwd = GetCurrentDir()
 
 src = Split('''
-core_mqtt.c
-core_mqtt_state.c
-core_mqtt_serializer.c
+api/mqtt_api.c
+core/core_mqtt.c
+core/core_mqtt_state.c
+core/core_mqtt_serializer.c
+demo/demo.c
 port/port.c
-demo.c
 ''')
 
 path =  [cwd]
-path += [cwd + '/include']
-path += [cwd + '/interface']
+path += [cwd + '/api']
 path += [cwd + '/port']
+path += [cwd + '/core/include']
+path += [cwd + '/core/interface']
 
 print(path)
 
-group = DefineGroup('firemqtt', src, depend = [''], CPPPATH = path)
+group = DefineGroup('FreeMQTT', src, depend = [''], CPPPATH = path)
 
 Return('group')

+ 128 - 6
port/mqtt_usr_api.c → api/mqtt_api.c

@@ -11,11 +11,12 @@
 #define DBG_TAG "MQTT"
 #define DBG_LVL DBG_LOG
 
-#include "mqtt_usr_api.h"
+#include "mqtt_api.h"
 
-MQTTFixedBuffer_t mqttBuffer = { .pBuffer = RT_NULL, .size = 1024 };
-MQTTContext_t mqttContext;
-TransportInterface_t transportInterface;
+static MQTTFixedBuffer_t mqttBuffer = { .pBuffer = RT_NULL, .size = 1024 };
+static MQTTContext_t mqttContext;
+static TransportInterface_t transportInterface;
+static NetworkContext_t networkContext;
 
 MQTTStatus_t mqttInit(NetworkContext_t *networkContext, MQTTEventCallback_t userCallback)
 {
@@ -127,7 +128,7 @@ MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo)
     return MQTTSuccess;
 }
 
-bool isSocketReadable(int socket, int timeout_ms)
+static bool isSocketReadable(int socket, int timeout_ms)
 {
     fd_set readfds;
     struct timeval timeout;
@@ -144,7 +145,7 @@ bool isSocketReadable(int socket, int timeout_ms)
 
 const char *mqttStatus(MQTTStatus_t status)
 {
-    static const char *const statusStrings[] = {
+    const char *const statusStrings[] = {
         "Success",
         "BadParameter",
         "NoMemory",
@@ -170,3 +171,124 @@ const char *mqttStatus(MQTTStatus_t status)
     }
     return "Unknown";
 }
+
+static void mqttEventCallback(MQTTContext_t *pContext, MQTTPacketInfo_t *pPacketInfo,
+        MQTTDeserializedInfo_t *pDeserializedInfo)
+{
+    if (!pContext || !pPacketInfo)
+    {
+        rt_kprintf("Error: Invalid context or packet info\n");
+        return;
+    }
+
+    switch (pPacketInfo->type)
+    {
+        case MQTT_PACKET_TYPE_PUBLISH:
+        {
+            if (!pDeserializedInfo || !pDeserializedInfo->pPublishInfo)
+            {
+                rt_kprintf("Error: Invalid publish info\n");
+                return;
+            }
+            MQTTPublishInfo_t *pPublishInfo = pDeserializedInfo->pPublishInfo;
+            rt_kprintf("Received message on topic '%.*s': %.*s\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName,
+                    pPublishInfo->payloadLength, (const char *) pPublishInfo->pPayload);
+            break;
+        }
+        case MQTT_PACKET_TYPE_SUBACK:
+            rt_kprintf("Subscription ACK\n");
+            break;
+        case MQTT_PACKET_TYPE_PUBACK:
+            rt_kprintf("Publish ACK\n"); // QoS0 messages do not trigger this callback.
+            break;
+        default:
+            break;
+    }
+}
+
+void mqttClientTask(void *parameter)
+{
+    MQTTStatus_t status;
+    uint32_t retryCount = 0;
+    uint32_t backoffMs = INITIAL_BACKOFF_MS;
+    bool isConnected = false;
+
+    if (mqttInit(&networkContext, mqttEventCallback) != MQTTSuccess)
+    {
+        MQTT_PRINT("MQTT initialization failed\n");
+        return;
+    }
+
+    while (1)
+    {
+        if (networkContext.socket >= 0)
+        {
+            closesocket(networkContext.socket);
+            networkContext.socket = -1;
+        }
+
+        status = mqttConnect(&networkContext);
+        if (status != MQTTSuccess)
+        {
+            MQTT_PRINT("Connection failed: %d (%s), retrying in %d ms\n", status, mqttStatus(status), backoffMs);
+            if (retryCount++ >= MAX_RETRY_ATTEMPTS)
+            {
+                MQTT_PRINT("Maximum retry attempts reached, resetting retry count after 60s\n");
+                rt_thread_mdelay(30000);
+                retryCount = 0;
+                backoffMs = INITIAL_BACKOFF_MS;
+            }
+            else
+            {
+                rt_thread_mdelay(backoffMs);
+                backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
+            }
+            continue;
+        }
+
+        isConnected = true;
+        retryCount = 0;
+        backoffMs = INITIAL_BACKOFF_MS;
+
+        while (1)
+        {
+            if (isSocketReadable(networkContext.socket, 100))
+            {
+                status = MQTT_ProcessLoop(&mqttContext);
+                if (status != MQTTSuccess)
+                {
+                    MQTT_PRINT("MQTT_ProcessLoop failed: %d (%s)\n", status, mqttStatus(status));
+                    break;
+                }
+            }
+            rt_thread_mdelay(MQTT_LOOP_CNT);
+        }
+
+        if (isConnected && networkContext.socket >= 0)
+        {
+            status = MQTT_Disconnect(&mqttContext);
+            if (status != MQTTSuccess)
+            {
+                MQTT_PRINT("MQTT_Disconnect failed: %d (%s)\n", status, mqttStatus(status));
+            }
+            isConnected = false;
+        }
+
+        if (networkContext.socket >= 0)
+        {
+            closesocket(networkContext.socket);
+            networkContext.socket = -1;
+        }
+
+        MQTT_PRINT("MQTT connection lost, preparing to reconnect in %d ms\n", backoffMs);
+        rt_thread_mdelay(backoffMs);
+        backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
+    }
+
+    if (mqttBuffer.pBuffer != RT_NULL)
+    {
+        rt_free(mqttBuffer.pBuffer);
+        mqttBuffer.pBuffer = RT_NULL;
+    }
+    MQTT_PRINT("MQTT client exited\n");
+}

+ 2 - 14
port/mqtt_usr_api.h → api/mqtt_api.h

@@ -21,6 +21,7 @@
 #include <unistd.h>      // 添加close等系统调用定义
 #include "port.h"
 #include <rtdbg.h>
+#include "config.h"
 
 #ifdef RT_USING_ULOG
 #define MQTT_PRINT(fmt, ...) LOG_D(fmt, ##__VA_ARGS__)
@@ -30,23 +31,10 @@
 
 #define MIN(a, b) ((a) < (b) ? (a) : (b))
 
-#define MQTT_BROKER_ADDRESS  "broker.emqx.io"       // MQTT 代理地址
-#define MQTT_BROKER_PORT     1883                   // MQTT 代理端口
-#define MQTT_CLIENT_ID       "rtthread_mqtt_client" // 客户端 ID
-#define MQTT_TOPIC_SUB       "rtthread/test/sub"    // 订阅主题
-#define MQTT_TOPIC_PUB       "rtthread/test/pub"    // 发布主题
-#define MQTT_KEEP_ALIVE      60
-#define MQTT_LOOP_CNT        50
-
-#define MAX_RETRY_ATTEMPTS   5                      // 最大重试次数
-#define INITIAL_BACKOFF_MS   1000                   // 初始重连退避时间(毫秒)
-#define MAX_BACKOFF_MS       60000
-
 MQTTStatus_t mqttInit(NetworkContext_t *networkContext, MQTTEventCallback_t userCallback);
 MQTTStatus_t mqttConnect(NetworkContext_t *networkContext);
 MQTTStatus_t mqttSubscribe(MQTTSubscribeInfo_t *subscribeInfo);
 MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo);
-const char *mqttStatus(MQTTStatus_t status);
-bool isSocketReadable(int socket, int timeout_ms);
+void mqttClientTask(void *parameter);
 
 #endif /* APPLICATIONS_FIREMQTT_PORT_MQTT_USR_API_H_ */

+ 2 - 2
port/config.h → config.h

@@ -9,14 +9,14 @@
  */
 #ifndef APPLICATIONS_FIREMQTT_PORT_CONFIG_H_
 #define APPLICATIONS_FIREMQTT_PORT_CONFIG_H_
-#include "mqtt_usr_api.h"
+#include "mqtt_api.h"
 #define MQTT_BROKER_ADDRESS  "broker.emqx.io" // MQTT 代理地址
 #define MQTT_BROKER_PORT     1883                 // MQTT 代理端口
 #define MQTT_CLIENT_ID       "rtthread_mqtt_client" // 客户端 ID
 #define MQTT_TOPIC_SUB       "rtthread/test/sub"   // 订阅主题
 #define MQTT_TOPIC_PUB       "rtthread/test/pub"   // 发布主题
 #define MQTT_KEEP_ALIVE      60
-#define MQTT_BUF_SIZE        60
+#define MQTT_LOOP_CNT        60
 
 #define MAX_RETRY_ATTEMPTS   5                     // 最大重试次数
 #define INITIAL_BACKOFF_MS   1000                  // 初始重连退避时间(毫秒)

+ 0 - 0
core_mqtt.c → core/core_mqtt.c


+ 0 - 0
core_mqtt_serializer.c → core/core_mqtt_serializer.c


+ 0 - 0
core_mqtt_state.c → core/core_mqtt_state.c


+ 0 - 0
include/core_mqtt.h → core/include/core_mqtt.h


+ 0 - 0
include/core_mqtt_config_defaults.h → core/include/core_mqtt_config_defaults.h


+ 0 - 0
include/core_mqtt_serializer.h → core/include/core_mqtt_serializer.h


+ 0 - 0
include/core_mqtt_state.h → core/include/core_mqtt_state.h


+ 0 - 0
interface/transport_interface.h → core/interface/transport_interface.h


+ 84 - 0
demo/demo.c

@@ -0,0 +1,84 @@
+#include <rtthread.h>
+#include <wlan_mgnt.h>
+
+#define DBG_TAG "MQTT"
+#define DBG_LVL DBG_LOG
+
+#include "mqtt_api.h"
+
+void mqtt_client_start(void)
+{
+    rt_wlan_unregister_event_handler(RT_WLAN_EVT_READY);
+
+    rt_thread_t tid = rt_thread_create("mqtt", mqttClientTask,
+    RT_NULL, 4096, 10, 20);
+    if (tid != RT_NULL)
+    {
+        rt_thread_startup(tid);
+        MQTT_PRINT("MQTT client thread started\n");
+    }
+    else
+    {
+        MQTT_PRINT("Failed to create MQTT client thread\n");
+    }
+}
+
+static int mqtt_pub(int argc, char **argv)
+{
+    MQTTStatus_t status;
+    MQTTPublishInfo_t publishInfo;
+
+    if (argc != 2)
+    {
+        rt_kprintf("Usage: mqtt_pub <message>\n");
+        return -RT_ERROR;
+    }
+
+    publishInfo.qos = MQTTQoS0;
+    publishInfo.pTopicName = MQTT_TOPIC_PUB;
+    publishInfo.topicNameLength = strlen(MQTT_TOPIC_PUB);
+    publishInfo.pPayload = argv[1];
+    publishInfo.payloadLength = strlen(argv[1]);
+
+    status = mqttPublish(&publishInfo);
+    if (status != MQTTSuccess)
+    {
+        rt_kprintf("MQTT publish failed: %d\n", status);
+        return -RT_ERROR;
+    }
+
+    rt_kprintf("Published message: %s to topic: %s\n", argv[1], MQTT_TOPIC_PUB);
+    return RT_EOK;
+}
+#ifdef RT_USING_FINSH
+MSH_CMD_EXPORT_ALIAS(mqtt_pub, mqtt_pub, Send MQTT message);
+#endif
+
+static int mqtt_sub(int argc, char **argv)
+{
+    MQTTStatus_t status;
+    MQTTSubscribeInfo_t subscribeInfo;
+
+    if (argc != 2)
+    {
+        rt_kprintf("Usage: mqtt_sub <topic>\n");
+        return -RT_ERROR;
+    }
+
+    subscribeInfo.qos = MQTTQoS0;
+    subscribeInfo.pTopicFilter = argv[1];
+    subscribeInfo.topicFilterLength = strlen(argv[1]);
+
+    status = mqttSubscribe(&subscribeInfo);
+    if (status != MQTTSuccess)
+    {
+        rt_kprintf("MQTT subscribe failed: %d\n", status);
+        return -RT_ERROR;
+    }
+
+    rt_kprintf("Subscribed to topic: %s\n", argv[1]);
+    return RT_EOK;
+}
+#ifdef RT_USING_FINSH
+MSH_CMD_EXPORT_ALIAS(mqtt_sub, mqtt_sub, Subscribe MQTT message);
+#endif

+ 0 - 220
example/demo.c

@@ -1,220 +0,0 @@
-#include <rtthread.h>
-#include <core_mqtt.h>
-#include <transport_interface.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <string.h>
-#include <stdlib.h>
-#include "port.h"
-#include <sys/select.h>
-#include <unistd.h>
-#include "port.h"
-
-#define DBG_TAG "MQTT"
-#define DBG_LVL DBG_LOG
-
-#include "mqtt_usr_api.h"
-
-static NetworkContext_t networkContext;
-extern MQTTFixedBuffer_t mqttBuffer;
-extern MQTTContext_t mqttContext;
-
-static void mqttEventCallback(MQTTContext_t *pContext, MQTTPacketInfo_t *pPacketInfo,
-        MQTTDeserializedInfo_t *pDeserializedInfo)
-{
-    if (!pContext || !pPacketInfo)
-    {
-        rt_kprintf("Error: Invalid context or packet info\n");
-        return;
-    }
-
-    switch (pPacketInfo->type)
-    {
-        case MQTT_PACKET_TYPE_PUBLISH:
-        {
-            if (!pDeserializedInfo || !pDeserializedInfo->pPublishInfo)
-            {
-                rt_kprintf("Error: Invalid publish info\n");
-                return;
-            }
-            MQTTPublishInfo_t *pPublishInfo = pDeserializedInfo->pPublishInfo;
-            rt_kprintf("Received message on topic '%.*s': %.*s\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName,
-                    pPublishInfo->payloadLength, (const char *) pPublishInfo->pPayload);
-            break;
-        }
-        case MQTT_PACKET_TYPE_SUBACK:
-            rt_kprintf("Subscription ACK\n");
-            break;
-        case MQTT_PACKET_TYPE_PUBACK:
-            rt_kprintf("Publish ACK\n"); // QoS0 messages do not trigger this callback.
-            break;
-        default:
-            rt_kprintf("Other packet type\n");
-            break;
-    }
-}
-
-static void mqttClientTask(void *parameter)
-{
-    MQTTStatus_t status;
-    uint32_t retryCount = 0;
-    uint32_t backoffMs = INITIAL_BACKOFF_MS;
-    bool isConnected = false;
-
-    if (mqttInit(&networkContext, mqttEventCallback) != MQTTSuccess)
-    {
-        MQTT_PRINT("MQTT initialization failed\n");
-        return;
-    }
-
-    while (1)
-    {
-        if (networkContext.socket >= 0)
-        {
-            closesocket(networkContext.socket);
-            networkContext.socket = -1;
-        }
-
-        status = mqttConnect(&networkContext);
-        if (status != MQTTSuccess)
-        {
-            MQTT_PRINT("Connection failed: %d (%s), retrying in %d ms\n", status, mqttStatus(status), backoffMs);
-            if (retryCount++ >= MAX_RETRY_ATTEMPTS)
-            {
-                MQTT_PRINT("Maximum retry attempts reached, resetting retry count after 60s\n");
-                rt_thread_mdelay(30000);
-                retryCount = 0;
-                backoffMs = INITIAL_BACKOFF_MS;
-            }
-            else
-            {
-                rt_thread_mdelay(backoffMs);
-                backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
-            }
-            continue;
-        }
-
-        isConnected = true;
-        retryCount = 0;
-        backoffMs = INITIAL_BACKOFF_MS;
-
-        while (1)
-        {
-            if (isSocketReadable(networkContext.socket, 100))
-            {
-                status = MQTT_ProcessLoop(&mqttContext);
-                if (status != MQTTSuccess)
-                {
-                    MQTT_PRINT("MQTT_ProcessLoop failed: %d (%s)\n", status, mqttStatus(status));
-                    break;
-                }
-            }
-            rt_thread_mdelay(MQTT_LOOP_CNT);
-        }
-
-        if (isConnected && networkContext.socket >= 0)
-        {
-            status = MQTT_Disconnect(&mqttContext);
-            if (status != MQTTSuccess)
-            {
-                MQTT_PRINT("MQTT_Disconnect failed: %d (%s)\n", status, mqttStatus(status));
-            }
-            isConnected = false;
-        }
-
-        if (networkContext.socket >= 0)
-        {
-            closesocket(networkContext.socket);
-            networkContext.socket = -1;
-        }
-
-        MQTT_PRINT("MQTT connection lost, preparing to reconnect in %d ms\n", backoffMs);
-        rt_thread_mdelay(backoffMs);
-        backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
-    }
-
-    if (mqttBuffer.pBuffer != RT_NULL)
-    {
-        rt_free(mqttBuffer.pBuffer);
-        mqttBuffer.pBuffer = RT_NULL;
-    }
-    MQTT_PRINT("MQTT client exited\n");
-}
-
-#include <wlan_mgnt.h>
-void mqtt_client_start(void)
-{
-    rt_wlan_unregister_event_handler(RT_WLAN_EVT_READY);
-
-    rt_thread_t tid = rt_thread_create("mqtt", mqttClientTask,
-    RT_NULL, 4096, 10, 20);
-    if (tid != RT_NULL)
-    {
-        rt_thread_startup(tid);
-        MQTT_PRINT("MQTT client thread started\n");
-    }
-    else
-    {
-        MQTT_PRINT("Failed to create MQTT client thread\n");
-    }
-}
-
-static int mqtt_pub(int argc, char **argv)
-{
-    MQTTStatus_t status;
-    MQTTPublishInfo_t publishInfo;
-
-    if (argc != 2)
-    {
-        rt_kprintf("Usage: mqtt_pub <message>\n");
-        return -RT_ERROR;
-    }
-
-    publishInfo.qos = MQTTQoS0;
-    publishInfo.pTopicName = MQTT_TOPIC_PUB;
-    publishInfo.topicNameLength = strlen(MQTT_TOPIC_PUB);
-    publishInfo.pPayload = argv[1];
-    publishInfo.payloadLength = strlen(argv[1]);
-
-    status = mqttPublish(&publishInfo);
-    if (status != MQTTSuccess)
-    {
-        rt_kprintf("MQTT publish failed: %d (%s)\n", status, mqttStatus(status));
-        return -RT_ERROR;
-    }
-
-    rt_kprintf("Published message: %s to topic: %s\n", argv[1], MQTT_TOPIC_PUB);
-    return RT_EOK;
-}
-#ifdef RT_USING_FINSH
-MSH_CMD_EXPORT_ALIAS(mqtt_pub, mqtt_pub, Send MQTT message);
-#endif
-
-static int mqtt_sub(int argc, char **argv)
-{
-    MQTTStatus_t status;
-    MQTTSubscribeInfo_t subscribeInfo;
-
-    if (argc != 2)
-    {
-        rt_kprintf("Usage: mqtt_sub <topic>\n");
-        return -RT_ERROR;
-    }
-
-    subscribeInfo.qos = MQTTQoS0;
-    subscribeInfo.pTopicFilter = argv[1];
-    subscribeInfo.topicFilterLength = strlen(argv[1]);
-
-    status = mqttSubscribe(&subscribeInfo);
-    if (status != MQTTSuccess)
-    {
-        rt_kprintf("MQTT subscribe failed: %d (%s)\n", status, mqttStatus(status));
-        return -RT_ERROR;
-    }
-
-    rt_kprintf("Subscribed to topic: %s\n", argv[1]);
-    return RT_EOK;
-}
-#ifdef RT_USING_FINSH
-MSH_CMD_EXPORT_ALIAS(mqtt_sub, mqtt_sub, Subscribe MQTT message);
-#endif