Эх сурвалжийг харах

refactor(keepAlive platform): 优化心跳保活逻辑、增加linux平台接口、平台接口优化、msg句柄函数优化

ryancw 2 жил өмнө
parent
commit
69cbf300dc

+ 7 - 1
.gitignore

@@ -2,8 +2,14 @@
 # SConscript
 
 # 平台移植层
-platform/linux/*
+# platform/linux/*
 platform/FreeRTOS/*
 
 null
 .vscode
+
+# 忽略编译生成的文件
+*.o
+*.out
+*.exe
+

+ 27 - 0
Makefile

@@ -0,0 +1,27 @@
+
+CFLAGS_INC = -I common
+CFLAGS_INC +=  -I pahoMqtt
+CFLAGS_INC +=  -I mqttclient
+CFLAGS_INC +=  -I platform/linux
+CFLAGS_INC +=  -I platform/linux/valloc
+
+src = $(wildcard ./test/*.c)
+src += $(wildcard ./common/*.c)
+src += $(wildcard ./platform/linux/*.c)
+src += $(wildcard ./platform/linux/valloc/*.c)
+src += $(wildcard ./pahoMqtt/*.c)
+src += $(wildcard ./mqttclient/*.c)
+
+obj = $(patsubst %.c, %.o, $(src))
+target = app
+CC = gcc
+
+$(target): $(obj)
+	$(CC) $(CFLAGS_INC) $(obj) -o $(target)  -lpthread
+
+%.o: %.c
+	$(CC) $(CFLAGS_INC) -c $< -o $@  -lpthread
+
+.PHONY: clean
+clean:
+	rm -rf $(obj) $(target)

+ 30 - 56
mqttclient/RyanMqttClient.c

@@ -11,7 +11,7 @@
 
 /**
  * @brief 获取报文标识符,报文标识符不可为0
- *
+ * 都在sendbuf锁内调用
  * @param client
  * @return uint16_t
  */
@@ -24,7 +24,6 @@ static uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
 
 static RyanMqttError_e setConfigValue(char **dest, char const *const rest)
 {
-
     if (NULL == dest || NULL == rest)
         return RyanMqttNoRescourceError;
 
@@ -72,17 +71,21 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
     client->sendBufLock = platformMemoryMalloc(sizeof(platformMutex_t));
     RyanMqttCheckCode(NULL != client->sendBufLock, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
     memset(client->sendBufLock, 0, sizeof(platformMutex_t));
+    platformMutexInit(client->config->userData, client->sendBufLock); // 初始化发送缓冲区互斥锁
+
+    client->criticalLock = platformMemoryMalloc(sizeof(platformCritical_t));
+    RyanMqttCheckCode(NULL != client->criticalLock, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
+    memset(client->criticalLock, 0, sizeof(platformMutex_t));
+    platformCriticalInit(client->config->userData, client->criticalLock); // 初始化临界区
 
     client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
-    client->clientState = 0;
+    client->clientState = RyanMqttInitState;
     client->eventFlag = 0;
     client->keepaliveTimeoutCount = 0;
     client->ackHandlerCount = 0;
     client->lwtFlag = RyanMqttFalse;
     client->lwtOptions = NULL;
 
-    platformMutexInit(client->config->userData, client->sendBufLock); // 初始化发送缓冲区互斥锁
-
     RyanListInit(&client->msgHandlerList);
     RyanListInit(&client->ackHandlerList);
     platformTimerInit(&client->keepaliveTimer);
@@ -95,10 +98,10 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
 
 /**
  * @brief 销毁mqtt客户端
- *  由于直接删除mqtt线程是很危险的行为。这里设置标志位,由mqtt线程自己删除自己所有资源。
- *  mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
- *  mqtt删除自己前会调用 RyanMqttEventDestoryBefore 事件回调
- *  * 调用此函数后就不应该在对该客户端进行任何操作
+ *  !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
+ *  !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
+ *  !mqtt删除自己前会调用 RyanMqttEventDestoryBefore 事件回调
+ *  !调用此函数后就不应该再对该客户端进行任何操作
  * @param client
  * @return RyanMqttError_e
  */
@@ -107,9 +110,9 @@ RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
 
     RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
 
-    platformCriticalEnter();
+    platformCriticalEnter(client->config->userData, client->criticalLock);
     client->destoryFlag = RyanMqttTrue;
-    platformCriticalExit();
+    platformCriticalExit(client->config->userData, client->criticalLock);
 
     return RyanMqttSuccessError;
 }
@@ -135,7 +138,6 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
                                 client,
                                 client->config->taskStack,
                                 client->config->taskPrio);
-
     RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttSetClientState(client, RyanMqttInitState); });
 
     return RyanMqttSuccessError;
@@ -221,7 +223,7 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
     packetLen = MQTTSerialize_subscribe((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, packetId, 1, &topicName, (int *)&qos);
     RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
 
-    result = RyanMqttMsgHandlerCreate(topic, strlen(topic), qos, &msgHandler);
+    result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
 
     result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, msgHandler, &ackHandler);
@@ -229,22 +231,14 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
         platformMemoryFree(msgHandler);
         platformMutexUnLock(client->config->userData, client->sendBufLock);
     });
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
-
-    // 确定节点是否已存在,存在就删除
-    result = RyanMqttAckListNodeFind(client, SUBACK, packetId, &ackHandler);
-    if (RyanMqttSuccessError == result)
-    {
-        RyanMqttAckListRemove(client, ackHandler);
-        RyanMqttAckHandlerDestroy(client, ackHandler);
-    }
 
+    // 添加等待 ack
     result = RyanMqttAckListAdd(client, ackHandler);
-
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
+    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
-                      { RyanMqttAckListRemove(client, ackHandler);
-                        RyanMqttAckHandlerDestroy(client, ackHandler); });
+                      { RyanMqttAckListRemove(client, ackHandler);RyanMqttAckHandlerDestroy(client, ackHandler); });
+
     return result;
 }
 
@@ -283,19 +277,10 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
     result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { platformMutexUnLock(client->config->userData, client->sendBufLock); });
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
-
-    // 确定节点是否已存在,存在就删除
-    result = RyanMqttAckListNodeFind(client, UNSUBACK, packetId, &ackHandler);
-    if (RyanMqttSuccessError == result)
-    {
-        RyanMqttAckListRemove(client, ackHandler);
-        RyanMqttAckHandlerDestroy(client, ackHandler);
-    }
 
     result = RyanMqttAckListAdd(client, ackHandler);
-
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
+    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);
                         RyanMqttAckHandlerDestroy(client, ackHandler); });
@@ -353,35 +338,24 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
 
     packetLen = MQTTSerialize_publish((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, qos, retain, packetId,
                                       topicName, (uint8_t *)payload, payloadLen);
-    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
-                      { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
 
-    result = RyanMqttMsgHandlerCreate(topic, strlen(topic), qos, &msgHandler);
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
-                      { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
+    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
 
     result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
         platformMemoryFree(msgHandler);
         platformMutexUnLock(client->config->userData, client->sendBufLock);
     });
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
-
-    // 确定节点是否已存在,存在就删除,理论上不会存在
-    result = RyanMqttAckListNodeFind(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, &ackHandler);
-    if (RyanMqttSuccessError == result)
-    {
-        RyanMqttAckListRemove(client, ackHandler);
-        RyanMqttAckHandlerDestroy(client, ackHandler);
-    }
 
     result = RyanMqttAckListAdd(client, ackHandler);
-
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
+    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);
                         RyanMqttAckHandlerDestroy(client, ackHandler); });
-
     // 提前设置重发标志位
     RyanMqttSetPublishDup(&ackHandler->packet[0], 1);
 
