Просмотр исходного кода

Merge branch 'main' of https://github.com/Ryan-CW-Code/RyanMqtt

ryancw 1 год назад
Родитель
Сommit
16084df514

+ 3 - 0
.gitignore

@@ -13,3 +13,6 @@ null
 *.out
 *.exe
 
+# 忽略
+core
+

+ 1 - 1
Makefile

@@ -13,7 +13,7 @@ src += $(wildcard ./pahoMqtt/*.c)
 src += $(wildcard ./mqttclient/*.c)
 
 obj = $(patsubst %.c, %.o, $(src))
-target = app
+target = app.o
 CC = gcc
 
 $(target): $(obj)

+ 75 - 97
mqttclient/RyanMqttClient.c

@@ -55,28 +55,10 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
     memset(client, 0, sizeof(RyanMqttClient_t));
 
     // 网络接口初始化
-    client->network = (platformNetwork_t *)platformMemoryMalloc(sizeof(platformNetwork_t));
-    RyanMqttCheckCode(NULL != client->network, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
-    memset(client->network, 0, sizeof(platformNetwork_t));
-    client->network->socket = -1;
+    client->network.socket = -1;
 
-    client->config = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
-    RyanMqttCheckCode(NULL != client->config, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
-    memset(client->config, 0, sizeof(RyanMqttClientConfig_t));
-
-    client->mqttThread = platformMemoryMalloc(sizeof(platformThread_t));
-    RyanMqttCheckCode(NULL != client->mqttThread, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
-    memset(client->mqttThread, 0, sizeof(platformThread_t));
-
-    client->sendBufLock = platformMemoryMalloc(sizeof(platformMutex_t));
-    RyanMqttCheckCode(NULL != client->sendBufLock, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
-    memset(client->sendBufLock, 0, sizeof(platformMutex_t));
-    platformMutexInit(client->config->userData, client->sendBufLock); // 初始化发送缓冲区互斥锁
-
-    client->criticalLock = platformMemoryMalloc(sizeof(platformCritical_t));
-    RyanMqttCheckCode(NULL != client->criticalLock, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
-    memset(client->criticalLock, 0, sizeof(platformMutex_t));
-    platformCriticalInit(client->config->userData, client->criticalLock); // 初始化临界区
+    platformMutexInit(client->config.userData, &client->sendBufLock);     // 初始化发送缓冲区互斥锁
+    platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
 
     client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
     client->clientState = RyanMqttInitState;
@@ -84,13 +66,12 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
     client->keepaliveTimeoutCount = 0;
     client->ackHandlerCount = 0;
     client->lwtFlag = RyanMqttFalse;
-    client->lwtOptions = NULL;
 
     RyanListInit(&client->msgHandlerList);
     RyanListInit(&client->ackHandlerList);
-    platformTimerInit(&client->keepaliveTimer);
 
     RyanMqttSetClientState(client, RyanMqttInitState);
+    platformTimerInit(&client->keepaliveTimer);
 
     *pClient = client;
     return RyanMqttSuccessError;
@@ -110,15 +91,16 @@ RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
 
     RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
 
-    platformCriticalEnter(client->config->userData, client->criticalLock);
+    platformCriticalEnter(client->config.userData, &client->criticalLock);
     client->destoryFlag = RyanMqttTrue;
-    platformCriticalExit(client->config->userData, client->criticalLock);
+    platformCriticalExit(client->config.userData, &client->criticalLock);
 
     return RyanMqttSuccessError;
 }
 
 /**
  * @brief 启动mqtt客户端
+ * !不要重复调用
  *
  * @param client
  * @return RyanMqttError_e
@@ -129,15 +111,16 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
     RyanMqttError_e result = RyanMqttSuccessError;
     RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
     RyanMqttCheck(RyanMqttInitState == client->clientState, RyanMqttFailedError, rlog_d);
+
     RyanMqttSetClientState(client, RyanMqttStartState);
     // 连接成功,需要初始化 MQTT 线程
-    result = platformThreadInit(client->config->userData,
-                                client->mqttThread,
-                                client->config->taskName,
+    result = platformThreadInit(client->config.userData,
+                                &client->mqttThread,
+                                client->config.taskName,
                                 RyanMqttThread,
                                 client,
-                                client->config->taskStack,
-                                client->config->taskPrio);
+                                client->config.taskStack,
+                                client->config.taskPrio);
     RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttSetClientState(client, RyanMqttInitState); });
 
     return RyanMqttSuccessError;
@@ -161,12 +144,12 @@ RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e send
 
     if (RyanMqttTrue == sendDiscFlag)
     {
-        platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
+        platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
         // 序列化断开连接数据包并发送
-        packetLen = MQTTSerialize_disconnect((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize);
+        packetLen = MQTTSerialize_disconnect((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
         if (packetLen > 0)
-            RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-        platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+            RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+        platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     }
 
     connectState = RyanMqttConnectUserDisconnected;
@@ -186,11 +169,10 @@ RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
 {
 
     RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
-    RyanMqttCheck(NULL != client->mqttThread, RyanMqttParamInvalidError, rlog_d);
     RyanMqttCheck(RyanMqttDisconnectState != RyanMqttGetClientState(client), RyanMqttConnectError, rlog_d);
 
     RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL);
-    platformThreadStart(client->config->userData, client->mqttThread);
+    platformThreadStart(client->config.userData, &client->mqttThread);
     return RyanMqttSuccessError;
 }
 
@@ -217,25 +199,25 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
     RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
     RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
 
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
+    platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
     packetId = RyanMqttGetNextPacketId(client);
 
-    packetLen = MQTTSerialize_subscribe((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, packetId, 1, &topicName, (int *)&qos);
-    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    packetLen = MQTTSerialize_subscribe((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, packetId, 1, &topicName, (int *)&qos);
+    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
         platformMemoryFree(msgHandler);
-        platformMutexUnLock(client->config->userData, client->sendBufLock);
+        platformMutexUnLock(client->config.userData, &client->sendBufLock);
     });
 
     // 添加等待 ack
     result = RyanMqttAckListAdd(client, ackHandler);
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);RyanMqttAckHandlerDestroy(client, ackHandler); });
 
@@ -267,20 +249,20 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
     result = RyanMqttMsgHandlerFind(client, topicName.cstring, strlen(topicName.cstring), RyanMqttFalse, &msgHandler);
     RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
+    platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
     packetId = RyanMqttGetNextPacketId(client);
 
-    packetLen = MQTTSerialize_unsubscribe((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, packetId, 1, &topicName);
+    packetLen = MQTTSerialize_unsubscribe((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, packetId, 1, &topicName);
     RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
-                      { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+                      { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
-                      { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+                      { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     result = RyanMqttAckListAdd(client, ackHandler);
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);
                         RyanMqttAckHandlerDestroy(client, ackHandler); });
@@ -321,37 +303,37 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
 
     if (RyanMqttQos0 == qos)
     {
-        platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
-        packetLen = MQTTSerialize_publish((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, qos, retain, packetId,
+        platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
+        packetLen = MQTTSerialize_publish((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, qos, retain, packetId,
                                           topicName, (uint8_t *)payload, payloadLen);
         RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
-                          { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+                          { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
-        result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-        platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+        result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+        platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
         return result;
     }
 
     // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
+    platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
     packetId = RyanMqttGetNextPacketId(client);
 
-    packetLen = MQTTSerialize_publish((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, qos, retain, packetId,
+    packetLen = MQTTSerialize_publish((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, qos, retain, packetId,
                                       topicName, (uint8_t *)payload, payloadLen);
-    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
         platformMemoryFree(msgHandler);
-        platformMutexUnLock(client->config->userData, client->sendBufLock);
+        platformMutexUnLock(client->config.userData, &client->sendBufLock);
     });
 
     result = RyanMqttAckListAdd(client, ackHandler);
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
 
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);
@@ -494,45 +476,43 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
     RyanMqttCheck(2 < clientConfig->recvBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->recvBufferSize, RyanMqttParamInvalidError, rlog_d);
     RyanMqttCheck(2 < clientConfig->sendBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->sendBufferSize, RyanMqttParamInvalidError, rlog_d);
 
-    RyanMqttCheckCode(NULL != client->config, RyanMqttParamInvalidError, rlog_d, { goto __exit; });
-
-    result = setConfigValue(&client->config->clientId, clientConfig->clientId);
+    result = setConfigValue(&client->config.clientId, clientConfig->clientId);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
 
-    result = setConfigValue(&client->config->userName, clientConfig->userName);
+    result = setConfigValue(&client->config.userName, clientConfig->userName);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
 
-    result = setConfigValue(&client->config->password, clientConfig->password);
+    result = setConfigValue(&client->config.password, clientConfig->password);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
 
-    result = setConfigValue(&client->config->host, clientConfig->host);
+    result = setConfigValue(&client->config.host, clientConfig->host);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
 
-    result = setConfigValue(&client->config->port, clientConfig->port);
+    result = setConfigValue(&client->config.port, clientConfig->port);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
 
-    result = setConfigValue(&client->config->taskName, clientConfig->taskName);
+    result = setConfigValue(&client->config.taskName, clientConfig->taskName);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
 
-    client->config->taskPrio = clientConfig->taskPrio;
-    client->config->taskStack = clientConfig->taskStack;
-    client->config->mqttVersion = clientConfig->mqttVersion;
-    client->config->ackHandlerRepeatCountWarning = clientConfig->ackHandlerRepeatCountWarning;
-    client->config->ackHandlerCountWarning = clientConfig->ackHandlerCountWarning;
-    client->config->autoReconnectFlag = clientConfig->autoReconnectFlag;
-    client->config->cleanSessionFlag = clientConfig->cleanSessionFlag;
-    client->config->reconnectTimeout = clientConfig->reconnectTimeout;
-    client->config->recvTimeout = clientConfig->recvTimeout;
-    client->config->sendTimeout = clientConfig->sendTimeout;
-    client->config->ackTimeout = clientConfig->ackTimeout;
-    client->config->keepaliveTimeoutS = clientConfig->keepaliveTimeoutS;
-    client->config->mqttEventHandle = clientConfig->mqttEventHandle;
-    client->config->userData = clientConfig->userData;
-
-    client->config->recvBufferSize = clientConfig->recvBufferSize;
-    client->config->sendBufferSize = clientConfig->sendBufferSize;
-    client->config->recvBuffer = clientConfig->recvBuffer;
-    client->config->sendBuffer = clientConfig->sendBuffer;
+    client->config.taskPrio = clientConfig->taskPrio;
+    client->config.taskStack = clientConfig->taskStack;
+    client->config.mqttVersion = clientConfig->mqttVersion;
+    client->config.ackHandlerRepeatCountWarning = clientConfig->ackHandlerRepeatCountWarning;
+    client->config.ackHandlerCountWarning = clientConfig->ackHandlerCountWarning;
+    client->config.autoReconnectFlag = clientConfig->autoReconnectFlag;
+    client->config.cleanSessionFlag = clientConfig->cleanSessionFlag;
+    client->config.reconnectTimeout = clientConfig->reconnectTimeout;
+    client->config.recvTimeout = clientConfig->recvTimeout;
+    client->config.sendTimeout = clientConfig->sendTimeout;
+    client->config.ackTimeout = clientConfig->ackTimeout;
+    client->config.keepaliveTimeoutS = clientConfig->keepaliveTimeoutS;
+    client->config.mqttEventHandle = clientConfig->mqttEventHandle;
+    client->config.userData = clientConfig->userData;
+
+    client->config.recvBufferSize = clientConfig->recvBufferSize;
+    client->config.sendBufferSize = clientConfig->sendBufferSize;
+    client->config.recvBuffer = clientConfig->recvBuffer;
+    client->config.sendBuffer = clientConfig->sendBuffer;
 
     return RyanMqttSuccessError;
 
@@ -561,24 +541,22 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
     RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
     RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
     RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
-    RyanMqttCheck(NULL == client->lwtOptions, RyanMqttFailedError, rlog_d);
 
     if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
         return RyanMqttParamInvalidError;
 
-    client->lwtOptions = (lwtOptions_t *)platformMemoryMalloc(sizeof(lwtOptions_t) + payloadLen);
-    RyanMqttCheck(NULL != client->lwtOptions, RyanMqttNotEnoughMemError, rlog_d);
-    memset(client->lwtOptions, 0, sizeof(lwtOptions_t) + payloadLen);
+    memset(&client->lwtOptions, 0, sizeof(lwtOptions_t));
 
     client->lwtFlag = RyanMqttTrue;
-    client->lwtOptions->qos = qos;
-    client->lwtOptions->retain = retain;
-    client->lwtOptions->payloadLen = payloadLen;
-    client->lwtOptions->payload = (char *)client->lwtOptions + sizeof(lwtOptions_t);
-    memcpy(client->lwtOptions->payload, payload, payloadLen);
-
-    result = RyanMqttStringCopy(&client->lwtOptions->topic, topicName, strlen(topicName));
-    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(client->lwtOptions); });
+    client->lwtOptions.qos = qos;
+    client->lwtOptions.retain = retain;
+    client->lwtOptions.payloadLen = payloadLen;
+
+    result = RyanMqttStringCopy(&client->lwtOptions.payload, payload, payloadLen);
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
+
+    result = RyanMqttStringCopy(&client->lwtOptions.topic, topicName, strlen(topicName));
+    RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(client->lwtOptions.payload); });
 
     return RyanMqttSuccessError;
 }

+ 6 - 6
mqttclient/RyanMqttClient.h

@@ -101,12 +101,12 @@ extern "C"
         RyanList_t msgHandlerList;         // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
         RyanList_t ackHandlerList;         // 维护ack链表
         platformTimer_t keepaliveTimer;    // 保活定时器
-        platformNetwork_t *network;        // 网络组件
-        RyanMqttClientConfig_t *config;    // mqtt config
-        platformThread_t *mqttThread;      // mqtt线程
-        platformMutex_t *sendBufLock;      // 写缓冲区锁
-        platformCritical_t *criticalLock;  // 临界区锁
-        lwtOptions_t *lwtOptions;          // 遗嘱相关配置
+        platformNetwork_t network;         // 网络组件
+        RyanMqttClientConfig_t config;     // mqtt config
+        platformThread_t mqttThread;       // mqtt线程
+        platformMutex_t sendBufLock;       // 写缓冲区锁
+        platformCritical_t criticalLock;   // 临界区锁
+        lwtOptions_t lwtOptions;           // 遗嘱相关配置
     } RyanMqttClient_t;
 
     /* extern variables-----------------------------------------------------------*/

+ 88 - 127
mqttclient/RyanMqttThread.c

@@ -13,9 +13,9 @@ void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client)
 {
     // 服务器在心跳时间的1.5倍内没有收到keeplive消息则会断开连接
     // 这里在用户设置的心跳剩余 1/4 时手动发送保活指令
-    platformCriticalEnter(client->config->userData, client->criticalLock);
-    platformTimerCutdown(&client->keepaliveTimer, client->config->keepaliveTimeoutS / 4 * 3 * 1000); // 启动心跳定时器
-    platformCriticalExit(client->config->userData, client->criticalLock);
+    platformCriticalEnter(client->config.userData, &client->criticalLock);
+    platformTimerCutdown(&client->keepaliveTimer, client->config.keepaliveTimeoutS / 4 * 3 * 1000); // 启动心跳定时器
+    platformCriticalExit(client->config.userData, &client->criticalLock);
 }
 
 /**
@@ -45,11 +45,11 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
         return RyanMqttKeepaliveTimeout;
     }
 
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
-    int32_t packetLen = MQTTSerialize_pingreq((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize);
+    platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
+    int32_t packetLen = MQTTSerialize_pingreq((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
     if (packetLen > 0)
-        RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+        RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
 
     client->keepaliveTimeoutCount++;
     return RyanMqttSuccessError;
@@ -103,7 +103,7 @@ static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *c
     RyanMqttAckHandler_t *ackHandler = NULL;
     RyanMqttAssert(NULL != client);
 
-    result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
+    result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
     // 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
@@ -133,17 +133,17 @@ static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client)
     RyanMqttAckHandler_t *ackHandler = NULL;
     RyanMqttAssert(NULL != client);
 
-    result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
+    result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
     // 制作确认数据包并发送
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
-    packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBCOMP, 0, packetId);
-    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
+    packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBCOMP, 0, packetId);
+    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     // 每次收到PUBREL都返回消息
-    result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 删除pubrel记录
@@ -175,18 +175,18 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
     RyanMqttAckHandler_t *ackHandlerPubrec = NULL;
     RyanMqttAssert(NULL != client);
 
-    result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
+    result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
     // 制作确认数据包并发送
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
+    platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
     // 序列化发布释放报文
-    packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREL, 0, packetId);
-    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBREL, 0, packetId);
+    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     // 每次收到PUBREC都返回ack
-    result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
@@ -239,7 +239,7 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
     RyanMqttAssert(NULL != client);
 
     result = MQTTDeserialize_publish(&msgData.dup, (int *)&msgData.qos, &msgData.retained, &msgData.packetId, &topicName,
-                                     (uint8_t **)&msgData.payload, (int *)&msgData.payloadLen, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
+                                     (uint8_t **)&msgData.payload, (int *)&msgData.payloadLen, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
     // 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
@@ -253,13 +253,13 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
         break;
 
     case RyanMqttQos1:
-        platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
-        packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBACK, 0, msgData.packetId);
+        platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
+        packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBACK, 0, msgData.packetId);
         RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
-                          { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+                          { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
-        result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-        platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+        result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+        platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
         RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
         deliverMsgFlag = RyanMqttTrue;
@@ -267,13 +267,13 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
 
     case RyanMqttQos2: // qos2采用方法B
 
-        platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
-        packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREC, 0, msgData.packetId);
+        platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
+        packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBREC, 0, msgData.packetId);
         RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
-                          { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+                          { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
-        result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-        platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+        result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+        platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
         RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
         // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish, 不进行qos2 PUBREC消息处理
@@ -323,7 +323,7 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
     RyanMqttAckHandler_t *ackHandler = NULL;
     RyanMqttAssert(NULL != client);
 
-    result = MQTTDeserialize_suback(&packetId, 1, (int *)&count, (int *)&grantedQoS, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
+    result = MQTTDeserialize_suback(&packetId, 1, (int *)&count, (int *)&grantedQoS, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
     // ack链表不存在当前订阅确认节点就直接退出
@@ -376,7 +376,7 @@ static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client)
     uint16_t packetId = 0;
     RyanMqttAssert(NULL != client);
 
-    result = MQTTDeserialize_unsuback(&packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
+    result = MQTTDeserialize_unsuback(&packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
     // ack链表不存在当前取消订阅确认节点就直接退出
@@ -409,13 +409,13 @@ static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8
     // RyanMqttAssert(NULL != packetType); packetType == 0时会误判
 
     // 1.读取标头字节。 其中包含数据包类型
-    result = RyanMqttRecvPacket(client, client->config->recvBuffer, fixedHeaderLen);
+    result = RyanMqttRecvPacket(client, client->config.recvBuffer, fixedHeaderLen);
     if (RyanMqttRecvPacketTimeOutError == result)
         return RyanMqttRecvPacketTimeOutError;
     RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 填充联合体标头信息
-    header.byte = client->config->recvBuffer[0];
+    header.byte = client->config.recvBuffer[0];
     rlog_d("packetType: %d", header.bits.type);
     RyanMqttCheck(CONNECT <= header.bits.type && DISCONNECT >= header.bits.type, result, rlog_d);
 
@@ -424,14 +424,14 @@ static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8
     RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 将剩余长度编码成mqtt报文,并放入接收缓冲区,如果消息长度超过缓冲区长度则抛弃此次数据
-    fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config->recvBuffer + fixedHeaderLen, payloadLen);
-    RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config->recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
-                      { RyanMqttRecvPacket(client, client->config->recvBuffer, payloadLen); });
+    fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config.recvBuffer + fixedHeaderLen, payloadLen);
+    RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config.recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
+                      { RyanMqttRecvPacket(client, client->config.recvBuffer, payloadLen); });
 
     // 3.读取mqtt载荷数据并放到读取缓冲区
     if (payloadLen > 0)
     {
-        result = RyanMqttRecvPacket(client, client->config->recvBuffer + fixedHeaderLen, payloadLen);
+        result = RyanMqttRecvPacket(client, client->config.recvBuffer + fixedHeaderLen, payloadLen);
         RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
     }
 
@@ -535,9 +535,8 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
             RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
 
             // 重置ack超时时间
-            platformTimerCutdown(&ackHandler->timer, client->config->ackTimeout);
+            platformTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
             ackHandler->repeatCount++;
-
             break;
         }
 
@@ -580,41 +579,39 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
     int32_t connackRc = 0;
     MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
     RyanMqttAssert(NULL != client);
-    RyanMqttAssert(NULL != client->network);
-    RyanMqttAssert(NULL != client->config);
 
     RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectAccepted, rlog_d);
 
     // 连接标志位
-    connectData.clientID.cstring = client->config->clientId;
-    connectData.username.cstring = client->config->userName;
-    connectData.password.cstring = client->config->password;
-    connectData.keepAliveInterval = client->config->keepaliveTimeoutS;
-    connectData.cleansession = client->config->cleanSessionFlag;
-    connectData.MQTTVersion = client->config->mqttVersion;
+    connectData.clientID.cstring = client->config.clientId;
+    connectData.username.cstring = client->config.userName;
+    connectData.password.cstring = client->config.password;
+    connectData.keepAliveInterval = client->config.keepaliveTimeoutS;
+    connectData.cleansession = client->config.cleanSessionFlag;
+    connectData.MQTTVersion = client->config.mqttVersion;
 
     if (RyanMqttTrue == client->lwtFlag)
     {
         connectData.willFlag = 1;
-        connectData.will.qos = client->lwtOptions->qos;
-        connectData.will.retained = client->lwtOptions->retain;
-        connectData.will.message.lenstring.data = client->lwtOptions->payload;
-        connectData.will.message.lenstring.len = client->lwtOptions->payloadLen;
-        connectData.will.topicName.cstring = client->lwtOptions->topic;
+        connectData.will.qos = client->lwtOptions.qos;
+        connectData.will.retained = client->lwtOptions.retain;
+        connectData.will.message.lenstring.data = client->lwtOptions.payload;
+        connectData.will.message.lenstring.len = client->lwtOptions.payloadLen;
+        connectData.will.topicName.cstring = client->lwtOptions.topic;
     }
 
     // 调用底层的连接函数连接上服务器
-    result = platformNetworkConnect(client->config->userData, client->network, client->config->host, client->config->port);
+    result = platformNetworkConnect(client->config.userData, &client->network, client->config.host, client->config.port);
     RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttConnectNetWorkFail, rlog_d);
 
-    platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
+    platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
     // 序列化mqtt的CONNECT报文
-    packetLen = MQTTSerialize_connect((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, &connectData);
-    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
+    packetLen = MQTTSerialize_connect((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, &connectData);
+    RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
     // 发送序列化mqtt的CONNECT报文
-    result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
-    platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
+    result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 等待报文
@@ -623,7 +620,7 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
     RyanMqttCheck(CONNACK == packetType, RyanMqttConnectDisconnected, rlog_d);
 
     // 解析CONNACK报文
-    result = MQTTDeserialize_connack(&sessionPresent, (uint8_t *)&connackRc, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
+    result = MQTTDeserialize_connack(&sessionPresent, (uint8_t *)&connackRc, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
     RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
 
     rlog_i("result: %d, packetLen: %d, packetType: %d connackRc: %d", result, packetLen, packetType, connackRc);
@@ -641,8 +638,6 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
 void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
 {
     RyanMqttAssert(NULL != client);
-    RyanMqttAssert(NULL != client->network);
-    RyanMqttAssert(NULL != client->config);
 
     switch (eventId)
     {
@@ -655,9 +650,9 @@ void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, v
 
     case RyanMqttEventDisconnected:                              // 断开连接事件
         RyanMqttSetClientState(client, RyanMqttDisconnectState); // 先将客户端状态设置为断开连接,避免close网络资源时用户依然在使用
-        platformNetworkClose(client->config->userData, client->network);
+        platformNetworkClose(client->config.userData, &client->network);
 
-        if (RyanMqttTrue == client->config->cleanSessionFlag)
+        if (RyanMqttTrue == client->config.cleanSessionFlag)
             RyanMqttCleanSession(client);
 
         break;
@@ -670,11 +665,11 @@ void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, v
         break;
     }
 
-    if (client->config->mqttEventHandle == NULL)
+    if (client->config.mqttEventHandle == NULL)
         return;
 
     if (client->eventFlag & eventId)
-        client->config->mqttEventHandle(client, eventId, eventData);
+        client->config.mqttEventHandle(client, eventId, eventData);
 }
 
 /**
@@ -686,86 +681,52 @@ void RyanMqttThread(void *argument)
 {
     int32_t result = 0;
     RyanMqttClient_t *client = (RyanMqttClient_t *)argument;
-    RyanMqttAssert(NULL != client);          // RyanMqttStart前没有调用RyanMqttInit
-    RyanMqttAssert(NULL != client->network); // RyanMqttStart前没有调用RyanMqttInit
-    RyanMqttAssert(NULL != client->config);  // RyanMqttStart前没有调用RyanMqttSetConfig
+    RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
 
     while (1)
     {
-
         // 销毁客户端
         if (RyanMqttTrue == client->destoryFlag)
         {
-
             RyanMqttEventMachine(client, RyanMqttEventDestoryBefore, (void *)NULL);
 
             // 清除网络组件
-            if (NULL != client->network)
-            {
-                platformNetworkClose(client->config->userData, client->network);
-                platformMemoryFree(client->network);
-                client->network = NULL;
-            }
+            platformNetworkClose(client->config.userData, &client->network);
 
             // 清除config信息
-            if (NULL != client->config)
-            {
-                if (NULL != client->config->clientId)
-                    platformMemoryFree(client->config->clientId);
-
-                if (NULL != client->config->host)
-                    platformMemoryFree(client->config->host);
-
-                if (NULL != client->config->port)
-                    platformMemoryFree(client->config->port);
-
-                if (NULL != client->config->userName)
-                    platformMemoryFree(client->config->userName);
-
-                if (NULL != client->config->password)
-                    platformMemoryFree(client->config->password);
-
-                if (NULL != client->config->taskName)
-                    platformMemoryFree(client->config->taskName);
-
-                if (NULL != client->config)
-                    platformMemoryFree(client->config);
-            }
+            if (NULL != client->config.clientId)
+                platformMemoryFree(client->config.clientId);
+            if (NULL != client->config.userName)
+                platformMemoryFree(client->config.userName);
+            if (NULL != client->config.password)
+                platformMemoryFree(client->config.password);
+            if (NULL != client->config.host)
+                platformMemoryFree(client->config.host);
+            if (NULL != client->config.port)
+                platformMemoryFree(client->config.port);
+            if (NULL != client->config.taskName)
+                platformMemoryFree(client->config.taskName);
 
             // 清除遗嘱相关配置
-            if (RyanMqttTrue == client->lwtFlag && NULL != client->lwtOptions)
-            {
-                if (NULL != client->lwtOptions->topic)
-                    platformMemoryFree(client->lwtOptions->topic);
+            if (NULL != client->lwtOptions.payload)
+                platformMemoryFree(client->lwtOptions.payload);
 
-                platformMemoryFree(client->lwtOptions);
-            }
+            if (NULL != client->lwtOptions.topic)
+                platformMemoryFree(client->lwtOptions.topic);
 
             // 清除session  ack链表和msg链表
             RyanMqttCleanSession(client);
 
             // 清除互斥锁
-            if (NULL != client->sendBufLock)
-            {
-                platformMutexDestroy(client->config->userData, client->sendBufLock);
-                platformMemoryFree(client->sendBufLock);
-                client->sendBufLock = NULL;
-            }
+            platformMutexDestroy(client->config.userData, &client->sendBufLock);
 
             // 清除临界区
-            if (NULL != client->criticalLock)
-            {
-                platformCriticalDestroy(client->config->userData, client->criticalLock);
-                platformMemoryFree(client->criticalLock);
-                client->criticalLock = NULL;
-            }
-
-            platformThread_t mqttThread = *client->mqttThread;
-            void *userData = client->config->userData;
+            platformCriticalDestroy(client->config.userData, &client->criticalLock);
 
             // 清除掉线程动态资源
-            platformMemoryFree(client->mqttThread);
-            client->mqttThread = NULL;
+            platformThread_t mqttThread = {0};
+            memcpy(&mqttThread, &client->mqttThread, sizeof(platformThread_t));
+            void *userData = client->config.userData;
 
             platformMemoryFree(client);
             client = NULL;
@@ -794,11 +755,11 @@ void RyanMqttThread(void *argument)
 
         case RyanMqttDisconnectState: // 断开连接状态
             rlog_d("断开连接状态");
-            if (RyanMqttTrue != client->config->autoReconnectFlag) // 没有使能自动连接就休眠线程
-                platformThreadStop(client->config->userData, client->mqttThread);
+            if (RyanMqttTrue != client->config.autoReconnectFlag) // 没有使能自动连接就休眠线程
+                platformThreadStop(client->config.userData, &client->mqttThread);
 
             rlog_d("触发自动连接,%dms后开始连接\r\n", client->config->reconnectTimeout);
-            platformDelay(client->config->reconnectTimeout);
+            platformDelay(client->config.reconnectTimeout);
             RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL); // 给上层触发重新连接前事件
 
             break;

+ 16 - 20
mqttclient/RyanMqttUtile.c

@@ -22,7 +22,7 @@ RyanMqttError_e RyanMqttStringCopy(char **dest, char *rest, uint32_t strLen)
     char *str2 = NULL;
     RyanMqttAssert(NULL != dest);
     RyanMqttAssert(NULL != rest);
-    // RyanMqttCheck(0 != strLen, RyanMqttFailedError);
+    RyanMqttCheck(0 != strLen, RyanMqttFailedError, rlog_d);
 
     str2 = (char *)platformMemoryMalloc(strLen + 1);
     if (NULL == str2)
@@ -72,13 +72,11 @@ RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, char *recvBuf, int3
     int32_t connectState = RyanMqttConnectAccepted;
     RyanMqttError_e result = RyanMqttSuccessError;
     RyanMqttAssert(NULL != client);
-    RyanMqttAssert(NULL != client->network);
-    RyanMqttAssert(NULL != client->config);
     RyanMqttAssert(NULL != recvBuf);
 
     RyanMqttCheck(0 != recvLen, RyanMqttSuccessError, rlog_d);
 
-    result = platformNetworkRecvAsync(client->config->userData, client->network, recvBuf, recvLen, client->config->recvTimeout);
+    result = platformNetworkRecvAsync(client->config.userData, &client->network, recvBuf, recvLen, client->config.recvTimeout);
 
     switch (result)
     {
@@ -108,13 +106,11 @@ RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, char *sendBuf, int3
     int32_t connectState = RyanMqttConnectAccepted;
     RyanMqttError_e result = RyanMqttSuccessError;
     RyanMqttAssert(NULL != client);
-    RyanMqttAssert(NULL != client->network);
-    RyanMqttAssert(NULL != client->config);
     RyanMqttAssert(NULL != sendBuf);
 
     RyanMqttCheck(0 != sendLen, RyanMqttSuccessError, rlog_d);
 
-    result = platformNetworkSendAsync(client->config->userData, client->network, sendBuf, sendLen, client->config->sendTimeout);
+    result = platformNetworkSendAsync(client->config.userData, &client->network, sendBuf, sendLen, client->config.sendTimeout);
     switch (result)
     {
     case RyanMqttSuccessError:
@@ -140,9 +136,9 @@ void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e state)
 {
     RyanMqttAssert(NULL != client);
 
-    platformCriticalEnter(client->config->userData, client->criticalLock);
+    platformCriticalEnter(client->config.userData, &client->criticalLock);
     client->clientState = state;
-    platformCriticalExit(client->config->userData, client->criticalLock);
+    platformCriticalExit(client->config.userData, &client->criticalLock);
 }
 
 /**
@@ -414,9 +410,9 @@ RyanMqttError_e RyanMqttMsgHandlerAdd(RyanMqttClient_t *client, RyanMqttMsgHandl
     RyanMqttAssert(NULL != msgHandler);
     RyanMqttAssert(NULL != msgHandler->topic);
 
-    platformCriticalEnter(client->config->userData, client->criticalLock);
+    platformCriticalEnter(client->config.userData, &client->criticalLock);
     RyanListAddTail(&msgHandler->list, &client->msgHandlerList); // 将msgHandler节点添加到链表尾部
-    platformCriticalExit(client->config->userData, client->criticalLock);
+    platformCriticalExit(client->config.userData, &client->criticalLock);
 
     return RyanMqttSuccessError;
 }
@@ -433,9 +429,9 @@ RyanMqttError_e RyanMqttMsgHandlerRemove(RyanMqttClient_t *client, RyanMqttMsgHa
     RyanMqttAssert(NULL != msgHandler);
     RyanMqttAssert(NULL != msgHandler->topic);
 
-    platformCriticalEnter(client->config->userData, client->criticalLock);
+    platformCriticalEnter(client->config.userData, &client->criticalLock);
     RyanListDel(&msgHandler->list);
-    platformCriticalExit(client->config->userData, client->criticalLock);
+    platformCriticalExit(client->config.userData, &client->criticalLock);
 
     return RyanMqttSuccessError;
 }
@@ -465,7 +461,7 @@ RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes
     memset(ackHandler, 0, sizeof(RyanMqttAckHandler_t) + packetLen);
 
     RyanListInit(&ackHandler->list);
-    platformTimerCutdown(&ackHandler->timer, client->config->ackTimeout); // 超时内没有响应将被销毁或重新发送
+    platformTimerCutdown(&ackHandler->timer, client->config.ackTimeout); // 超时内没有响应将被销毁或重新发送
 
     ackHandler->repeatCount = 0;
     ackHandler->packetId = packetId;
@@ -473,7 +469,7 @@ RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes
     ackHandler->packetType = packetType;
     ackHandler->msgHandler = msgHandler;
     ackHandler->packet = (char *)ackHandler + sizeof(RyanMqttAckHandler_t);
-    memcpy(ackHandler->packet, client->config->sendBuffer, packetLen); // 将packet数据保存到ack中
+    memcpy(ackHandler->packet, client->config.sendBuffer, packetLen); // 将packet数据保存到ack中
 
     *pAckHandler = ackHandler;
 
@@ -548,12 +544,12 @@ RyanMqttError_e RyanMqttAckListAdd(RyanMqttClient_t *client, RyanMqttAckHandler_
     RyanMqttAssert(NULL != ackHandler->msgHandler->topic);
 
     // 将ack节点添加到链表尾部
-    platformCriticalEnter(client->config->userData, client->criticalLock);
+    platformCriticalEnter(client->config.userData, &client->criticalLock);
     RyanListAddTail(&ackHandler->list, &client->ackHandlerList);
     client->ackHandlerCount++;
-    platformCriticalExit(client->config->userData, client->criticalLock);
+    platformCriticalExit(client->config.userData, &client->criticalLock);
 
-    if (client->ackHandlerCount >= client->config->ackHandlerCountWarning)
+    if (client->ackHandlerCount >= client->config.ackHandlerCountWarning)
         RyanMqttEventMachine(client, RyanMqttEventAckCountWarning, (void *)&client->ackHandlerCount);
 
     return RyanMqttSuccessError;
@@ -574,11 +570,11 @@ RyanMqttError_e RyanMqttAckListRemove(RyanMqttClient_t *client, RyanMqttAckHandl
     RyanMqttAssert(NULL != ackHandler->msgHandler->topic);
 
     // 将ack节点添加到链表尾部
-    platformCriticalEnter(client->config->userData, client->criticalLock);
+    platformCriticalEnter(client->config.userData, &client->criticalLock);
     RyanListDel(&ackHandler->list);
     if (client->ackHandlerCount > 0)
         client->ackHandlerCount--;
-    platformCriticalExit(client->config->userData, client->criticalLock);
+    platformCriticalExit(client->config.userData, &client->criticalLock);
 
     return RyanMqttSuccessError;
 }

+ 35 - 35
platform/linux/platformNetwork.c

@@ -21,41 +21,11 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
     RyanMqttError_e result = RyanMqttSuccessError;
 
     // ?线程安全版本,有些设备没有实现,默认不启用。如果涉及多个客户端解析域名请使用线程安全版本
-    // char buf[256];
-    // int ret;
-    // struct hostent hostinfo, *phost;
+    char buf[256];
+    int ret;
+    struct hostent hostinfo, *phost;
 
-    // if (0 != gethostbyname_r(host, &hostinfo, buf, sizeof(buf), &phost, &ret))
-    // {
-    //     result = RyanSocketFailedError;
-    //     goto exit;
-    // }
-
-    // platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
-    // if (platformNetwork->socket < 0)
-    // {
-    //     result = RyanSocketFailedError;
-    //     goto exit;
-    // }
-
-    // struct sockaddr_in server_addr;
-    // memset(&server_addr, 0, sizeof(server_addr));
-    // server_addr.sin_family = AF_INET;
-    // server_addr.sin_port = htons(atoi(port)); // 指定端口号,这里使用HTTP默认端口80
-    // server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
-
-    // // 绑定套接字到主机地址和端口号
-    // if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
-    // {
-    //     platformNetworkClose(userData, platformNetwork);
-    //     result = RyanMqttSocketConnectFailError;
-    //     goto exit;
-    // }
-
-    // 非线程安全版本,请根据实际情况选择使用
-    struct hostent *hostinfo;
-    hostinfo = gethostbyname(host);
-    if (NULL == hostinfo)
+    if (0 != gethostbyname_r(host, &hostinfo, buf, sizeof(buf), &phost, &ret))
     {
         result = RyanSocketFailedError;
         goto exit;
@@ -72,7 +42,7 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
     memset(&server_addr, 0, sizeof(server_addr));
     server_addr.sin_family = AF_INET;
     server_addr.sin_port = htons(atoi(port)); // 指定端口号,这里使用HTTP默认端口80
-    server_addr.sin_addr = *((struct in_addr *)hostinfo->h_addr_list[0]);
+    server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
 
     // 绑定套接字到主机地址和端口号
     if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
@@ -82,6 +52,36 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
         goto exit;
     }
 
+    // 非线程安全版本,请根据实际情况选择使用
+    // struct hostent *hostinfo;
+    // hostinfo = gethostbyname(host);
+    // if (NULL == hostinfo)
+    // {
+    //     result = RyanSocketFailedError;
+    //     goto exit;
+    // }
+
+    // platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+    // if (platformNetwork->socket < 0)
+    // {
+    //     result = RyanSocketFailedError;
+    //     goto exit;
+    // }
+
+    // struct sockaddr_in server_addr;
+    // memset(&server_addr, 0, sizeof(server_addr));
+    // server_addr.sin_family = AF_INET;
+    // server_addr.sin_port = htons(atoi(port)); // 指定端口号,这里使用HTTP默认端口80
+    // server_addr.sin_addr = *((struct in_addr *)hostinfo->h_addr_list[0]);
+
+    // // 绑定套接字到主机地址和端口号
+    // if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
+    // {
+    //     platformNetworkClose(userData, platformNetwork);
+    //     result = RyanMqttSocketConnectFailError;
+    //     goto exit;
+    // }
+
 exit:
     return result;
 }

+ 1 - 0
platform/linux/platformSystem.c

@@ -74,6 +74,7 @@ RyanMqttError_e platformThreadInit(void *userData,
         return RyanMqttNoRescourceError;
 
     pthread_mutex_init(&platformThread->mutex, NULL);
+    pthread_cond_init(&platformThread->cond, NULL);
 
     return RyanMqttSuccessError;
 }

+ 4 - 4
platform/quecOpen/platformSystem.c

@@ -7,7 +7,7 @@
  * @param size
  * @return void*
  */
