Jelajahi Sumber

Merge branch 'master' of https://github.com/Yaochenger/firemqtt

Yaochenger 7 bulan lalu
induk
melakukan
3c73fe7c9e
1 mengubah file dengan 56 tambahan dan 15 penghapusan
  1. 56 15
      example/demo.c

+ 56 - 15
example/demo.c

@@ -6,6 +6,8 @@
 #include <string.h>
 #include <stdlib.h>
 #include "port.h"
+#include <sys/select.h>  // 添加select相关定义
+#include <unistd.h>      // 添加close等系统调用定义
 
 /* MQTT 代理配置 */
 #define MQTT_BROKER_ADDRESS  "broker.emqx.io" // MQTT 代理地址
@@ -181,11 +183,28 @@ static MQTTStatus_t mqttPublish(const char *payload) {
     return MQTTSuccess;
 }
 
+/* 检查套接字是否可读 */
+static bool isSocketReadable(int socket, int timeout_ms) {
+    fd_set readfds;
+    struct timeval timeout;
+
+    FD_ZERO(&readfds);
+    FD_SET(socket, &readfds);
+
+    timeout.tv_sec = timeout_ms / 1000;
+    timeout.tv_usec = (timeout_ms % 1000) * 1000;
+
+    int result = select(socket + 1, &readfds, NULL, NULL, &timeout);
+    return (result > 0 && FD_ISSET(socket, &readfds));
+}
+
 /* MQTT 客户端任务 */
 static void mqttClientTask(void *parameter) {
     MQTTStatus_t status;
     uint32_t retryCount = 0;
     uint32_t backoffMs = INITIAL_BACKOFF_MS;
+    uint32_t lastPublishTime = 0;
+    const uint32_t publishIntervalMs = 5000; // 5秒发布一次
 
     /* 初始化 MQTT */
     if (mqttInit() != MQTTSuccess) {
@@ -209,6 +228,7 @@ static void mqttClientTask(void *parameter) {
         /* 重置重连参数 */
         retryCount = 0;
         backoffMs = INITIAL_BACKOFF_MS;
+        lastPublishTime = desk_getCurrentTime();
 
         /* 订阅主题 */
         if (mqttSubscribe() != MQTTSuccess) {
@@ -218,30 +238,51 @@ static void mqttClientTask(void *parameter) {
 
         /* 主循环:处理消息和发布 */
         while (1) {
-            /* 处理传入的 MQTT 消息 */
-            status = MQTT_ProcessLoop(&mqttContext);
-            if (status != MQTTSuccess) {
-                rt_kprintf("MQTT_ProcessLoop 失败: %d\n", status);
-                closesocket(networkContext.socket);
-                break;
+            uint32_t currentTime = desk_getCurrentTime();
+
+            /* 检查是否有数据可读 */
+            if (isSocketReadable(networkContext.socket, 100)) {
+                /* 处理传入的 MQTT 消息 */
+                status = MQTT_ProcessLoop(&mqttContext);
+                if (status != MQTTSuccess) {
+                    rt_kprintf("MQTT_ProcessLoop 失败: %d\n", status);
+                    break;
+                }
             }
 
             /* 定期发布消息 */
-            static int counter = 0;
-            char payload[64];
-            rt_sprintf(payload, "来自 RT-Thread 的消息: %d", counter++);
-            if (mqttPublish(payload) != MQTTSuccess) {
-                rt_kprintf("发布失败,准备重连\n");
-                closesocket(networkContext.socket);
-                break;
+            if (currentTime - lastPublishTime >= publishIntervalMs) {
+                static int counter = 0;
+                char payload[64];
+                rt_sprintf(payload, "来自 RT-Thread 的消息: %d", counter++);
+                if (mqttPublish(payload) != MQTTSuccess) {
+                    rt_kprintf("发布失败,准备重连\n");
+                    break;
+                }
+                lastPublishTime = currentTime;
             }
 
-            rt_thread_mdelay(50); /* 每 5 秒发布一次 */
+            /* 短暂延时,避免CPU占用过高 */
+            rt_thread_mdelay(50);
         }
+
+        /* 关闭套接字 */
+        if (networkContext.socket >= 0) {
+            closesocket(networkContext.socket);
+            networkContext.socket = -1;
+        }
+
+        /* 准备重连 */
+        rt_kprintf("MQTT 连接断开,准备重连\n");
+        rt_thread_mdelay(backoffMs);
+        backoffMs = MIN(backoffMs * 2, MAX_BACKOFF_MS);
     }
 
     /* 清理资源 */
-    rt_free(mqttBuffer.pBuffer);
+    if (mqttBuffer.pBuffer != RT_NULL) {
+        rt_free(mqttBuffer.pBuffer);
+        mqttBuffer.pBuffer = RT_NULL;
+    }
     rt_kprintf("MQTT 客户端退出\n");
 }