@@ -396,7 +370,6 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
  */
 RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
 {
-
     if (NULL == client)
         return RyanMqttInvalidState;
 
@@ -493,12 +466,13 @@ RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandle
 
 /**
  * @brief 设置mqtt config 这是很危险的操作,需要考虑mqtt thread线程和用户线程的资源互斥
- * todo 此函数没有实现完整------
+ *
  * 推荐在 RyanMqttStart函数前 / 非用户手动触发的事件回调函数中 / mqtt thread处于挂起状态时调用
- * mqtt thread处于阻塞状态时调用此函数也是很危险的行为,因为无法确定此函数的执行时间,调用此函数的任务运行时间片有多少
- * 总之就是要保证mqtt线程和用户线程的资源互斥
- * 项目中用户也不应该频繁调用此函数
+ * mqtt thread处于阻塞状态时调用此函数也是很危险的行为
+ * 要保证mqtt线程和用户线程的资源互斥
+ * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
  *
+ * !项目中用户不应频繁调用此函数
  * ! 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
  *
  * @param client

+ 1 - 0
mqttclient/RyanMqttClient.h

@@ -105,6 +105,7 @@ extern "C"
         RyanMqttClientConfig_t *config;    // mqtt config
         platformThread_t *mqttThread;      // mqtt线程
         platformMutex_t *sendBufLock;      // 写缓冲区锁
+        platformCritical_t *criticalLock;  // 临界区锁
         lwtOptions_t *lwtOptions;          // 遗嘱相关配置
     } RyanMqttClient_t;
 

+ 91 - 108
mqttclient/RyanMqttThread.c

@@ -9,7 +9,14 @@
 #include "RyanMqttUtile.h"
 #include "RyanMqttThread.h"
 
-// void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData);
+void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client)
+{
+    // 服务器在心跳时间的1.5倍内没有收到keeplive消息则会断开连接
+    // 这里在用户设置的心跳剩余 1/4 时手动发送保活指令
+    platformCriticalEnter(client->config->userData, client->criticalLock);
+    platformTimerCutdown(&client->keepaliveTimer, client->config->keepaliveTimeoutS / 4 * 3 * 1000); // 启动心跳定时器
+    platformCriticalExit(client->config->userData, client->criticalLock);
+}
 
 /**
  * @brief mqtt心跳保活
@@ -19,31 +26,32 @@
  */
 static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
 {
-    int32_t connectState = RyanMqttConnectAccepted;
-    int32_t packetLen = 0;
     RyanMqttAssert(NULL != client);
 
-    // 如果没有连接则不需要心跳保活
-    RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
-
-    // 服务器在心跳时间的1.5倍内没有收到消息则会断开连接
     // 在心跳的一半发送keepalive
     if (platformTimerRemain(&client->keepaliveTimer) != 0)
+    {
+        client->keepaliveTimeoutCount = 0;
         return RyanMqttSuccessError;
+    }
 
-    // 心跳超时,断开连接
-    connectState = RyanMqttKeepaliveTimeout;
-    RyanMqttCheckCode(2 > client->keepaliveTimeoutCount, RyanMqttKeepaliveTimeout, rlog_d,
-                      { RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState); });
+    // 发送5次都没有收到服务器的心跳保活响应,主动认为断连
+    // 其实服务器也会主动断连的
+    if (client->keepaliveTimeoutCount > 5)
+    {
+        int32_t connectState = RyanMqttKeepaliveTimeout;
+        RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
+        rlog_d("ErrorCode: %d, strError: %s", RyanMqttKeepaliveTimeout, RyanMqttStrError(RyanMqttKeepaliveTimeout));
+        return RyanMqttKeepaliveTimeout;
+    }
 
     platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
-    packetLen = MQTTSerialize_pingreq((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize);
+    int32_t packetLen = MQTTSerialize_pingreq((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize);
     if (packetLen > 0)
         RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
     platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
 
     client->keepaliveTimeoutCount++;
-    platformTimerCutdown(&client->keepaliveTimer, client->config->keepaliveTimeoutS * 1000 / 2); // 启动心跳定时器
     return RyanMqttSuccessError;
 }
 
@@ -98,7 +106,7 @@ static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *c
     result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
-    // 可能会多次收到puback / pubcomp,仅在首次收到时触发发布成功回调函数
+    // 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
     result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { rlog_i("packetType: %d, packetId: %d", packetType, packetId); });
 
@@ -135,8 +143,8 @@ static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client)
 
     // 每次收到PUBREL都返回消息
     result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
     platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 删除pubrel记录
     result = RyanMqttAckListNodeFind(client, PUBREL, packetId, &ackHandler);
@@ -159,7 +167,6 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
 {
     RyanMqttError_e result = RyanMqttFailedError;
     uint8_t dup = 0;
-    RyanMqttBool_e fastFlag = RyanMqttFalse;
     uint8_t packetType = 0;
     uint16_t packetId = 0;
     int32_t packetLen = 0;
@@ -171,6 +178,17 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
     result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
+    // 制作确认数据包并发送
+    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
+    // 序列化发布释放报文
+    packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREL, 0, packetId);
+    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+
+    // 每次收到PUBREC都返回ack
+    result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
+    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
     // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
     result = RyanMqttAckListNodeFind(client, PUBREC, packetId, &ackHandlerPubrec);
     if (RyanMqttSuccessError == result)
@@ -179,9 +197,20 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
         result = RyanMqttAckListNodeFind(client, PUBCOMP, packetId, &ackHandler);
         if (RyanMqttSuccessError != result)
         {
-            fastFlag = RyanMqttTrue;
+            result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
+                                              strlen(ackHandlerPubrec->msgHandler->topic),
+                                              ackHandlerPubrec->msgHandler->qos, &msgHandler);
+            RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+            // 创建一个 ACK 处理程序节点
+            result = RyanMqttAckHandlerCreate(client, PUBCOMP, packetId, packetLen, msgHandler, &ackHandler);
+            RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(msgHandler); });
+
+            result = RyanMqttAckListAdd(client, ackHandler);
+            RyanMqttAckListRemove(client, ackHandlerPubrec);
+            RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
         }
-        // 出现pubrec和pubcomp同时存在的情况,清除pubrec理论上不会出现
+        // 出现pubrec和pubcomp同时存在的情况,清除pubrec理论上不会出现(冗余措施)
         else
         {
             RyanMqttAckListRemove(client, ackHandlerPubrec);
@@ -189,42 +218,6 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
         }
     }
 
-    // 制作确认数据包并发送
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
-    // 序列化发布释放报文
-    packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREL, 0, packetId);
-    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
-
-    // 每次收到PUBREC都返回ack
-    result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
-
-    // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
-    if (RyanMqttTrue == fastFlag)
-    {
-        result = RyanMqttMsgHandlerCreate(ackHandlerPubrec->msgHandler->topic,
-                                          strlen(ackHandlerPubrec->msgHandler->topic),
-                                          ackHandlerPubrec->msgHandler->qos, &msgHandler);
-        RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
-                          { platformMutexUnLock(client->config->userData, client->sendBufLock); });
-
-        // 创建一个 ACK 处理程序节点
-        result = RyanMqttAckHandlerCreate(client, PUBCOMP, packetId, packetLen, msgHandler, &ackHandler);
-        RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
-                          { platformMemoryFree(msgHandler);
-             platformMutexUnLock(client->config->userData, client->sendBufLock); });
-    }
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
-
-    // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
-    if (RyanMqttTrue == fastFlag)
-    {
-        result = RyanMqttAckListAdd(client, ackHandler);
-        // 保证等待PUBCOMP记录成功后再清除PUBREC记录
-        RyanMqttAckListRemove(client, ackHandlerPubrec);
-        RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
-    }
-
     return result;
 }
 
@@ -266,21 +259,13 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
                           { platformMutexUnLock(client->config->userData, client->sendBufLock); });
 
         result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-        RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
-                          { platformMutexUnLock(client->config->userData, client->sendBufLock); });
         platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+        RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
         deliverMsgFlag = RyanMqttTrue;
-
         break;
 
-    case RyanMqttQos2:
-    {
-        RyanMqttBool_e fastFlag = RyanMqttFalse;
-        // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish不进行qos2消息处理
-        result = RyanMqttAckListNodeFind(client, PUBREL, msgData.packetId, &ackHandler);
-        if (RyanMqttSuccessError != result)
-            fastFlag = RyanMqttTrue;
+    case RyanMqttQos2: // qos2采用方法B
 
         platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
         packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREC, 0, msgData.packetId);
@@ -288,32 +273,23 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
                           { platformMutexUnLock(client->config->userData, client->sendBufLock); });
 
         result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-        RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
-                          { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+        platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+        RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
-        if (RyanMqttTrue == fastFlag)
+        // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish, 不进行qos2 PUBREC消息处理
+        result = RyanMqttAckListNodeFind(client, PUBREL, msgData.packetId, &ackHandler);
+        if (RyanMqttSuccessError != result)
         {
-            result = RyanMqttMsgHandlerCreate(topicName.lenstring.data, topicName.lenstring.len, msgData.qos, &msgHandler);
-            RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
-                platformMutexUnLock(client->config->userData, client->sendBufLock);
-            });
+            result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, msgData.qos, &msgHandler);
+            RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {});
 
             result = RyanMqttAckHandlerCreate(client, PUBREL, msgData.packetId, packetLen, msgHandler, &ackHandler);
-            RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
-                platformMemoryFree(msgHandler);
-                platformMutexUnLock(client->config->userData, client->sendBufLock);
-            });
-        }
-        platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+            RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(msgHandler); });
 
-        if (RyanMqttTrue == fastFlag)
-        {
             result = RyanMqttAckListAdd(client, ackHandler);
             deliverMsgFlag = RyanMqttTrue;
         }
-    }
-
-    break;
+        break;
 
     default:
         break;
@@ -369,12 +345,12 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
     result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, strlen(ackHandler->msgHandler->topic), RyanMqttFalse, &msgHandler);
     if (RyanMqttSuccessError == result)
     {
-        RyanMqttMsgHandlerRemove(msgHandler);
-        RyanMqttMsgHandlerDestory(msgHandler);
+        RyanMqttMsgHandlerRemove(client, msgHandler);
+        RyanMqttMsgHandlerDestory(client, msgHandler);
     }
 
     // 服务端可以授予比订阅者要求的低一些的 QoS 等级。
-    result = RyanMqttMsgHandlerCreate(ackHandler->msgHandler->topic,
+    result = RyanMqttMsgHandlerCreate(client, ackHandler->msgHandler->topic,
                                       strlen(ackHandler->msgHandler->topic),
                                       grantedQoS, &msgHandler);
     RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d); // 这里创建失败了不触发回调,等待ack超时触发失败回调函数
@@ -492,7 +468,8 @@ static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8
         break;
 
     case PINGRESP: // 心跳响应
-        client->keepaliveTimeoutCount = 0;
+        RyanMqttRefreshKeepaliveTime(client);
+        result = RyanMqttSuccessError;
         break;
 
     default:
@@ -536,10 +513,10 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
         switch (ackHandler->packetType)
         {
         // 发送qos1 / qos2消息, 服务器ack响应超时。需要重新发送它们。
-        case PUBACK:
-        case PUBREC:
-        case PUBREL:
-        case PUBCOMP:
+        case PUBACK:  // qos1 publish后没有收到puback
+        case PUBREC:  // qos2 publish后没有收到pubrec
+        case PUBREL:  // qos2 收到pubrec,发送pubrel后没有收到pubcomp
+        case PUBCOMP: // 理论不会出现,冗余措施
         {
 
             if (RyanMqttConnectState != RyanMqttGetClientState(client))
@@ -634,19 +611,18 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
 
     // 发送序列化mqtt的CONNECT报文
     result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 等待报文
     // mqtt规范 服务端接收到connect报文后,服务端发送给客户端的第一个报文必须是 CONNACK
     result = RyanMqttReadPacketHandler(client, &packetType);
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
-    RyanMqttCheckCode(CONNACK == packetType, RyanMqttConnectDisconnected, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    RyanMqttCheck(CONNACK == packetType, RyanMqttConnectDisconnected, rlog_d);
 
     // 解析CONNACK报文
     result = MQTTDeserialize_connack(&sessionPresent, (uint8_t *)&connackRc, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
-    RyanMqttCheckCode(1 == result, RyanMqttDeserializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
     rlog_i("result: %d, packetLen: %d, packetType: %d connackRc: %d", result, packetLen, packetType, connackRc);
 
     return connackRc;
@@ -661,17 +637,16 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
  */
 void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
 {
-
     RyanMqttAssert(NULL != client);
     RyanMqttAssert(NULL != client->network);
     RyanMqttAssert(NULL != client->config);
 
     switch (eventId)
     {
-    case RyanMqttEventConnected:                                                                       // 连接成功
-        client->keepaliveTimeoutCount = 0;                                                             // 重置心跳超时计数器
-        platformTimerCutdown(&client->keepaliveTimer, (client->config->keepaliveTimeoutS * 1000 / 2)); // 启动心跳定时器
-        RyanMqttAckListScan(client, RyanMqttFalse);                                                    // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
+    case RyanMqttEventConnected:           // 第一次连接成功
+        client->keepaliveTimeoutCount = 0; // 重置心跳超时计数器
+        RyanMqttRefreshKeepaliveTime(client);
+        RyanMqttAckListScan(client, RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
         RyanMqttSetClientState(client, RyanMqttConnectState);
         break;
 
@@ -729,14 +704,6 @@ void RyanMqttThread(void *argument)
                 client->network = NULL;
             }
 
-            // 清除互斥锁
-            if (NULL != client->sendBufLock)
-            {
-                platformMutexDestroy(client->config->userData, client->sendBufLock);
-                platformMemoryFree(client->sendBufLock);
-                client->sendBufLock = NULL;
-            }
-
             // 清除config信息
             if (NULL != client->config)
             {
@@ -774,6 +741,22 @@ void RyanMqttThread(void *argument)
             // 清除session  ack链表和msg链表
             RyanMqttCleanSession(client);
 
+            // 清除互斥锁
+            if (NULL != client->sendBufLock)
+            {
+                platformMutexDestroy(client->config->userData, client->sendBufLock);
+                platformMemoryFree(client->sendBufLock);
+                client->sendBufLock = NULL;
+            }
+
+            // 清除临界区
+            if (NULL != client->criticalLock)
+            {
+                platformCriticalDestroy(client->config->userData, client->criticalLock);
+                platformMemoryFree(client->criticalLock);
+                client->criticalLock = NULL;
+            }
+
             platformThread_t mqttThread = *client->mqttThread;
             void *userData = client->config->userData;
 

+ 1 - 0
mqttclient/RyanMqttThread.h

@@ -15,6 +15,7 @@ extern "C"
 
     extern void RyanMqttThread(void *argument);
     extern void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData);
+    extern void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client);
 
 #ifdef __cplusplus
 }

+ 19 - 18
mqttclient/RyanMqttUtile.c

@@ -119,6 +119,7 @@ RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, char *sendBuf, int3
     {
     case RyanMqttSuccessError:
     case RyanMqttSendPacketTimeOutError:
+        RyanMqttRefreshKeepaliveTime(client); // 只要发送数据就刷新 keepalive 时间,可以降低一些心智负担
         return result;
 
     case RyanSocketFailedError:
@@ -139,9 +140,9 @@ void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e state)
 {
     RyanMqttAssert(NULL != client);
 
-    platformCriticalEnter();
+    platformCriticalEnter(client->config->userData, client->criticalLock);
     client->clientState = state;
-    platformCriticalExit();
+    platformCriticalExit(client->config->userData, client->criticalLock);
 }
 
 /**
@@ -153,7 +154,7 @@ void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e state)
 RyanMqttState_e RyanMqttGetClientState(RyanMqttClient_t *client)
 {
     RyanMqttAssert(NULL != client);
-    return client->clientState; // 原子操作不必互斥
+    return client->clientState;
 }
 
 /**
@@ -310,7 +311,7 @@ RyanMqttBool_e RyanMqttMatchTopic(const char *topic,
  * @param pMsgHandler
  * @return RyanMqttError_e
  */
-RyanMqttError_e RyanMqttMsgHandlerCreate(char *topic, uint16_t topicLen, RyanMqttQos_e qos, RyanMqttMsgHandler_t **pMsgHandler)
+RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, char *topic, uint16_t topicLen, RyanMqttQos_e qos, RyanMqttMsgHandler_t **pMsgHandler)
 {
     RyanMqttError_e result = RyanMqttSuccessError;
     RyanMqttMsgHandler_t *msgHandler = NULL;
@@ -338,7 +339,7 @@ RyanMqttError_e RyanMqttMsgHandlerCreate(char *topic, uint16_t topicLen, RyanMqt
  *
  * @param msgHandler
  */
-void RyanMqttMsgHandlerDestory(RyanMqttMsgHandler_t *msgHandler)
+void RyanMqttMsgHandlerDestory(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
 {
     RyanMqttAssert(NULL != msgHandler);
     RyanMqttAssert(NULL != msgHandler->topic);
@@ -413,9 +414,9 @@ RyanMqttError_e RyanMqttMsgHandlerAdd(RyanMqttClient_t *client, RyanMqttMsgHandl
     RyanMqttAssert(NULL != msgHandler);
     RyanMqttAssert(NULL != msgHandler->topic);
 
-    platformCriticalEnter();
+    platformCriticalEnter(client->config->userData, client->criticalLock);
     RyanListAddTail(&msgHandler->list, &client->msgHandlerList); // 将msgHandler节点添加到链表尾部
-    platformCriticalExit();
+    platformCriticalExit(client->config->userData, client->criticalLock);
 
     return RyanMqttSuccessError;
 }
@@ -427,14 +428,14 @@ RyanMqttError_e RyanMqttMsgHandlerAdd(RyanMqttClient_t *client, RyanMqttMsgHandl
  * @param msgHandler
  * @return int32_t
  */
-RyanMqttError_e RyanMqttMsgHandlerRemove(RyanMqttMsgHandler_t *msgHandler)
+RyanMqttError_e RyanMqttMsgHandlerRemove(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler)
 {
     RyanMqttAssert(NULL != msgHandler);
     RyanMqttAssert(NULL != msgHandler->topic);
 
-    platformCriticalEnter();
+    platformCriticalEnter(client->config->userData, client->criticalLock);
     RyanListDel(&msgHandler->list);
-    platformCriticalExit();
+    platformCriticalExit(client->config->userData, client->criticalLock);
 
     return RyanMqttSuccessError;
 }
@@ -492,8 +493,8 @@ void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *a
     RyanMqttAssert(NULL != ackHandler->msgHandler);
     RyanMqttAssert(NULL != ackHandler->msgHandler->topic);
 
-    RyanMqttMsgHandlerRemove(ackHandler->msgHandler);
-    RyanMqttMsgHandlerDestory(ackHandler->msgHandler); // 释放msgHandler
+    RyanMqttMsgHandlerRemove(client, ackHandler->msgHandler);
+    RyanMqttMsgHandlerDestory(client, ackHandler->msgHandler); // 释放msgHandler
     platformMemoryFree(ackHandler);
 }
 
@@ -547,10 +548,10 @@ RyanMqttError_e RyanMqttAckListAdd(RyanMqttClient_t *client, RyanMqttAckHandler_
     RyanMqttAssert(NULL != ackHandler->msgHandler->topic);
 
     // 将ack节点添加到链表尾部
-    platformCriticalEnter();
+    platformCriticalEnter(client->config->userData, client->criticalLock);
     RyanListAddTail(&ackHandler->list, &client->ackHandlerList);
     client->ackHandlerCount++;
-    platformCriticalExit();
+    platformCriticalExit(client->config->userData, client->criticalLock);
 
     if (client->ackHandlerCount >= client->config->ackHandlerCountWarning)
         RyanMqttEventMachine(client, RyanMqttEventAckCountWarning, (void *)&client->ackHandlerCount);
@@ -573,11 +574,11 @@ RyanMqttError_e RyanMqttAckListRemove(RyanMqttClient_t *client, RyanMqttAckHandl
     RyanMqttAssert(NULL != ackHandler->msgHandler->topic);
 
     // 将ack节点添加到链表尾部
-    platformCriticalEnter();
+    platformCriticalEnter(client->config->userData, client->criticalLock);
     RyanListDel(&ackHandler->list);
     if (client->ackHandlerCount > 0)
         client->ackHandlerCount--;
-    platformCriticalExit();
+    platformCriticalExit(client->config->userData, client->criticalLock);
 
     return RyanMqttSuccessError;
 }
@@ -613,8 +614,8 @@ void RyanMqttCleanSession(RyanMqttClient_t *client)
         RyanListForEachSafe(curr, next, &client->msgHandlerList)
         {
             msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
-            RyanMqttMsgHandlerRemove(msgHandler);
-            RyanMqttMsgHandlerDestory(msgHandler);
+            RyanMqttMsgHandlerRemove(client, msgHandler);
+            RyanMqttMsgHandlerDestory(client, msgHandler);
         }
         RyanListDelInit(&client->msgHandlerList);
     }

+ 3 - 3
mqttclient/RyanMqttUtile.h

@@ -20,11 +20,11 @@ extern "C"
     extern RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, char *buf, int32_t length);
     extern RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, char *buf, int32_t length);
 
-    extern RyanMqttError_e RyanMqttMsgHandlerCreate(char *topic, uint16_t topicLen, RyanMqttQos_e qos, RyanMqttMsgHandler_t **pMsgHandler);
-    extern void RyanMqttMsgHandlerDestory(RyanMqttMsgHandler_t *msgHandler);
+    extern RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, char *topic, uint16_t topicLen, RyanMqttQos_e qos, RyanMqttMsgHandler_t **pMsgHandler);
+    extern void RyanMqttMsgHandlerDestory(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
     extern RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, char *topic, uint16_t topicLen, RyanMqttBool_e topicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler);
     extern RyanMqttError_e RyanMqttMsgHandlerAdd(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
-    extern RyanMqttError_e RyanMqttMsgHandlerRemove(RyanMqttMsgHandler_t *msgHandler);
+    extern RyanMqttError_e RyanMqttMsgHandlerRemove(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
 
     extern RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId, uint16_t packetLen, RyanMqttMsgHandler_t *msgHandler, RyanMqttAckHandler_t **pAckHandler);
     extern void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);

+ 306 - 0
platform/linux/platformNetwork.c

@@ -0,0 +1,306 @@
+#define rlogEnable 1               // 是否使能日志
+#define rlogColorEnable 1          // 是否使能日志颜色
+#define rlogLevel (rlogLvlWarning) // 日志打印等级
+#define rlogTag "RyanMqttNet"      // 日志tag
+
+#include "platformNetwork.h"
+#include "RyanMqttLog.h"
+
+/**
+ * @brief 连接mqtt服务器
+ *
+ * @param userData
+ * @param platformNetwork
+ * @param host
+ * @param port
+ * @return RyanMqttError_e
+ * 成功返回RyanMqttSuccessError, 失败返回错误信息
+ */
+RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, const char *port)
+{
+    RyanMqttError_e result = RyanMqttSuccessError;
+
+    // ?线程安全版本,有些设备没有实现,默认不启用。如果涉及多个客户端解析域名请使用线程安全版本
+    // char buf[256];
+    // int ret;
+    // struct hostent hostinfo, *phost;
+
+    // if (0 != gethostbyname_r(host, &hostinfo, buf, sizeof(buf), &phost, &ret))
+    // {
+    //     result = RyanSocketFailedError;
+    //     goto exit;
+    // }
+
+    // platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+    // if (platformNetwork->socket < 0)
+    // {
+    //     result = RyanSocketFailedError;
+    //     goto exit;
+    // }
+
+    // struct sockaddr_in server_addr;
+    // memset(&server_addr, 0, sizeof(server_addr));
+    // server_addr.sin_family = AF_INET;
+    // server_addr.sin_port = htons(atoi(port)); // 指定端口号,这里使用HTTP默认端口80
+    // server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
+
+    // // 绑定套接字到主机地址和端口号
+    // if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
+    // {
+    //     platformNetworkClose(userData, platformNetwork);
+    //     result = RyanMqttSocketConnectFailError;
+    //     goto exit;
+    // }
+
+    // 非线程安全版本,请根据实际情况选择使用
+    struct hostent *hostinfo;
+    hostinfo = gethostbyname(host);
+    if (NULL == hostinfo)
+    {
+        result = RyanSocketFailedError;
+        goto exit;
+    }
+
+    platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+    if (platformNetwork->socket < 0)
+    {
+        result = RyanSocketFailedError;
+        goto exit;
+    }
+
+    struct sockaddr_in server_addr;
+    memset(&server_addr, 0, sizeof(server_addr));
+    server_addr.sin_family = AF_INET;
+    server_addr.sin_port = htons(atoi(port)); // 指定端口号,这里使用HTTP默认端口80
+    server_addr.sin_addr = *((struct in_addr *)hostinfo->h_addr_list[0]);
+
+    // 绑定套接字到主机地址和端口号
+    if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
+    {
+        platformNetworkClose(userData, platformNetwork);
+        result = RyanMqttSocketConnectFailError;
+        goto exit;
+    }
+
+exit:
+    return result;
+}
+
+/**
+ * @brief 非阻塞接收数据
+ *
+ * @param userData
+ * @param platformNetwork
+ * @param recvBuf
+ * @param recvLen
+ * @param timeout
+ * @return RyanMqttError_e
+ * socket错误返回 RyanSocketFailedError
+ * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
+ * 接收成功 RyanMqttSuccessError
+ */
+RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout)
+{
+
+    int32_t recvResult = 0;
+    int32_t offset = 0;
+    int32_t timeOut2 = timeout;
+    struct timeval tv = {0};
+    platformTimer_t timer = {0};
+
+    if (-1 == platformNetwork->socket)
+        return RyanSocketFailedError;
+
+    platformTimerCutdown(&timer, timeout);
+
+    while ((offset < recvLen) && (0 != timeOut2))
+    {
+
+        tv.tv_sec = timeOut2 / 1000;
+        tv.tv_usec = timeOut2 % 1000 * 1000;
+
+        if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
+        {
+            tv.tv_sec = 0;
+            tv.tv_usec = 100;
+        }
+
+        setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
+
+        recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
+
+        if (recvResult <= 0) // 小于零,表示错误,个别错误不代表socket错误
+        {
+            // 下列3种表示没问题,但需要推出发送
+            if ((errno == EAGAIN ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+                 errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
+                 errno == EINTR))        // 操作被信号中断
+                break;
+
+            return RyanSocketFailedError;
+        }
+
+        offset += recvResult;
+        timeOut2 = platformTimerRemain(&timer);
+    }
+
+    if (offset != recvLen)
+        return RyanMqttRecvPacketTimeOutError;
+
+    return RyanMqttSuccessError;
+
+    // int32_t recvResult = 0;
+    // int32_t offset = 0;
+    // int32_t timeOut2 = timeout;
+    // struct timeval tv = {0};
+    // platformTimer_t timer = {0};
+
+    // if (-1 == platformNetwork->socket)
+    //     return RyanSocketFailedError;
+
+    // platformTimerCutdown(&timer, timeout);
+
+    // while ((offset < recvLen) && (0 != timeOut2))
+    // {
+
+    //     tv.tv_sec = timeOut2 / 1000;
+    //     tv.tv_usec = timeOut2 % 1000 * 1000;
+
+    //     if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
+    //     {
+    //         tv.tv_sec = 0;
+    //         tv.tv_usec = 100;
+    //     }
+
+    //     fd_set readset;
+    //     int i, maxfdp1;
+
+    //     /* 清空可读事件描述符列表 */
+    //     FD_ZERO(&readset);
+
+    //     /* 将需要监听可读事件的描述符加入列表 */
+    //     FD_SET(platformNetwork->socket, &readset);
+
+    //     /* 等待设定的网络描述符有事件发生 */
+    //     i = select(platformNetwork->socket + 1, &readset, RT_NULL, RT_NULL, &tv);
+    //     if (i < 0)
+    //     {
+    //         // 下列3种表示没问题,但需要退出接收
+    //         if ((errno == EAGAIN ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+    //              errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
+    //              errno == EINTR))        // 操作被信号中断
+    //             break;
+
+    //         return RyanSocketFailedError;
+    //     }
+
+    //     /* 查看 sock 描述符上有没有发生可读事件 */
+    //     if (i > 0 && FD_ISSET(platformNetwork->socket, &readset))
+    //     {
+    //         recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
+
+    //         if (recvResult <= 0) // 小于零,表示错误,个别错误不代表socket错误
+    //         {
+    //             // 下列3种表示没问题,但需要退出接收
+    //             if ((errno == EAGAIN ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+    //                  errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
+    //                  errno == EINTR))        // 操作被信号中断
+    //                 break;
+
+    //             return RyanSocketFailedError;
+    //         }
+
+    //         offset += recvResult;
+    //     }
+
+    //     timeOut2 = platformTimerRemain(&timer);
+    // }
+
+    // if (offset != recvLen)
+    //     return RyanMqttRecvPacketTimeOutError;
+
+    // return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 非阻塞发送数据
+ *
+ * @param userData
+ * @param platformNetwork
+ * @param sendBuf
+ * @param sendLen
+ * @param timeout
+ * @return RyanMqttError_e
+ * socket错误返回 RyanSocketFailedError
+ * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
+ * 接收成功 RyanMqttSuccessError
+ */
+RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout)
+{
+
+    int32_t sendResult = 0;
+    int32_t offset = 0;
+    int32_t timeOut2 = timeout;
+    struct timeval tv = {0};
+    platformTimer_t timer = {0};
+
+    if (-1 == platformNetwork->socket)
+        return RyanSocketFailedError;
+
+    platformTimerCutdown(&timer, timeout);
+
+    while ((offset < sendLen) && (0 != timeOut2))
+    {
+
+        tv.tv_sec = timeOut2 / 1000;
+        tv.tv_usec = timeOut2 % 1000 * 1000;
+
+        if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
+        {
+            tv.tv_sec = 0;
+            tv.tv_usec = 100;
+        }
+
+        setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
+
+        sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0);
+
+        if (sendResult <= 0) // 小于零,表示错误,个别错误不代表socket错误
+        {
+            // 下列3种表示没问题,但需要推出发送
+            if ((errno == EAGAIN ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+                 errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
+                 errno == EINTR))        // 操作被信号中断
+                break;
+
+            return RyanSocketFailedError;
+        }
+
+        offset += sendResult;
+        timeOut2 = platformTimerRemain(&timer);
+    }
+
+    if (offset != sendLen)
+        return RyanMqttSendPacketTimeOutError;
+
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 断开mqtt服务器连接
+ *
+ * @param userData
+ * @param platformNetwork
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
+{
+
+    if (platformNetwork->socket >= 0)
+    {
+        close(platformNetwork->socket);
+        platformNetwork->socket = -1;
+    }
+
+    return RyanMqttSuccessError;
+}

+ 45 - 0
platform/linux/platformNetwork.h

@@ -0,0 +1,45 @@
+
+
+#ifndef __platformNetSocket__
+#define __platformNetSocket__
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/param.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include "platformTimer.h"
+#include "RyanMqttPublic.h"
+
+    typedef struct
+    {
+        int socket;
+    } platformNetwork_t;
+
+    extern RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, const char *port);
+    extern RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout);
+    extern RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout);
+    extern RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