-void *platformMemoryMalloc(size_t size)
+inline void *platformMemoryMalloc(size_t size)
 {
     return malloc(size);
 }
@@ -17,7 +17,7 @@ void *platformMemoryMalloc(size_t size)
  *
  * @param ptr
  */
-void platformMemoryFree(void *ptr)
+inline void platformMemoryFree(void *ptr)
 {
     free(ptr);
 }
@@ -27,7 +27,7 @@ void platformMemoryFree(void *ptr)
  *
  * @param ms
  */
-void platformDelay(uint32_t ms)
+inline void platformDelay(uint32_t ms)
 {
     osDelay(ms);
 }
@@ -38,7 +38,7 @@ void platformDelay(uint32_t ms)
  * @param str
  * @param strLen
  */
-void platformPrint(char *str, uint16_t strLen)
+inline void platformPrint(char *str, uint16_t strLen)
 {
     Ql_UART_Write((Enum_SerialPort)(UART_PORT0), (u8 *)(str), strLen);
 }

+ 1 - 1
platform/quecOpen/platformSystem.h

@@ -28,7 +28,7 @@ extern "C"
 
     typedef struct
     {
-
+        uint8_t invalid; // 不使用,避免报错
     } platformCritical_t;
 
     extern void *platformMemoryMalloc(size_t size);

