RyanCW 1 год назад
Родитель
Сommit
f79bed6b40
2 измененных файлов с 35 добавлено и 26 удалено
  1. 20 19
      mqttclient/RyanMqttClient.h
  2. 15 7
      mqttclient/RyanMqttThread.c

+ 20 - 19
mqttclient/RyanMqttClient.h

@@ -92,25 +92,26 @@ extern "C"
 
     typedef struct
     {
-        uint8_t lwtFlag : 1;               // 遗嘱标志位
-        uint8_t destoryFlag : 1;           // 销毁标志位
-        uint16_t ackHandlerCount;          // 等待ack的记录个数
-        uint16_t packetId;                 // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
-        uint32_t eventFlag;                // 事件标志位
-        RyanMqttState_e clientState;       // mqtt客户端的状态
-        RyanList_t msgHandlerList;         // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
-        RyanList_t ackHandlerList;         // 维护ack链表
-        RyanList_t userAckHandlerList;     // 用户接口的ack链表,会由mqtt线程移动到ack链表
-        platformTimer_t keepaliveTimer;    // 保活定时器
-        platformNetwork_t network;         // 网络组件
-        RyanMqttClientConfig_t config;     // mqtt config
-        platformThread_t mqttThread;       // mqtt线程
-        platformMutex_t msgHandleLock;     // msg链表锁
-        platformMutex_t ackHandleLock;     // ack链表锁
-        platformMutex_t userAckHandleLock; // 用户接口的ack链表锁
-        platformMutex_t sendBufLock;       // 写缓冲区锁
-        platformCritical_t criticalLock;   // 临界区锁
-        lwtOptions_t lwtOptions;           // 遗嘱相关配置
+        uint8_t lwtFlag : 1;                  // 遗嘱标志位
+        uint8_t destoryFlag : 1;              // 销毁标志位
+        uint16_t ackHandlerCount;             // 等待ack的记录个数
+        uint16_t packetId;                    // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
+        uint32_t eventFlag;                   // 事件标志位
+        RyanMqttState_e clientState;          // mqtt客户端的状态
+        RyanList_t msgHandlerList;            // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
+        RyanList_t ackHandlerList;            // 维护ack链表
+        RyanList_t userAckHandlerList;        // 用户接口的ack链表,会由mqtt线程移动到ack链表
+        platformTimer_t keepaliveTimer;       // 保活定时器
+        platformTimer_t keepaliveDebounTimer; // 保活定时器消抖
+        platformNetwork_t network;            // 网络组件
+        RyanMqttClientConfig_t config;        // mqtt config
+        platformThread_t mqttThread;          // mqtt线程
+        platformMutex_t msgHandleLock;        // msg链表锁
+        platformMutex_t ackHandleLock;        // ack链表锁
+        platformMutex_t userAckHandleLock;    // 用户接口的ack链表锁
+        platformMutex_t sendBufLock;          // 写缓冲区锁
+        platformCritical_t criticalLock;      // 临界区锁
+        lwtOptions_t lwtOptions;              // 遗嘱相关配置
     } RyanMqttClient_t;
 
     /* extern variables-----------------------------------------------------------*/

+ 15 - 7
mqttclient/RyanMqttThread.c

@@ -27,8 +27,14 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
     int32_t packetLen = 0;
     RyanMqttAssert(NULL != client);
 
+    // mqtt没有连接就退出
+    if (RyanMqttConnectState != RyanMqttGetClientState(client))
+        return RyanMqttNotConnectError;
+
+    uint32_t timeRemain = platformTimerRemain(&client->keepaliveTimer);
+
     // 超过设置的 1.4 倍心跳周期
-    if (0 == platformTimerRemain(&client->keepaliveTimer))
+    if (0 == timeRemain)
     {
         connectState = RyanMqttKeepaliveTimeout;
         RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
@@ -36,14 +42,14 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
         return RyanMqttFailedError;
     }
 
-    // 当到达 0.9 倍时间时发送心跳包
-    else if (1000 * 0.5 * client->config.keepaliveTimeoutS < platformTimerRemain(&client->keepaliveTimer))
+    // 剩余时间小于 recvtimeout 或者 当到达 0.9 倍时间时发送心跳包
+    else if (timeRemain < client->config.recvTimeout ||
+             timeRemain < 1000 * 0.5 * client->config.keepaliveTimeoutS)
     {
-        return RyanMqttSuccessError;
-    }
+        // 消抖时间内不发送心跳包
+        if (platformTimerRemain(&client->keepaliveDebounTimer))
+            return RyanMqttSuccessError;
 
-    else
-    {
         platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
         packetLen = MQTTSerialize_pingreq((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
         RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
@@ -55,6 +61,8 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
             platformMutexUnLock(client->config.userData, &client->sendBufLock);
         });
         platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
+
+        platformTimerCutdown(&client->keepaliveDebounTimer, 1500); // 启动心跳消抖定时器
     }
 
     return RyanMqttSuccessError;