+ 226 - 0
platform/linux/platformSystem.c

@@ -0,0 +1,226 @@
+
+#include "platformSystem.h"
+
+/**
+ * @brief 申请内存
+ *
+ * @param size
+ * @return void*
+ */
+inline void *platformMemoryMalloc(size_t size)
+{
+    return malloc(size);
+}
+
+/**
+ * @brief 释放内存
+ *
+ * @param ptr
+ */
+inline void platformMemoryFree(void *ptr)
+{
+    free(ptr);
+}
+
+/**
+ * @brief ms延时
+ *
+ * @param ms
+ */
+inline void platformDelay(uint32_t ms)
+{
+    usleep(ms * 1000);
+}
+
+/**
+ * @brief 打印字符串函数,可通过串口打印出去
+ *
+ * @param str
+ * @param strLen
+ */
+inline void platformPrint(char *str, uint16_t strLen)
+{
+    printf("%.*s", strLen, str);
+}
+
+/**
+ * @brief 初始化并运行线程
+ *
+ * @param userData
+ * @param platformThread
+ * @param name
+ * @param entry
+ * @param param
+ * @param stackSize
+ * @param priority
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformThreadInit(void *userData,
+                                   platformThread_t *platformThread,
+                                   const char *name,
+                                   void (*entry)(void *),
+                                   void *const param,
+                                   uint32_t stackSize,
+                                   uint32_t priority)
+{
+
+    pthread_attr_t attr = {0};
+    pthread_attr_init(&attr);
+    pthread_attr_setstacksize(&attr, stackSize);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 设置为分离状态
+
+    int ret = pthread_create(&platformThread->thread, &attr, entry, param);
+    if (0 != ret)
+        return RyanMqttNoRescourceError;
+
+    pthread_mutex_init(&platformThread->mutex, NULL);
+
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 销毁自身线程
+ *
+ * @param userData
+ * @param platformThread
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformThreadDestroy(void *userData, platformThread_t *platformThread)
+{
+    pthread_exit(NULL);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 开启线程
+ *
+ * @param userData
+ * @param platformThread
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformThreadStart(void *userData, platformThread_t *platformThread)
+{
+    pthread_mutex_lock(&platformThread->mutex);
+    pthread_cond_signal(&platformThread->cond);
+    pthread_mutex_unlock(&platformThread->mutex);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 挂起线程
+ *
+ * @param userData
+ * @param platformThread
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformThreadStop(void *userData, platformThread_t *platformThread)
+{
+    pthread_mutex_lock(&platformThread->mutex);
+    pthread_cond_wait(&platformThread->cond, &platformThread->mutex);
+    pthread_mutex_unlock(&platformThread->mutex);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 互斥锁初始化
+ *
+ * @param userData
+ * @param platformMutex
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformMutexInit(void *userData, platformMutex_t *platformMutex)
+{
+    pthread_mutex_init(&platformMutex->mutex, NULL);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 销毁互斥锁
+ *
+ * @param userData
+ * @param platformMutex
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformMutexDestroy(void *userData, platformMutex_t *platformMutex)
+{
+    pthread_mutex_destroy(&platformMutex->mutex);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 阻塞获取互斥锁
+ *
+ * @param userData
+ * @param platformMutex
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformMutexLock(void *userData, platformMutex_t *platformMutex)
+{
+    pthread_mutex_lock(&platformMutex->mutex); // 互斥锁上锁
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 释放互斥锁
+ *
+ * @param userData
+ * @param platformMutex
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMutex)
+{
+    pthread_mutex_unlock(&platformMutex->mutex); // 互斥锁解锁
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 临界区初始化
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformCriticalInit(void *userData, platformCritical_t *platformCritical)
+{
+    pthread_spin_init(&platformCritical->spin, PTHREAD_PROCESS_PRIVATE);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 销毁临界区
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformCriticalDestroy(void *userData, platformCritical_t *platformCritical)
+{
+    pthread_spin_destroy(&platformCritical->spin);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 进入临界区
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+inline RyanMqttError_e platformCriticalEnter(void *userData, platformCritical_t *platformCritical)
+{
+    pthread_spin_lock(&platformCritical->spin);
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 退出临界区
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+inline RyanMqttError_e platformCriticalExit(void *userData, platformCritical_t *platformCritical)
+{
+    pthread_spin_unlock(&platformCritical->spin);
+    return RyanMqttSuccessError;
+}

+ 68 - 0
platform/linux/platformSystem.h

@@ -0,0 +1,68 @@
+
+#ifndef __platformSystem__
+#define __platformSystem__
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <assert.h>
+#include <pthread.h>
+#include <unistd.h>
+#include "RyanMqttPublic.h"
+#include "valloc.h"
+
+#define RyanMqttAssert(EX) assert(EX)
+
+    typedef struct
+    {
+        pthread_t thread;
+        pthread_mutex_t mutex;
+        pthread_cond_t cond;
+    } platformThread_t;
+
+    typedef struct
+    {
+        pthread_mutex_t mutex;
+    } platformMutex_t;
+
+    typedef struct
+    {
+        pthread_spinlock_t spin;
+    } platformCritical_t;
+
+    extern void *platformMemoryMalloc(size_t size);
+    extern void platformMemoryFree(void *ptr);
+
+    extern void platformPrint(char *str, uint16_t strLen);
+    extern void platformDelay(uint32_t ms);
+
+    extern RyanMqttError_e platformThreadInit(void *userData,
+                                              platformThread_t *platformThread,
+                                              const char *name,
+                                              void (*entry)(void *),
+                                              void *const param,
+                                              uint32_t stackSize,
+                                              uint32_t priority);
+    extern RyanMqttError_e platformThreadDestroy(void *userData, platformThread_t *platformThread);
+    extern RyanMqttError_e platformThreadStart(void *userData, platformThread_t *platformThread);
+    extern RyanMqttError_e platformThreadStop(void *userData, platformThread_t *platformThread);
+
+    extern RyanMqttError_e platformMutexInit(void *userData, platformMutex_t *platformMutex);
+    extern RyanMqttError_e platformMutexDestroy(void *userData, platformMutex_t *platformMutex);
+    extern RyanMqttError_e platformMutexLock(void *userData, platformMutex_t *platformMutex);
+    extern RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMutex);
+
+    extern RyanMqttError_e platformCriticalInit(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalDestroy(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalEnter(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalExit(void *userData, platformCritical_t *platformCritical);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

+ 42 - 0
platform/linux/platformTimer.c

@@ -0,0 +1,42 @@
+
+
+#include "platformTimer.h"
+
+/**
+ * @brief 初始化定时器
+ *
+ * @param platformTimer
+ */
+void platformTimerInit(platformTimer_t *platformTimer)
+{
+    platformTimer->time.tv_sec = 0;
+    platformTimer->time.tv_usec = 0;
+}
+
+/**
+ * @brief 添加计数时间
+ *
+ * @param platformTimer
+ * @param timeout
+ */
+void platformTimerCutdown(platformTimer_t *platformTimer, uint32_t timeout)
+{
+    struct timeval now = {0};
+    gettimeofday(&now, NULL);
+    struct timeval interval = {timeout / 1000, (timeout % 1000) * 1000};
+    timeradd(&now, &interval, &platformTimer->time);
+}
+
+/**
+ * @brief 计算time还有多长时间超时,考虑了32位溢出判断
+ *
+ * @param platformTimer
+ * @return uint32_t 返回剩余时间,超时返回0
+ */
+uint32_t platformTimerRemain(platformTimer_t *platformTimer)
+{
+    struct timeval now = {0}, res = {0};
+    gettimeofday(&now, NULL);
+    timersub(&platformTimer->time, &now, &res);
+    return (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000;
+}

+ 26 - 0
platform/linux/platformTimer.h

@@ -0,0 +1,26 @@
+
+
+#ifndef __platformTimer__
+#define __platformTimer__
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#include <stdint.h>
+#include <stddef.h>
+#include <sys/time.h>
+
+    typedef struct
+    {
+        struct timeval time;
+    } platformTimer_t;
+
+    extern void platformTimerInit(platformTimer_t *platformTimer);
+    extern void platformTimerCutdown(platformTimer_t *platformTimer, uint32_t timeout);
+    extern uint32_t platformTimerRemain(platformTimer_t *platformTimer);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

+ 97 - 0
platform/linux/valloc/valloc.c

@@ -0,0 +1,97 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+
+static pthread_mutex_t mutex;
+static int count = 0;
+static int use = 0;
+
+void *v_malloc(size_t size)
+{
+    void *p;
+    p = malloc(size ? size + sizeof(int) : 0);
+    if (!p)
+        return NULL;
+
+    pthread_mutex_lock(&mutex); // 互斥锁上锁
+    count++;
+    *(int *)p = size;
+    use += size;
+    pthread_mutex_unlock(&mutex); // 互斥锁解锁
+    return (void *)((char *)p + sizeof(int));
+}
+
+void *v_calloc(size_t num, size_t size)
+{
+    void *p;
+    p = v_malloc(num * size);
+    if (!p)
+        return NULL;
+    memset(p, 0, num * size);
+    return p;
+}
+
+void v_free(void *block)
+{
+    void *p;
+    if (!block)
+        return;
+    p = (void *)((char *)block - sizeof(int));
+
+    pthread_mutex_lock(&mutex); // 互斥锁上锁
+    use -= *(int *)p;
+    count--;
+    pthread_mutex_unlock(&mutex); // 互斥锁解锁
+
+    free(p);
+}
+
+void *v_realloc(void *block, size_t size)
+{
+    void *p;
+    int s = 0;
+    if (block)
+    {
+        block = (void *)((char *)block - sizeof(int));
+        s = *(int *)block;
+    }
+    p = realloc(block, size ? size + sizeof(int) : 0);
+    if (!p)
+        return NULL;
+
+    pthread_mutex_lock(&mutex); // 互斥锁上锁
+    if (!block)
+        count++;
+    *(int *)p = size;
+    use += (size - s);
+    pthread_mutex_unlock(&mutex); // 互斥锁解锁
+
+    return (void *)((char *)p + sizeof(int));
+}
+
+int v_mcheck(int *_count, int *_use)
+{
+    pthread_mutex_lock(&mutex); // 互斥锁上锁
+    if (_count)
+        *_count = count;
+    if (_use)
+        *_use = use;
+    pthread_mutex_unlock(&mutex); // 互斥锁解锁
+
+    return 0;
+}
+
+void displayMem()
+{
+    int area = 0, use = 0;
+    v_mcheck(&area, &use);
+    printf("|||----------->>> area = %d, size = %d\r\n", area, use);
+}
+
+void vallocInit()
+{
+    /* 初始化互斥锁 */
+    pthread_mutex_init(&mutex, NULL);
+}

+ 38 - 0
platform/linux/valloc/valloc.h

@@ -0,0 +1,38 @@
+/*********************************************************************************************************
+ *  ------------------------------------------------------------------------------------------------------
+ *  file description
+ *  ------------------------------------------------------------------------------------------------------
+ *         \file  valloc.h
+ *         \unit  valloc
+ *        \brief  Test how much space is allocated
+ *       \author  Lamdonn
+ *      \details  v1.0.0
+ ********************************************************************************************************/
+#ifndef __valloc_H
+#define __valloc_H
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include <stdlib.h>
+
+    void *v_malloc(size_t size);
+    void *v_calloc(size_t num, size_t size);
+    void v_free(void *block);
+    void *v_realloc(void *block, size_t size);
+    int v_mcheck(int *_count, int *_use);
+    void displayMem();
+    void vallocInit();
+
+#define malloc v_malloc
+#define calloc v_calloc
+#define free v_free
+#define realloc v_realloc
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

+ 36 - 4
platform/quecOpen/platformSystem.c

@@ -174,19 +174,51 @@ RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMut
 }
 
 /**
- * @brief 进入临界区 / 关中断
+ * @brief 临界区初始化
  *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformCriticalInit(void *userData, platformCritical_t *platformCritical)
+{
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 销毁临界区
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformCriticalDestroy(void *userData, platformCritical_t *platformCritical)
+{
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 进入临界区
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
  */