+ 4 - 4
platform/rtthread/platformSystem.c

@@ -7,7 +7,7 @@
  * @param size
  * @return void*
  */
-void *platformMemoryMalloc(size_t size)
+inline void *platformMemoryMalloc(size_t size)
 {
     return rt_malloc(size);
 }
@@ -17,7 +17,7 @@ void *platformMemoryMalloc(size_t size)
  *
  * @param ptr
  */
-void platformMemoryFree(void *ptr)
+inline void platformMemoryFree(void *ptr)
 {
     rt_free(ptr);
 }
@@ -27,7 +27,7 @@ void platformMemoryFree(void *ptr)
  *
  * @param ms
  */
-void platformDelay(uint32_t ms)
+inline void platformDelay(uint32_t ms)
 {
     rt_thread_mdelay(ms);
 }
@@ -38,7 +38,7 @@ void platformDelay(uint32_t ms)
  * @param str
  * @param strLen
  */
-void platformPrint(char *str, uint16_t strLen)
+inline void platformPrint(char *str, uint16_t strLen)
 {
     rt_kprintf("%.*s", strLen, str);
 }

+ 1 - 0
platform/rtthread/platformSystem.h

@@ -31,6 +31,7 @@ extern "C"
 
     typedef struct
     {
+        uint8_t invalid; // 不使用,避免报错
     } platformCritical_t;
 
     extern void *platformMemoryMalloc(size_t size);

+ 5 - 5
test/RyanMqttTestLinux.c

@@ -168,10 +168,10 @@ static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void c
 
     case RyanMqttEventDestoryBefore:
         rlog_i("销毁mqtt客户端前回调");
-        free(client->config->sendBuffer);
-        free(client->config->recvBuffer);
-        if (client->config->userData)
-            sem_post((sem_t *)client->config->userData);
+        free(client->config.sendBuffer);
+        free(client->config.recvBuffer);
+        if (client->config.userData)
+            sem_post((sem_t *)client->config.userData);
         break;
 
     default:
@@ -251,7 +251,7 @@ static void RyanMqttInitSync(RyanMqttClient_t **client, RyanMqttBool_e syncFlag)
 
 static void RyanMqttDestorySync(RyanMqttClient_t *client)
 {
-    sem_t *sem = (sem_t *)client->config->userData;
+    sem_t *sem = (sem_t *)client->config.userData;
     // 启动mqtt客户端线程
     RyanMqttDestroy(client);