-void platformCriticalEnter(void)
+inline RyanMqttError_e platformCriticalEnter(void *userData, platformCritical_t *platformCritical)
 {
     osKernelLock();
+    return RyanMqttSuccessError;
 }
 
 /**
- * @brief 退出临界区 / 开中断
+ * @brief 退出临界区
  *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
  */
-void platformCriticalExit(void)
+inline RyanMqttError_e platformCriticalExit(void *userData, platformCritical_t *platformCritical)
 {
     osKernelUnlock();
+    return RyanMqttSuccessError;
 }

+ 9 - 2
platform/quecOpen/platformSystem.h

@@ -26,6 +26,11 @@ extern "C"
         osMutexId_t mutex;
     } platformMutex_t;
 
+    typedef struct
+    {
+
+    } platformCritical_t;
+
     extern void *platformMemoryMalloc(size_t size);
     extern void platformMemoryFree(void *ptr);
 
@@ -48,8 +53,10 @@ extern "C"
     extern RyanMqttError_e platformMutexLock(void *userData, platformMutex_t *platformMutex);
     extern RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMutex);
 
-    extern void platformCriticalEnter(void);
-    extern void platformCriticalExit(void);
+    extern RyanMqttError_e platformCriticalInit(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalDestroy(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalEnter(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalExit(void *userData, platformCritical_t *platformCritical);
 
 #ifdef __cplusplus
 }

+ 1 - 2
platform/quecOpen/platformTimer.c

@@ -15,8 +15,7 @@ uint32_t platformUptimeMs(void)
 }
 
 /**
- * @brief 初始化定时器,没有使用,
- * timer结构体比较简单,没有做init和destory。看后面需求
+ * @brief 初始化定时器
  *
  * @param platformTimer
  */

+ 36 - 4
platform/rtthread/platformSystem.c

@@ -172,19 +172,51 @@ RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMut
 }
 
 /**
- * @brief 进入临界区 / 关中断
+ * @brief 临界区初始化
  *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformCriticalInit(void *userData, platformCritical_t *platformCritical)
+{
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 销毁临界区
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e platformCriticalDestroy(void *userData, platformCritical_t *platformCritical)
+{
+    return RyanMqttSuccessError;
+}
+
+/**
+ * @brief 进入临界区
+ *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
  */
-void platformCriticalEnter(void)
+inline RyanMqttError_e platformCriticalEnter(void *userData, platformCritical_t *platformCritical)
 {
     rt_enter_critical();
+    return RyanMqttSuccessError;
 }
 
 /**
- * @brief 退出临界区 / 开中断
+ * @brief 退出临界区
  *
+ * @param userData
+ * @param platformCritical
+ * @return RyanMqttError_e
  */
-void platformCriticalExit(void)
+inline RyanMqttError_e platformCriticalExit(void *userData, platformCritical_t *platformCritical)
 {
     rt_exit_critical();
+    return RyanMqttSuccessError;
 }

+ 8 - 2
platform/rtthread/platformSystem.h

@@ -29,6 +29,10 @@ extern "C"
         rt_mutex_t mutex;
     } platformMutex_t;
 
+    typedef struct
+    {
+    } platformCritical_t;
+
     extern void *platformMemoryMalloc(size_t size);
     extern void platformMemoryFree(void *ptr);
 
@@ -51,8 +55,10 @@ extern "C"
     extern RyanMqttError_e platformMutexLock(void *userData, platformMutex_t *platformMutex);
     extern RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMutex);
 
-    extern void platformCriticalEnter(void);
-    extern void platformCriticalExit(void);
+    extern RyanMqttError_e platformCriticalInit(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalDestroy(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalEnter(void *userData, platformCritical_t *platformCritical);
+    extern RyanMqttError_e platformCriticalExit(void *userData, platformCritical_t *platformCritical);
 
 #ifdef __cplusplus
 }

+ 1 - 2
platform/rtthread/platformTimer.c

@@ -15,8 +15,7 @@ uint32_t platformUptimeMs(void)
 }
 
 /**
- * @brief 初始化定时器,没有使用,
- * timer结构体比较简单,没有做init和destory。看后面需求
+ * @brief 初始化定时器
  *
  * @param platformTimer
  */

+ 608 - 0
test/RyanMqttTestLinux.c

@@ -0,0 +1,608 @@
+
+#define RyanMqttClientId ("RyanMqttTest1") // 填写mqtt客户端id,要求唯一
+#define RyanMqttHost ("39.164.131.143")    // 填写你的mqtt服务器ip
+#define RyanMqttPort ("1883")              // mqtt服务器端口
+#define RyanMqttUserName ("test")          // 为空时填写""
+#define RyanMqttPassword ("test")          // 为空时填写""
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <semaphore.h>
+
+#define rlogEnable 1             // 是否使能日志
+#define rlogColorEnable 1        // 是否使能日志颜色
+#define rlogLevel (rlogLvlDebug) // 日志打印等级
+#define rlogTag "RyanMqttTest"   // 日志tag
+#include "RyanMqttLog.h"
+#include "RyanMqttClient.h"
+
+#define delay(ms) usleep(ms * 1000)
+
+#define checkMemory                                                          \
+    do                                                                       \
+    {                                                                        \
+        int area = 0, use = 0;                                               \
+        v_mcheck(&area, &use);                                               \
+        if (area != 0 || use != 0)                                           \
+        {                                                                    \
+            rlog_e("内存泄漏");                                              \
+            while (1)                                                        \
+            {                                                                \
+                int area = 0, use = 0;                                       \
+                v_mcheck(&area, &use);                                       \
+                rlog_w("|||----------->>> area = %d, size = %d", area, use); \
+                delay(3000);                                                 \
+            }                                                                \
+        }                                                                    \
+    } while (0)
+
+static uint32_t mqttTest[10] = {0};
+#define dataEventCount (0)      // 接收到数据次数统计
+#define PublishedEventCount (1) // qos1和qos2发布成功的次数统计
+
+static void printfArrStr(char *buf, uint32_t len, char *userData)
+{
+    rlog_raw("%s", userData);
+    for (uint32_t i = 0; i < len; i++)
+        rlog_raw("%x", buf[i]);
+
+    rlog_raw("\r\n");
+}
+
+/**
+ * @brief mqtt事件回调处理函数
+ * 事件的详细定义可以查看枚举定义
+ *
+ * @param pclient
+ * @param event
+ * @param eventData 查看事件枚举,后面有说明eventData是什么类型
+ */
+static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void const *eventData)
+{
+    RyanMqttClient_t *client = (RyanMqttClient_t *)pclient;
+
+    switch (event)
+    {
+    case RyanMqttEventError:
+        break;
+
+    case RyanMqttEventConnected: // 不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题
+        rlog_i("mqtt连接成功回调 %d", *(int32_t *)eventData);
+
+        break;
+
+    case RyanMqttEventDisconnected:
+        rlog_w("mqtt断开连接回调 %d", *(int32_t *)eventData);
+        break;
+
+    case RyanMqttEventSubscribed:
+    {
+        RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
+        rlog_w("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
+        break;
+    }
+
+    case RyanMqttEventSubscribedFaile:
+    {
+        RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
+        rlog_w("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
+        break;
+    }
+
+    case RyanMqttEventUnSubscribed:
+    {
+        RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
+        rlog_w("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
+        break;
+    }
+
+    case RyanMqttEventUnSubscribedFaile:
+    {
+        RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
+        rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
+        break;
+    }
+
+    case RyanMqttEventPublished:
+    {
+        RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
+        rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
+        mqttTest[PublishedEventCount]++;
+        break;
+    }
+
+    case RyanMqttEventData:
+    {
+        RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
+        rlog_i("接收到mqtt消息事件回调 topic: %s, packetId: %d, payload len: %d",
+               msgData->topic, msgData->packetId, msgData->payloadLen);
+
+        rlog_i("%.*s", msgData->payloadLen, msgData->payload);
+        mqttTest[dataEventCount]++;
+        break;
+    }
+
+    case RyanMqttEventRepeatPublishPacket: // qos2 / qos1重发事件回调
+    {
+        RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
+        rlog_w("发布消息进行重发了,packetType: %d, packetId: %d, topic: %s, qos: %d",
+               ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
+
+        printfArrStr(ackHandler->packet, ackHandler->packetLen, "重发数据: ");
+        break;
+    }
+
+    case RyanMqttEventReconnectBefore:
+        // 如果每次connect都需要修改连接信息,这里是最好的选择。 否则需要注意资源互斥
+        rlog_i("重连前事件回调");
+        break;
+
+    case RyanMqttEventAckCountWarning: // qos2 / qos1的ack链表超过警戒值,不进行释放会一直重发,占用额外内存
+    {
+        // 根据实际情况清除ack, 这里等待每个ack重发次数到达警戒值后清除。
+        // 在资源有限的单片机中也不应频繁发送qos2 / qos1消息
+        uint16_t ackHandlerCount = *(uint16_t *)eventData;
+        rlog_i("ack记数值超过警戒值回调: %d", ackHandlerCount);
+        break;
+    }
+
+    case RyanMqttEventAckRepeatCountWarning: // 重发次数到达警戒值事件
+    {
+        // 这里选择直接丢弃该消息
+        RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
+        rlog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d", ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
+        RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
+        break;
+    }
+
+    case RyanMqttEventAckHandlerdiscard:
+    {
+        RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
+        rlog_i("ack丢弃回调: packetType: %d, packetId: %d, topic: %s, qos: %d",
+               ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic, ackHandler->msgHandler->qos);
+        break;
+    }
+
+    case RyanMqttEventDestoryBefore:
+        rlog_i("销毁mqtt客户端前回调");
+        free(client->config->sendBuffer);
+        free(client->config->recvBuffer);
+        if (client->config->userData)
+            sem_post((sem_t *)client->config->userData);
+        break;
+
+    default:
+        break;
+    }
+}
+
+static void RyanMqttInitSync(RyanMqttClient_t **client, RyanMqttBool_e syncFlag)
+{
+
+    char aaa[64];
+
+    // 手动避免count的资源竞争了
+    static uint32_t count = 0;
+    snprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
+    count++;
+
+    sem_t *sem = NULL;
+    if (syncFlag == RyanMqttTrue)
+    {
+        sem = (sem_t *)malloc(sizeof(sem_t));
+        sem_init(sem, 0, 0);
+    }
+
+    RyanMqttError_e result = RyanMqttSuccessError;
+    RyanMqttClientConfig_t mqttConfig = {
+        .clientId = aaa,
+        .userName = RyanMqttUserName,
+        .password = RyanMqttPassword,
+        .host = RyanMqttHost,
+        .port = RyanMqttPort,
+        .taskName = "mqttThread",
+        .taskPrio = 16,
+        .taskStack = 4096,
+        .recvBufferSize = 1024,
+        .sendBufferSize = 1024,
+        .recvBuffer = malloc(1024),
+        .sendBuffer = malloc(1024),
+        .mqttVersion = 4,
+        .ackHandlerRepeatCountWarning = 6,
+        .ackHandlerCountWarning = 20,
+        .autoReconnectFlag = RyanMqttTrue,
+        .cleanSessionFlag = RyanMqttTrue,
+        .reconnectTimeout = 3000,
+        .recvTimeout = 5000,
+        .sendTimeout = 2000,
+        .ackTimeout = 10000,
+        .keepaliveTimeoutS = 120,
+        .mqttEventHandle = mqttEventHandle,
+        .userData = sem};
+
+    // 初始化mqtt客户端
+    result = RyanMqttInit(client);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    // 注册需要的事件回调
+    result = RyanMqttRegisterEventId(*client, RyanMqttEventAnyId);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    // 设置mqtt客户端config
+    result = RyanMqttSetConfig(*client, &mqttConfig);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    // 设置遗嘱消息
+    result = RyanMqttSetLwt(*client, "pub/test", "this is will", strlen("this is will"), RyanMqttQos0, 0);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    // 启动mqtt客户端线程
+    result = RyanMqttStart(*client);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    while (RyanMqttConnectState != RyanMqttGetState(*client))
+    {
+        delay(100);
+    }
+}
+
+static void RyanMqttDestorySync(RyanMqttClient_t *client)
+{
+    sem_t *sem = (sem_t *)client->config->userData;
+    // 启动mqtt客户端线程
+    RyanMqttDestroy(client);
+
+    sem_wait(sem);
+    sem_destroy(sem);
+    free(sem);
+    delay(3);
+}
+
+static RyanMqttError_e RyanMqttSubscribeTest(RyanMqttQos_e qos)
+{
+
+#define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
+
+    RyanMqttClient_t *client;
+    RyanMqttInitSync(&client, RyanMqttTrue);
+
+    const char *subscribeArr[] = {
+        "testlinux/pub",
+        "testlinux/pub2",
+        "testlinux/pub3",
+        "testlinux/pub4",
+        "testlinux/pub5",
+    };
+
+    for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
+        RyanMqttSubscribe(client, subscribeArr[i], qos);
+
+    RyanMqttMsgHandler_t msgHandles[10] = {0};
+    int32_t subscribeNum = 0;
+    int32_t result = RyanMqttSuccessError;
+
+    for (int32_t i = 0; i < 600; i++)
+    {
+        result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
+        if (result == RyanMqttNoRescourceError)
+            rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
+
+        if (subscribeNum == getArraySize(subscribeArr))
+            break;
+
+        rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
+        for (int32_t i = 0; i < subscribeNum; i++)
+            rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
+
+        if (i > 500)
+            return RyanMqttFailedError;
+
+        delay(100);
+    }
+
+    for (int32_t i = 0; i < subscribeNum; i++)
+    {
+        uint8_t flag = 0;
+        for (uint8_t j = 0; j < getArraySize(subscribeArr); j++)
+        {
+            if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
+                flag = 1;
+        }
+
+        if (flag != 1)
+        {
+            rlog_i("主题不匹配: %d", msgHandles[i].topic);
+            return RyanMqttFailedError;
+        }
+    }
+
+    for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
+        RyanMqttUnSubscribe(client, subscribeArr[i]);
+
+    RyanMqttDestorySync(client);
+
+    return RyanMqttSuccessError;
+}
+
+static RyanMqttError_e RyanMqttUnSubscribeTest(RyanMqttQos_e qos)
+{
+
+    int count = 2;
+
+#define getArraySize(arr) (sizeof(arr) / sizeof((arr)[0]))
+
+    RyanMqttClient_t *client;
+    RyanMqttInitSync(&client, RyanMqttTrue);
+
+    const char *subscribeArr[] = {
+        "testlinux/pub",
+        "testlinux/pub2",
+        "testlinux/pub3",
+        "testlinux/pub4",
+        "testlinux/pub5",
+    };
+
+    for (uint8_t i = 0; i < getArraySize(subscribeArr); i++)
+        RyanMqttSubscribe(client, subscribeArr[i], qos);
+
+    RyanMqttMsgHandler_t msgHandles[10] = {0};
+    int32_t subscribeNum = 0;
+    int32_t result = RyanMqttSuccessError;
+
+    for (int32_t i = 0; i < 600; i++)
+    {
+        result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
+        if (result == RyanMqttNoRescourceError)
+            rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
+
+        if (subscribeNum == getArraySize(subscribeArr))
+            break;
+
+        rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
+
+        if (i > 500)
+            return RyanMqttFailedError;
+
+        delay(100);
+    }
+
+    for (uint8_t i = 0; i < getArraySize(subscribeArr) - count - 1; i++)
+        RyanMqttUnSubscribe(client, subscribeArr[i]);
+
+    for (int32_t i = 0; i < 600; i++)
+    {
+        result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
+        if (result == RyanMqttNoRescourceError)
+            rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
+
+        if (subscribeNum == getArraySize(subscribeArr) - count)
+            break;
+
+        rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr) - count);
+
+        if (i > 500)
+            return RyanMqttFailedError;
+
+        delay(100);
+    }
+
+    for (int32_t i = 0; i < subscribeNum; i++)
+    {
+        uint8_t flag = 0;
+        for (uint8_t j = count - 1; j < getArraySize(subscribeArr); j++)
+        {
+            if (0 == strcmp(msgHandles[i].topic, subscribeArr[j]))
+                flag = 1;
+        }
+
+        if (flag != 1)
+        {
+            rlog_i("主题不匹配: %d", msgHandles[i].topic);
+            return RyanMqttFailedError;
+        }
+    }
+
+    RyanMqttDestorySync(client);
+
+    return RyanMqttSuccessError;
+}
+
+static void RyanMqttPublishTest(RyanMqttQos_e qos, uint32_t count, uint32_t delayms)
+{
+    RyanMqttClient_t *client;
+    RyanMqttInitSync(&client, RyanMqttTrue);
+
+    RyanMqttSubscribe(client, "testlinux/pub", qos);
+    mqttTest[PublishedEventCount] = 0;
+    mqttTest[dataEventCount] = 0;
+    for (uint32_t i = 0; i < count; i++)
+    {
+        RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), qos, RyanMqttFalse);
+        if (delayms)
+            delay(delayms);
+    }
+
+    for (uint32_t i = 0; i < 60; i++)
+    {
+        delay(1000);
+
+        uint8_t result = 0;
+        if (RyanMqttQos0 == qos)
+        {
+            if (count == mqttTest[dataEventCount])
+                result = 1;
+        }
+        else if (mqttTest[PublishedEventCount] == count && mqttTest[PublishedEventCount] == mqttTest[dataEventCount])
+            result = 1;
+
+        if (!result)
+        {
+            rlog_e("QOS测试失败 Qos: %d, PublishedEventCount: %d, dataEventCount: %d", qos, mqttTest[PublishedEventCount], mqttTest[dataEventCount]);
+        }
+        else
+        {
+            rlog_i("QOS测试成功 Qos: %d", qos);
+            break;
+        }
+    }
+
+    RyanMqttUnSubscribe(client, "testlinux/pub");
+
+    RyanMqttDestorySync(client);
+}
+
+static void RyanMqttConnectDestory(uint32_t count, uint32_t delayms)
+{
+    for (uint32_t i = 0; i < count; i++)
+    {
+
+        RyanMqttClient_t *client;
+
+        RyanMqttInitSync(&client, i == count - 1 ? RyanMqttTrue : RyanMqttFalse);
+
+        RyanMqttPublish(client, "testlinux/pub", "helloworld", strlen("helloworld"), RyanMqttQos0, RyanMqttFalse);
+
+        if (delayms)
+            delay(delayms);
+
+        if (i == count - 1) // 最后一次同步释放
+        {
+            RyanMqttDestorySync(client);
+            delay(1000);
+        }
+        else
+            RyanMqttDestroy(client);
+    }
+}
+
+static void RyanMqttReconnectTest(uint32_t count, uint32_t delayms)
+{
+    RyanMqttClient_t *client;
+    RyanMqttInitSync(&client, RyanMqttTrue);
+    for (uint32_t i = 0; i < count; i++)
+    {
+        RyanMqttDisconnect(client, i % 2 == 0);
+        while (RyanMqttConnectState != RyanMqttGetState(client))
+        {
+            delay(1);
+        }
+
+        if (delayms)
+            delay(delayms);
+    }
+
+    RyanMqttDestorySync(client);
+}
+
+static RyanMqttError_e RyanMqttKeepAliveTest()
+{
+    RyanMqttClient_t *client;
+    RyanMqttError_e result = RyanMqttSuccessError;
+
+    sem_t *sem = (sem_t *)malloc(sizeof(sem_t));
+    sem_init(sem, 0, 0);
+    RyanMqttClientConfig_t mqttConfig = {
+        .clientId = "dfawerwdfgaeruyfku",
+        .userName = RyanMqttUserName,
+        .password = RyanMqttPassword,
+        .host = RyanMqttHost,
+        .port = RyanMqttPort,
+        .taskName = "mqttThread",
+        .taskPrio = 16,
+        .taskStack = 4096,
+        .recvBufferSize = 1024,
+        .sendBufferSize = 1024,
+        .recvBuffer = malloc(1024),
+        .sendBuffer = malloc(1024),
+        .mqttVersion = 4,
+        .ackHandlerRepeatCountWarning = 6,
+        .ackHandlerCountWarning = 20,
+        .autoReconnectFlag = RyanMqttTrue,
+        .cleanSessionFlag = RyanMqttTrue,
+        .reconnectTimeout = 3000,
+        .recvTimeout = 5000,
+        .sendTimeout = 2000,
+        .ackTimeout = 10000,
+        .keepaliveTimeoutS = 30,
+        .mqttEventHandle = mqttEventHandle,
+        .userData = sem};
+
+    // 初始化mqtt客户端
+    result = RyanMqttInit(&client);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    // 注册需要的事件回调
+    result = RyanMqttRegisterEventId(client, RyanMqttEventAnyId);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    // 设置mqtt客户端config
+    result = RyanMqttSetConfig(client, &mqttConfig);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    // 启动mqtt客户端线程
+    result = RyanMqttStart(client);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    while (RyanMqttConnectState != RyanMqttGetState(client))
+    {
+        delay(100);
+    }
+
+    for (uint32_t i = 0; i < 90; i++)
+    {
+        if (RyanMqttConnectState != RyanMqttGetState(client))
+        {
+            rlog_e("mqtt断连了");
+            return RyanMqttFailedError;
+        }
+
+        rlog_w("心跳倒计时: %d", platformTimerRemain(&client->keepaliveTimer));
+        delay(1000);
+    }
+
+    RyanMqttDestorySync(client);
+}
+
+// !当测试程序出错时,并不会回收内存。交由父进程进行回收
+int main()
+{
+    vallocInit();
+
+    RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_d, { goto __exit; });
+    RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_d, { goto __exit; });
+    RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_d, { goto __exit; });
+
+    RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos0), RyanMqttFailedError, rlog_d, { goto __exit; });
+    RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos1), RyanMqttFailedError, rlog_d, { goto __exit; });
+    RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttUnSubscribeTest(RyanMqttQos2), RyanMqttFailedError, rlog_d, { goto __exit; });
+
+    // 发布 & 订阅 qos 测试
+    RyanMqttPublishTest(RyanMqttQos0, 100, 0);
+    checkMemory;
+
+    RyanMqttPublishTest(RyanMqttQos1, 100, 1);
+    checkMemory;
+
+    RyanMqttPublishTest(RyanMqttQos2, 100, 1);
+    checkMemory;
+
+    RyanMqttConnectDestory(100, 0);
+    checkMemory;
+
+    RyanMqttReconnectTest(3, 0);
+    checkMemory;
+
+    RyanMqttCheckCode(RyanMqttSuccessError == RyanMqttKeepAliveTest(), RyanMqttFailedError, rlog_d, { goto __exit; });
+
+__exit:
+    while (1)
+    {
+        displayMem();
+        delay(10 * 1000);
+    }
+
+    return 0;
+}