Quellcode durchsuchen

refactor: 优化

RyanCW vor 4 Monaten
Ursprung
Commit
d8f1140a5b

+ 0 - 22
coreMqtt/core_mqtt_serializer.h

@@ -44,28 +44,6 @@
 
 
 #include "core_mqtt_config_defaults.h"
 #include "core_mqtt_config_defaults.h"
 
 
-/* MQTT packet types. */
-
-/**
- * @addtogroup mqtt_constants
- * @{
- */
-#define MQTT_PACKET_TYPE_CONNECT        ( ( uint8_t ) 0x10U )  /**< @brief CONNECT (client-to-server). */
-#define MQTT_PACKET_TYPE_CONNACK        ( ( uint8_t ) 0x20U )  /**< @brief CONNACK (server-to-client). */
-#define MQTT_PACKET_TYPE_PUBLISH        ( ( uint8_t ) 0x30U )  /**< @brief PUBLISH (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBACK         ( ( uint8_t ) 0x40U )  /**< @brief PUBACK (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBREC         ( ( uint8_t ) 0x50U )  /**< @brief PUBREC (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBREL         ( ( uint8_t ) 0x62U )  /**< @brief PUBREL (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBCOMP        ( ( uint8_t ) 0x70U )  /**< @brief PUBCOMP (bidirectional). */
-#define MQTT_PACKET_TYPE_SUBSCRIBE      ( ( uint8_t ) 0x82U )  /**< @brief SUBSCRIBE (client-to-server). */
-#define MQTT_PACKET_TYPE_SUBACK         ( ( uint8_t ) 0x90U )  /**< @brief SUBACK (server-to-client). */
-#define MQTT_PACKET_TYPE_UNSUBSCRIBE    ( ( uint8_t ) 0xA2U )  /**< @brief UNSUBSCRIBE (client-to-server). */
-#define MQTT_PACKET_TYPE_UNSUBACK       ( ( uint8_t ) 0xB0U )  /**< @brief UNSUBACK (server-to-client). */
-#define MQTT_PACKET_TYPE_PINGREQ        ( ( uint8_t ) 0xC0U )  /**< @brief PINGREQ (client-to-server). */
-#define MQTT_PACKET_TYPE_PINGRESP       ( ( uint8_t ) 0xD0U )  /**< @brief PINGRESP (server-to-client). */
-#define MQTT_PACKET_TYPE_DISCONNECT     ( ( uint8_t ) 0xE0U )  /**< @brief DISCONNECT (client-to-server). */
-/** @} */
-
 /**
 /**
  * @ingroup mqtt_constants
  * @ingroup mqtt_constants
  * @brief The size of MQTT PUBACK, PUBREC, PUBREL, and PUBCOMP packets, per MQTT spec.
  * @brief The size of MQTT PUBACK, PUBREC, PUBREL, and PUBCOMP packets, per MQTT spec.

+ 69 - 96
mqttclient/RyanMqttClient.c

@@ -7,30 +7,6 @@
 #include "RyanMqttUtil.h"
 #include "RyanMqttUtil.h"
 #include "core_mqtt_serializer.h"
 #include "core_mqtt_serializer.h"
 
 
-/**
- * @brief 获取报文标识符,报文标识符不可为0
- * 都在sendbuf锁内调用
- * @param client
- * @return uint16_t
- */
-static uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
-{
-	uint16_t packetId;
-	RyanMqttAssert(NULL != client);
-	platformCriticalEnter(client->config.userData, &client->criticalLock);
-	if (client->packetId >= RyanMqttMaxPacketId || client->packetId < 1)
-	{
-		client->packetId = 1;
-	}
-	else
-	{
-		client->packetId++;
-	}
-	packetId = client->packetId;
-	platformCriticalExit(client->config.userData, &client->criticalLock);
-	return packetId;
-}
-
 /**
 /**
  * @brief mqtt初始化
  * @brief mqtt初始化
  *
  *
@@ -196,7 +172,7 @@ RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e send
 
 
 		// 申请断开连接数据包的空间
 		// 申请断开连接数据包的空间
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
-		RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, RyanMqttLog_d);
+		RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 
 
 		// 序列化断开连接数据包
 		// 序列化断开连接数据包
 		status = MQTT_SerializeDisconnect(&fixedBuffer);
 		status = MQTT_SerializeDisconnect(&fixedBuffer);
@@ -272,14 +248,15 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	// 检查每个主题消息是否合法
 	// 检查每个主题消息是否合法
 	for (int32_t i = 0; i < count; i++)
 	for (int32_t i = 0; i < count; i++)
 	{
 	{
-		RyanMqttCheck(NULL != subscribeManyData[i].topic, RyanMqttParamInvalidError, RyanMqttLog_d);
+		RyanMqttCheck(NULL != subscribeManyData[i].topic && subscribeManyData[i].topicLen > 0,
+			      RyanMqttParamInvalidError, RyanMqttLog_d);
 		RyanMqttCheck(RyanMqttQos0 <= subscribeManyData[i].qos && RyanMqttQos2 >= subscribeManyData[i].qos,
 		RyanMqttCheck(RyanMqttQos0 <= subscribeManyData[i].qos && RyanMqttQos2 >= subscribeManyData[i].qos,
 			      RyanMqttParamInvalidError, RyanMqttLog_d);
 			      RyanMqttParamInvalidError, RyanMqttLog_d);
 	}
 	}
 
 
 	// 申请 coreMqtt 支持的topic格式空间
 	// 申请 coreMqtt 支持的topic格式空间
 	MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
 	MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
-	RyanMqttCheck(NULL != subscriptionList, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(NULL != subscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 	for (int32_t i = 0; i < count; i++)
 	for (int32_t i = 0; i < count; i++)
 	{
 	{
 		subscriptionList[i].qos = (MQTTQoS_t)subscribeManyData[i].qos;
 		subscriptionList[i].qos = (MQTTQoS_t)subscribeManyData[i].qos;
@@ -298,7 +275,7 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
 
 
 		// 申请数据包的空间
 		// 申请数据包的空间
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
-		RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, RyanMqttLog_d,
+		RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
 				  { platformMemoryFree(subscriptionList); });
 				  { platformMemoryFree(subscriptionList); });
 
 
 		// 序列化数据包
 		// 序列化数据包
@@ -328,7 +305,7 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
 			goto __RyanMqttSubCreateAckErrorExit;
 			goto __RyanMqttSubCreateAckErrorExit;
 		});
 		});
 
 
-		// 不会失败
+		// 此函数不会失败
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 		continue;
 		continue;
 
 
@@ -393,6 +370,8 @@ __exit:
  */
  */
 RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
 RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
 {
 {
+	RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
+
 	RyanMqttSubscribeData_t subscribeManyData = {.qos = qos, .topic = topic, .topicLen = RyanMqttStrlen(topic)};
 	RyanMqttSubscribeData_t subscribeManyData = {.qos = qos, .topic = topic, .topicLen = RyanMqttStrlen(topic)};
 	return RyanMqttSubscribeMany(client, 1, &subscribeManyData);
 	return RyanMqttSubscribeMany(client, 1, &subscribeManyData);
 }
 }
@@ -423,12 +402,13 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	// 检查有效性
 	// 检查有效性
 	for (int32_t i = 0; i < count; i++)
 	for (int32_t i = 0; i < count; i++)
 	{
 	{
-		RyanMqttCheck(NULL != unSubscribeManyData[i].topic, RyanMqttParamInvalidError, RyanMqttLog_d);
+		RyanMqttCheck(NULL != unSubscribeManyData[i].topic && unSubscribeManyData[i].topicLen > 0,
+			      RyanMqttParamInvalidError, RyanMqttLog_d);
 	}
 	}
 
 
 	// 申请 coreMqtt 支持的topic格式空间
 	// 申请 coreMqtt 支持的topic格式空间
 	MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
 	MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
-	RyanMqttCheck(NULL != subscriptionList, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(NULL != subscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 	for (int32_t i = 0; i < count; i++)
 	for (int32_t i = 0; i < count; i++)
 	{
 	{
 		subscriptionList[i].qos = (MQTTQoS_t)RyanMqttQos0; // 无效数据,仅当占位符
 		subscriptionList[i].qos = (MQTTQoS_t)RyanMqttQos0; // 无效数据,仅当占位符
@@ -447,7 +427,7 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 
 
 		// 申请数据包的空间
 		// 申请数据包的空间
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
-		RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, RyanMqttLog_d,
+		RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
 				  { platformMemoryFree(subscriptionList); });
 				  { platformMemoryFree(subscriptionList); });
 
 
 		// 序列化数据包
 		// 序列化数据包
@@ -487,7 +467,7 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 			goto __RyanMqttUnSubCreateAckErrorExit;
 			goto __RyanMqttUnSubCreateAckErrorExit;
 		});
 		});
 
 
-		// 不会失败
+		// 此函数不会失败
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 		continue;
 		continue;
 
 
@@ -522,6 +502,8 @@ __RyanMqttUnSubCreateAckErrorExit:
  */
  */
 RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
 RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
 {
 {
+	RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
+
 	RyanMqttUnSubscribeData_t subscribeManyData = {.topic = topic, .topicLen = RyanMqttStrlen(topic)};
 	RyanMqttUnSubscribeData_t subscribeManyData = {.topic = topic, .topicLen = RyanMqttStrlen(topic)};
 	return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
 	return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
 }
 }
@@ -566,7 +548,7 @@ RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic
 
 
 	// 申请数据包的空间
 	// 申请数据包的空间
 	fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 	fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
-	RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, RyanMqttLog_d);
+	RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 
 
 	// qos0不需要 packetId
 	// qos0不需要 packetId
 	if (RyanMqttQos0 == qos)
 	if (RyanMqttQos0 == qos)
@@ -636,23 +618,9 @@ RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic
 RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
 RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
 				RyanMqttQos_e qos, RyanMqttBool_e retain)
 				RyanMqttQos_e qos, RyanMqttBool_e retain)
 {
 {
-	return RyanMqttPublishAndUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain, NULL);
-}
-
-/**
- * @brief 获取mqtt客户端状态
- *
- * @param client
- * @return RyanMqttState_e
- */
-RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
-{
-	if (NULL == client)
-	{
-		return RyanMqttInvalidState;
-	}
+	RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
 
 
-	return RyanMqttGetClientState(client);
+	return RyanMqttPublishAndUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain, NULL);
 }
 }
 
 
 /**
 /**
@@ -780,7 +748,7 @@ RyanMqttError_e RyanMqttSafeFreeSubscribeResources(RyanMqttMsgHandler_t *msgHand
 {
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
-	// RyanMqttGetSubscribeSafe 返回地址的话 subscribeNum 一定大于0
+	// RyanMqttGetSubscribeSafe 返回地址的话 subscribeNum 一定大于0,这里就是要判断大于0才行
 	RyanMqttCheck(subscribeNum > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(subscribeNum > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
 
 
 	for (int32_t i = 0; i < subscribeNum; i++)
 	for (int32_t i = 0; i < subscribeNum; i++)
@@ -818,13 +786,53 @@ RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client, int32_t
 	return RyanMqttSuccessError;
 	return RyanMqttSuccessError;
 }
 }
 
 
-static RyanMqttError_e RyanMqttConfigCopy(RyanMqttClientConfig_t *destConfig, RyanMqttClientConfig_t *srcConfig)
+/**
+ * @brief 获取mqtt客户端状态
+ *
+ * @param client
+ * @return RyanMqttState_e
+ */
+RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
 {
 {
-	RyanMqttAssert(NULL != destConfig && NULL != srcConfig);
+	if (NULL == client)
+	{
+		return RyanMqttInvalidState;
+	}
+
+	return RyanMqttGetClientState(client);
+}
 
 
+/**
+ * @brief 获取 keepalive 剩余时间
+ *
+ * @param client
+ * @param keepAliveRemain
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e RyanMqttGetKeepAliveRemain(RyanMqttClient_t *client, uint32_t *keepAliveRemain)
+{
+	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(NULL != keepAliveRemain, RyanMqttParamInvalidError, RyanMqttLog_d);
+
+	platformCriticalEnter(client->config.userData, &client->criticalLock);
+	*keepAliveRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
+	platformCriticalExit(client->config.userData, &client->criticalLock);
+	return RyanMqttSuccessError;
+}
+
+static RyanMqttError_e RyanMqttClientConfigDeepCopy(RyanMqttClientConfig_t *destConfig,
+						    RyanMqttClientConfig_t *srcConfig)
+{
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttError_e result = RyanMqttSuccessError;
+	RyanMqttAssert(NULL != destConfig && NULL != srcConfig);
 
 
-	RyanMqttMemset(destConfig, 0, sizeof(RyanMqttClientConfig_t));
+	RyanMqttMemcpy(destConfig, srcConfig, sizeof(RyanMqttClientConfig_t));
+
+	destConfig->clientId = NULL;
+	destConfig->userName = NULL;
+	destConfig->password = NULL;
+	destConfig->host = NULL;
+	destConfig->taskName = NULL;
 
 
 	result = RyanMqttStringCopy(&destConfig->clientId, srcConfig->clientId, RyanMqttStrlen(srcConfig->clientId));
 	result = RyanMqttStringCopy(&destConfig->clientId, srcConfig->clientId, RyanMqttStrlen(srcConfig->clientId));
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
@@ -849,27 +857,6 @@ static RyanMqttError_e RyanMqttConfigCopy(RyanMqttClientConfig_t *destConfig, Ry
 	result = RyanMqttStringCopy(&destConfig->taskName, srcConfig->taskName, RyanMqttStrlen(srcConfig->taskName));
 	result = RyanMqttStringCopy(&destConfig->taskName, srcConfig->taskName, RyanMqttStrlen(srcConfig->taskName));
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 
 
-	// ?不使用memset是因为下面 RyanMqttPurgeConfig 可能会误判导致错误释放空间
-	destConfig->port = srcConfig->port;
-	destConfig->mqttVersion = srcConfig->mqttVersion;
-
-	destConfig->taskPrio = srcConfig->taskPrio;
-	destConfig->taskStack = srcConfig->taskStack;
-
-	destConfig->autoReconnectFlag = srcConfig->autoReconnectFlag;
-	destConfig->cleanSessionFlag = srcConfig->cleanSessionFlag;
-
-	destConfig->ackHandlerRepeatCountWarning = srcConfig->ackHandlerRepeatCountWarning;
-	destConfig->ackHandlerCountWarning = srcConfig->ackHandlerCountWarning;
-
-	destConfig->recvTimeout = srcConfig->recvTimeout;
-	destConfig->sendTimeout = srcConfig->sendTimeout;
-	destConfig->ackTimeout = srcConfig->ackTimeout;
-	destConfig->keepaliveTimeoutS = srcConfig->keepaliveTimeoutS;
-	destConfig->reconnectTimeout = srcConfig->reconnectTimeout;
-	destConfig->mqttEventHandle = srcConfig->mqttEventHandle;
-	destConfig->userData = srcConfig->userData;
-
 __exit:
 __exit:
 	if (RyanMqttSuccessError != result)
 	if (RyanMqttSuccessError != result)
 	{
 	{
@@ -894,13 +881,13 @@ RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
 	RyanMqttClientConfig_t *clientConfig;
 	RyanMqttClientConfig_t *clientConfig;
 
 
 	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
-	RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
 
 
 	clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
 	clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
 	RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 
 
-	result = RyanMqttConfigCopy(clientConfig, &client->config);
+	result = RyanMqttClientConfigDeepCopy(clientConfig, &client->config);
 	RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 	RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 
 
 	if (RyanMqttSuccessError == result)
 	if (RyanMqttSuccessError == result)
@@ -955,6 +942,7 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttError_e result = RyanMqttSuccessError;
 
 
 	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
 	RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
@@ -964,7 +952,7 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
 	RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
 
 
 	RyanMqttClientConfig_t tempConfig;
 	RyanMqttClientConfig_t tempConfig;
-	result = RyanMqttConfigCopy(&tempConfig, clientConfig);
+	result = RyanMqttClientConfigDeepCopy(&tempConfig, clientConfig);
 	RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 	RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 
 
 	if (RyanMqttSuccessError == result)
 	if (RyanMqttSuccessError == result)
@@ -986,6 +974,7 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
  * @param qos
  * @param qos
  * @param retain
  * @param retain
  * @param payload
  * @param payload
+ * @param payloadLen
  * @return RyanMqttError_e
  * @return RyanMqttError_e
  */
  */
 RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen,
 RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen,
@@ -1001,7 +990,7 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
 	RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, RyanMqttLog_d);
 
 
 	// 报文支持有效载荷长度为0
 	// 报文支持有效载荷长度为0
-	if (payloadLen > 0 && NULL == payload)
+	if (NULL == payload && payloadLen > 0)
 	{
 	{
 		return RyanMqttParamInvalidError;
 		return RyanMqttParamInvalidError;
 	}
 	}
@@ -1083,16 +1072,10 @@ RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, uint8_t pack
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttAckHandler_t *ackHandler;
 	RyanMqttAckHandler_t *ackHandler;
 	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
-	RyanMqttCheck(0 < packetId, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(0 < packetId && packetId <= RyanMqttMaxPacketId, RyanMqttParamInvalidError, RyanMqttLog_d);
 
 
 	// 删除pubrel记录
 	// 删除pubrel记录
-	platformMutexLock(client->config.userData, &client->ackHandleLock);
 	result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
 	result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
-	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
-
-__exit:
-	platformMutexUnLock(client->config.userData, &client->ackHandleLock);
-
 	if (RyanMqttSuccessError == result)
 	if (RyanMqttSuccessError == result)
 	{
 	{
 		RyanMqttEventMachine(client, RyanMqttEventAckHandlerDiscard, (void *)ackHandler); // 回调函数
 		RyanMqttEventMachine(client, RyanMqttEventAckHandlerDiscard, (void *)ackHandler); // 回调函数
@@ -1120,13 +1103,3 @@ RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_
 	platformCriticalExit(client->config.userData, &client->criticalLock);
 	platformCriticalExit(client->config.userData, &client->criticalLock);
 	return RyanMqttSuccessError;
 	return RyanMqttSuccessError;
 }
 }
-
-RyanMqttError_e RyanMqttGetKeepAliveRemain(RyanMqttClient_t *client, uint32_t *keepAliveRemain)
-{
-	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
-
-	platformCriticalEnter(client->config.userData, &client->criticalLock);
-	*keepAliveRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
-	platformCriticalExit(client->config.userData, &client->criticalLock);
-	return RyanMqttSuccessError;
-}

+ 3 - 2
mqttclient/RyanMqttThread.c

@@ -48,6 +48,7 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
 	if (timeRemain > (uint32_t)(client->config.recvTimeout + 100))
 	if (timeRemain > (uint32_t)(client->config.recvTimeout + 100))
 	{
 	{
 		// 当没有到达 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
 		// 当没有到达 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
+		// timeRemain 是剩余时间,所以 timeRemain > RyanMqttKeepAliveMultiplier - 0.9 就是还没有到达0.9倍时间
 		if (timeRemain > client->config.keepaliveTimeoutS * 1000 * (RyanMqttKeepAliveMultiplier - 0.9))
 		if (timeRemain > client->config.keepaliveTimeoutS * 1000 * (RyanMqttKeepAliveMultiplier - 0.9))
 		{
 		{
 			return RyanMqttSuccessError;
 			return RyanMqttSuccessError;
@@ -300,8 +301,8 @@ static RyanMqttError_e RyanMqttConnectBroker(RyanMqttClient_t *client, RyanMqttC
 
 
 	// 申请数据包的空间
 	// 申请数据包的空间
 	fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 	fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
-	RyanMqttCheckCodeNoReturn(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, RyanMqttLog_d, {
-		result = RyanMqttNoRescourceError;
+	RyanMqttCheckCodeNoReturn(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
+		result = RyanMqttNotEnoughMemError;
 		*connectState = RyanMqttConnectFailedError;
 		*connectState = RyanMqttConnectFailedError;
 		goto __exit;
 		goto __exit;
 	});
 	});

+ 2 - 1
mqttclient/RyanMqttThreadProcessPacket.c

@@ -477,7 +477,8 @@ RyanMqttError_e RyanMqttGetPacketInfo(RyanMqttClient_t *client, MQTTPacketInfo_t
 		if (pIncomingPacket->remainingLength > 0)
 		if (pIncomingPacket->remainingLength > 0)
 		{
 		{
 			pIncomingPacket->pRemainingData = platformMemoryMalloc(pIncomingPacket->remainingLength);
 			pIncomingPacket->pRemainingData = platformMemoryMalloc(pIncomingPacket->remainingLength);
-			RyanMqttCheck(NULL != pIncomingPacket->pRemainingData, RyanMqttNoRescourceError, RyanMqttLog_d);
+			RyanMqttCheck(NULL != pIncomingPacket->pRemainingData, RyanMqttNotEnoughMemError,
+				      RyanMqttLog_d);
 		}
 		}
 	}
 	}
 	else if (MQTTNoDataAvailable == status)
 	else if (MQTTNoDataAvailable == status)

+ 24 - 0
mqttclient/RyanMqttUtil.c

@@ -326,6 +326,30 @@ uint32_t RyanMqttTimerRemain(RyanMqttTimer_t *platformTimer)
 	return platformTimer->timeOut - elapsed;
 	return platformTimer->timeOut - elapsed;
 }
 }
 
 
+/**
+ * @brief 获取报文标识符,报文标识符不可为0
+ * 都在sendbuf锁内调用
+ * @param client
+ * @return uint16_t
+ */
+uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
+{
+	uint16_t packetId;
+	RyanMqttAssert(NULL != client);
+	platformCriticalEnter(client->config.userData, &client->criticalLock);
+	if (client->packetId >= RyanMqttMaxPacketId || client->packetId < 1)
+	{
+		client->packetId = 1;
+	}
+	else
+	{
+		client->packetId++;
+	}
+	packetId = client->packetId;
+	platformCriticalExit(client->config.userData, &client->criticalLock);
+	return packetId;
+}
+
 const char *RyanMqttStrError(int32_t state)
 const char *RyanMqttStrError(int32_t state)
 {
 {
 	const char *str;
 	const char *str;

+ 22 - 0
mqttclient/include/RyanMqttPlatform.h

@@ -37,6 +37,28 @@ extern "C" {
 #define RyanMqttVsnprintf vsnprintf
 #define RyanMqttVsnprintf vsnprintf
 #endif
 #endif
 
 
+/* MQTT packet types. */
+
+/**
+ * @addtogroup mqtt_constants
+ * @{
+ */
+#define MQTT_PACKET_TYPE_CONNECT        ( ( uint8_t ) 0x10U )  /**< @brief CONNECT (client-to-server). */
+#define MQTT_PACKET_TYPE_CONNACK        ( ( uint8_t ) 0x20U )  /**< @brief CONNACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PUBLISH        ( ( uint8_t ) 0x30U )  /**< @brief PUBLISH (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBACK         ( ( uint8_t ) 0x40U )  /**< @brief PUBACK (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBREC         ( ( uint8_t ) 0x50U )  /**< @brief PUBREC (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBREL         ( ( uint8_t ) 0x62U )  /**< @brief PUBREL (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBCOMP        ( ( uint8_t ) 0x70U )  /**< @brief PUBCOMP (bidirectional). */
+#define MQTT_PACKET_TYPE_SUBSCRIBE      ( ( uint8_t ) 0x82U )  /**< @brief SUBSCRIBE (client-to-server). */
+#define MQTT_PACKET_TYPE_SUBACK         ( ( uint8_t ) 0x90U )  /**< @brief SUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_UNSUBSCRIBE    ( ( uint8_t ) 0xA2U )  /**< @brief UNSUBSCRIBE (client-to-server). */
+#define MQTT_PACKET_TYPE_UNSUBACK       ( ( uint8_t ) 0xB0U )  /**< @brief UNSUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PINGREQ        ( ( uint8_t ) 0xC0U )  /**< @brief PINGREQ (client-to-server). */
+#define MQTT_PACKET_TYPE_PINGRESP       ( ( uint8_t ) 0xD0U )  /**< @brief PINGRESP (server-to-client). */
+#define MQTT_PACKET_TYPE_DISCONNECT     ( ( uint8_t ) 0xE0U )  /**< @brief DISCONNECT (client-to-server). */
+/** @} */
+
 // RyanMqttT内部imer接口
 // RyanMqttT内部imer接口
 typedef struct
 typedef struct
 {
 {

+ 2 - 0
mqttclient/include/RyanMqttUtil.h

@@ -63,6 +63,8 @@ extern RyanMqttError_e RyanMqttAckListAddToUserAckList(RyanMqttClient_t *client,
 extern RyanMqttError_e RyanMqttAckListRemoveToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
 extern RyanMqttError_e RyanMqttAckListRemoveToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
 extern void RyanMqttClearAckSession(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId);
 extern void RyanMqttClearAckSession(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId);
 
 
+extern uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif

+ 1 - 1
test/RyanMqttMultiThreadMultiClientTest.c

@@ -100,7 +100,7 @@ static void *concurrentPublishThread(void *arg)
 			RyanMqttLog_e("Thread %d: Failed to publish message %d", testData->threadIndex, i);
 			RyanMqttLog_e("Thread %d: Failed to publish message %d", testData->threadIndex, i);
 		}
 		}
 
 
-		delay_us(900);
+		delay_us(1100);
 	}
 	}
 
 
 	// 等待消息处理完成
 	// 等待消息处理完成

+ 19 - 4
test/RyanMqttMultiThreadSafetyTest.c

@@ -138,8 +138,7 @@ static void *concurrentPublishThread(void *arg)
 
 
 	// 订阅主题
 	// 订阅主题
 	RyanMqttSnprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
 	RyanMqttSnprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
-	result = RyanMqttSubscribe(g_testControl.client, topic, RyanMqttQos2);
-	// result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
+	result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
 	if (RyanMqttSuccessError != result)
 	if (RyanMqttSuccessError != result)
 	{
 	{
 		RyanMqttLog_e("Thread %d: Failed to subscribe", threadIndex);
 		RyanMqttLog_e("Thread %d: Failed to subscribe", threadIndex);
@@ -169,7 +168,7 @@ static void *concurrentPublishThread(void *arg)
 			}
 			}
 		}
 		}
 
 
-		delay_us(900); // 电脑配置不一样需要的时间也就不一样
+		delay_us(1100); // 电脑配置不一样需要的时间也就不一样
 	}
 	}
 
 
 	// 等待消息处理完成
 	// 等待消息处理完成
@@ -212,9 +211,25 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 	// 创建测试线程
 	// 创建测试线程
 	for (int i = 0; i < CONCURRENT_CLIENTS; i++)
 	for (int i = 0; i < CONCURRENT_CLIENTS; i++)
 	{
 	{
+		// struct sched_param param;
 
 
-		if (pthread_create(&g_threadTestData[i].threadId, NULL, concurrentPublishThread, NULL) != 0)
+		// pthread_attr_init(&g_threadTestData[i].attr);
+
+		// // 设置调度策略为实时策略
+		// pthread_attr_setschedpolicy(&g_threadTestData[i].attr, SCHED_FIFO);
+
+		// // 获取该策略的最大优先级
+		// int max_prio = sched_get_priority_max(SCHED_FIFO);
+		// param.sched_priority = max_prio;
+
+		// // 设置优先级
+		// pthread_attr_setschedparam(&g_threadTestData[i].attr, &param);
+
+		int result222 = pthread_create(&g_threadTestData[i].threadId, NULL, concurrentPublishThread, NULL);
+		// pthread_attr_destroy(&g_threadTestData[i].attr);
+		if (result222 != 0)
 		{
 		{
+
 			RyanMqttLog_e("Failed to create thread %d", i);
 			RyanMqttLog_e("Failed to create thread %d", i);
 			result = RyanMqttFailedError;
 			result = RyanMqttFailedError;
 			goto cleanup;
 			goto cleanup;

+ 516 - 0
test/RyanMqttPublicApiParamCheckTest.c

@@ -0,0 +1,516 @@
+#include "RyanMqttTest.h"
+
+static RyanMqttQos_e invaildQos()
+{
+	static bool aa = true;
+
+	if (aa)
+	{
+		aa = false;
+		// NOLINTNEXTLINE(clang-analyzer-optin.core.EnumCastOutOfRange)
+		return (RyanMqttQos_e)(uintptr_t)10;
+	}
+
+	aa = false;
+	return RyanMqttSubFail;
+}
+
+static RyanMqttError_e RyanMqttBaseApiParamCheckTest(void)
+{
+	RyanMqttError_e result = RyanMqttSuccessError;
+
+	RyanMqttClient_t *validClient = NULL;
+	// 准备一个有效的客户端用于某些测试
+	result = RyanMqttTestInit(&validClient, RyanMqttTrue, RyanMqttFalse, 120, NULL, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	result = RyanMqttInit(NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	result = RyanMqttDestroy(NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// todo 可能增加状态判断
+	result = RyanMqttStart(NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// todo 可能增加状态判断
+	result = RyanMqttDisconnect(NULL, RyanMqttTrue);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// todo 可能增加状态判断
+	result = RyanMqttReconnect(NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	RyanMqttState_e state = RyanMqttGetState(NULL);
+	RyanMqttCheckCodeNoReturn(state == RyanMqttInvalidState, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
+
+	result = RyanMqttGetKeepAliveRemain(NULL, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	uint32_t keepAliveRemain;
+	// NULL客户端指针
+	result = RyanMqttGetKeepAliveRemain(NULL, &keepAliveRemain);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL剩余时间指针
+	result = RyanMqttGetKeepAliveRemain(validClient, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	result = RyanMqttDiscardAckHandler(NULL, MQTT_PACKET_TYPE_PUBACK, 1);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效包ID (0 是无效的)
+	result = RyanMqttDiscardAckHandler(validClient, MQTT_PACKET_TYPE_PUBACK, 0);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 超过最大包ID
+	result = RyanMqttDiscardAckHandler(validClient, MQTT_PACKET_TYPE_PUBACK, RyanMqttMaxPacketId + 1);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	result = RyanMqttRegisterEventId(NULL, RyanMqttEventConnected);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	result = RyanMqttCancelEventId(NULL, RyanMqttEventConnected);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+	return RyanMqttSuccessError;
+
+__exit:
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+	return RyanMqttFailedError;
+}
+
+static RyanMqttError_e RyanMqttLwtApiParamCheckTest(void)
+{
+	RyanMqttError_e result = RyanMqttSuccessError;
+
+	RyanMqttClient_t *validClient = NULL;
+	// 准备一个有效的客户端用于某些测试
+	result = RyanMqttTestInit(&validClient, RyanMqttTrue, RyanMqttFalse, 120, NULL, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+	RyanMqttDisconnect(validClient, RyanMqttTrue);
+
+	// 增加一个get接口
+	// NULL客户端指针
+	result = RyanMqttSetLwt(NULL, "test/lwt", "offline", 7, RyanMqttQos1, RyanMqttTrue);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL主题指针
+	result = RyanMqttSetLwt(validClient, NULL, "offline", 7, RyanMqttQos1, RyanMqttTrue);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL负载但长度不为0
+	result = RyanMqttSetLwt(validClient, "test/lwt", NULL, 7, RyanMqttQos1, RyanMqttTrue);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// payloadLen 超长
+	result = RyanMqttSetLwt(validClient, "test/lwt", "offline", RyanMqttMaxPayloadLen + 1, RyanMqttQos1,
+				RyanMqttTrue);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效QoS
+	result = RyanMqttSetLwt(validClient, "test/lwt", "offline", 7, invaildQos(), RyanMqttTrue);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效retain
+	// NOLINTNEXTLINE(clang-analyzer-optin.core.EnumCastOutOfRange)
+	result = RyanMqttSetLwt(validClient, "test/lwt", "offline", 7, RyanMqttQos1, 200);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+	return RyanMqttSuccessError;
+
+__exit:
+	RyanMqttLog_e("result: %d", result);
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+
+	return RyanMqttFailedError;
+}
+
+static RyanMqttError_e RyanMqttConfigApiParamCheckTest(void)
+{
+	RyanMqttError_e result = RyanMqttSuccessError;
+
+	RyanMqttClient_t *validClient = NULL;
+	// 准备一个有效的客户端用于某些测试
+	result = RyanMqttTestInit(&validClient, RyanMqttTrue, RyanMqttFalse, 120, NULL, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+	RyanMqttDisconnect(validClient, RyanMqttTrue);
+
+	// NULL客户端指针
+	result = RyanMqttGetConfig(NULL, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	RyanMqttClientConfig_t *getConfig = NULL;
+
+	/**
+	 * @brief RyanMqttGetConfig
+	 *
+	 */
+	// NULL客户端指针
+	result = RyanMqttGetConfig(NULL, &getConfig);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL配置指针
+	result = RyanMqttGetConfig(validClient, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL配置指针
+	result = RyanMqttFreeConfigFromGet(NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	/**
+	 * @brief RyanMqttSetConfig
+	 *
+	 */
+	RyanMqttClientConfig_t baseMqttConfig = {.clientId = RyanMqttClientId,
+						 .userName = RyanMqttUserName,
+						 .password = RyanMqttPassword,
+						 .host = RyanMqttHost,
+						 .port = RyanMqttPort,
+						 .taskName = "mqttThread",
+						 .taskPrio = 16,
+						 .taskStack = 4096,
+						 .mqttVersion = 4,
+						 .ackHandlerRepeatCountWarning = 6,
+						 .ackHandlerCountWarning = 60000,
+						 .autoReconnectFlag = RyanMqttTrue,
+						 .cleanSessionFlag = RyanMqttTrue,
+						 .reconnectTimeout = 3000,
+						 .recvTimeout = 2000,
+						 .sendTimeout = 1800,
+						 .ackTimeout = 10000,
+						 .keepaliveTimeoutS = 120,
+						 .mqttEventHandle = NULL,
+						 .userData = NULL};
+
+	RyanMqttClientConfig_t mqttConfig = {0};
+
+	// NULL客户端指针
+	result = RyanMqttSetConfig(NULL, &mqttConfig);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL配置指针
+	result = RyanMqttSetConfig(validClient, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+#define checkSetConfigParam(code)                                                                                      \
+	do                                                                                                             \
+	{                                                                                                              \
+		RyanMqttMemcpy(&mqttConfig, &baseMqttConfig, sizeof(RyanMqttClientConfig_t));                          \
+		{code};                                                                                                \
+		result = RyanMqttSetConfig(validClient, &mqttConfig);                                                  \
+		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e,                  \
+					  { goto __exit; });                                                           \
+	} while (0)
+
+	checkSetConfigParam({ mqttConfig.clientId = NULL; });
+	checkSetConfigParam({ mqttConfig.host = NULL; });
+	checkSetConfigParam({ mqttConfig.taskName = NULL; });
+	checkSetConfigParam({
+		mqttConfig.recvTimeout = 10;
+		mqttConfig.keepaliveTimeoutS = 20 - 1;
+	});
+	checkSetConfigParam({
+		mqttConfig.recvTimeout = 10;
+		mqttConfig.sendTimeout = 11;
+	});
+
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+	return RyanMqttSuccessError;
+
+__exit:
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+
+	return RyanMqttFailedError;
+}
+
+static RyanMqttError_e RyanMqttSubApiParamCheckTest(void)
+{
+	RyanMqttError_e result = RyanMqttSuccessError;
+
+	RyanMqttClient_t *validClient = NULL;
+	// 准备一个有效的客户端用于某些测试
+	result = RyanMqttTestInit(&validClient, RyanMqttTrue, RyanMqttFalse, 120, NULL, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	result = RyanMqttSubscribe(NULL, "test/topic", RyanMqttQos1);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL主题指针
+	result = RyanMqttSubscribe(validClient, NULL, RyanMqttQos1);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效QoS级别
+	result = RyanMqttSubscribe(validClient, "test/topic", invaildQos());
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 分配测试数据
+	RyanMqttSubscribeData_t subscribeData[2] = {0};
+	subscribeData[0].topic = "test/topic1";
+	subscribeData[0].topicLen = strlen("test/topic1");
+	subscribeData[0].qos = RyanMqttQos1;
+	subscribeData[1].topic = "test/topic2";
+	subscribeData[1].topicLen = strlen("test/topic2");
+	subscribeData[1].qos = RyanMqttQos2;
+	// NULL客户端指针
+	result = RyanMqttSubscribeMany(NULL, 2, subscribeData);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效数量
+	result = RyanMqttSubscribeMany(validClient, 0, subscribeData);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	result = RyanMqttSubscribeMany(validClient, -1, subscribeData);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL数据指针
+	result = RyanMqttSubscribeMany(validClient, 2, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	for (uint32_t i = 0; i < getArraySize(subscribeData); i++)
+	{
+		// subscribeData 内数据无效
+		subscribeData[i].topic = NULL;
+		subscribeData[i].topicLen = strlen("test/topic2");
+		subscribeData[i].qos = RyanMqttQos2;
+		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
+		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+		// subscribeData 内数据无效
+		subscribeData[i].topic = "test/topic2";
+		subscribeData[i].topicLen = 0;
+		subscribeData[i].qos = RyanMqttQos2;
+		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
+		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+		// subscribeData 内数据无效
+		subscribeData[i].topic = "test/topic2";
+		subscribeData[i].topicLen = strlen("test/topic2");
+		subscribeData[i].qos = invaildQos();
+		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
+		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+		// 恢复 subscribeData 内数据
+		subscribeData[i].topic = "test/topic2";
+		subscribeData[i].topicLen = strlen("test/topic2");
+		subscribeData[i].qos = RyanMqttQos2;
+	}
+
+	// NULL客户端指针
+	result = RyanMqttUnSubscribe(NULL, "test/topic");
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL主题指针
+	result = RyanMqttUnSubscribe(validClient, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 分配测试数据
+	RyanMqttUnSubscribeData_t unsubscribeData[2] = {0};
+	unsubscribeData[0].topic = "test/topic1";
+	unsubscribeData[0].topicLen = strlen("test/topic1");
+	unsubscribeData[1].topic = "test/topic2";
+	unsubscribeData[1].topicLen = strlen("test/topic2");
+
+	// NULL客户端指针
+	result = RyanMqttUnSubscribeMany(NULL, 2, unsubscribeData);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效数量
+	result = RyanMqttUnSubscribeMany(validClient, 0, unsubscribeData);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL数据指针
+	result = RyanMqttUnSubscribeMany(validClient, 2, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	for (uint32_t i = 0; i < getArraySize(subscribeData); i++)
+	{
+		// subscribeData 内数据无效
+		subscribeData[i].topic = NULL;
+		subscribeData[i].topicLen = strlen("test/topic2");
+
+		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
+		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+		// subscribeData 内数据无效
+		subscribeData[i].topic = "test/topic2";
+		subscribeData[i].topicLen = 0;
+		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
+		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+		// 恢复 subscribeData 内数据
+		subscribeData[i].topic = "test/topic2";
+		subscribeData[i].topicLen = strlen("test/topic2");
+	}
+
+	RyanMqttMsgHandler_t *msgHandles = NULL;
+	int32_t subscribeNum = 0;
+	int32_t totalCount = 0;
+	// NULL客户端指针
+	result = RyanMqttGetSubscribeSafe(NULL, &msgHandles, &subscribeNum);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL消息句柄指针
+	result = RyanMqttGetSubscribeSafe(validClient, NULL, &subscribeNum);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL数量指针
+	result = RyanMqttGetSubscribeSafe(validClient, &msgHandles, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL指针但数量不为0
+	result = RyanMqttSafeFreeSubscribeResources(NULL, 5);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// !一定失败否则要报错
+	result = RyanMqttSafeFreeSubscribeResources((void *)&msgHandles, 0);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	result = RyanMqttGetSubscribeTotalCount(NULL, &totalCount);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL总数指针
+	result = RyanMqttGetSubscribeTotalCount(validClient, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e,
+				  { goto __exit; }); // 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+	return RyanMqttSuccessError;
+
+__exit:
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+
+	return RyanMqttFailedError;
+}
+
+static RyanMqttError_e RyanMqttPubApiParamCheckTest(void)
+{
+
+	RyanMqttError_e result = RyanMqttSuccessError;
+
+	RyanMqttClient_t *validClient = NULL;
+	// 准备一个有效的客户端用于某些测试
+	result = RyanMqttTestInit(&validClient, RyanMqttTrue, RyanMqttFalse, 120, NULL, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	result = RyanMqttPublish(NULL, "test/topic", "payload", 7, RyanMqttQos0, RyanMqttFalse);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL主题指针
+	result = RyanMqttPublish(validClient, NULL, "payload", 7, RyanMqttQos0, RyanMqttFalse);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL负载指针但长度不为0
+	result = RyanMqttPublish(validClient, "test/topic", NULL, 7, RyanMqttQos0, RyanMqttFalse);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效QoS级别
+	result = RyanMqttPublish(validClient, "test/topic", "payload", 7, invaildQos(), RyanMqttFalse);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 超大负载长度
+	result = RyanMqttPublish(validClient, "test/topic", "payload", RyanMqttMaxPayloadLen + 1, RyanMqttQos0,
+				 RyanMqttFalse);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL客户端指针
+	result = RyanMqttPublishAndUserData(NULL, "test/topic", 10, "payload", 7, RyanMqttQos1, RyanMqttFalse, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// NULL主题指针
+	result = RyanMqttPublishAndUserData(validClient, NULL, 10, "payload", 7, RyanMqttQos1, RyanMqttFalse, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 主题长度为0
+	result = RyanMqttPublishAndUserData(validClient, "test/topic", 0, "payload", 7, RyanMqttQos1, RyanMqttFalse,
+					    NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
+
+	// 无效QoS级别
+	result = RyanMqttPublishAndUserData(validClient, "test/topic", strlen("test/topic"), "payload", 7, invaildQos(),
+					    RyanMqttFalse, NULL);
+	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e,
+				  { goto __exit; }); // 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+	return RyanMqttSuccessError;
+
+__exit:
+	// 清理资源
+	if (validClient)
+	{
+		RyanMqttTestDestroyClient(validClient);
+	}
+	return RyanMqttFailedError;
+}
+
+RyanMqttError_e RyanMqttPublicApiParamCheckTest(void)
+{
+	RyanMqttError_e result = RyanMqttSuccessError;
+
+	result = RyanMqttBaseApiParamCheckTest();
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+	checkMemory;
+
+	result = RyanMqttLwtApiParamCheckTest();
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+	checkMemory;
+
+	result = RyanMqttConfigApiParamCheckTest();
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+	checkMemory;
+
+	result = RyanMqttSubApiParamCheckTest();
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+	checkMemory;
+
+	result = RyanMqttPubApiParamCheckTest();
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_e, { goto __exit; });
+	checkMemory;
+
+	return RyanMqttSuccessError;
+
+__exit:
+	return RyanMqttFailedError;
+}

+ 17 - 1
test/RyanMqttTest.c

@@ -172,7 +172,7 @@ RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncF
 					     .cleanSessionFlag = RyanMqttTrue,
 					     .cleanSessionFlag = RyanMqttTrue,
 					     .reconnectTimeout = 3000,
 					     .reconnectTimeout = 3000,
 					     .recvTimeout = 2000,
 					     .recvTimeout = 2000,
-					     .sendTimeout = 1000,
+					     .sendTimeout = 1800,
 					     .ackTimeout = 10000,
 					     .ackTimeout = 10000,
 					     .keepaliveTimeoutS = keepaliveTimeoutS,
 					     .keepaliveTimeoutS = keepaliveTimeoutS,
 					     .mqttEventHandle =
 					     .mqttEventHandle =
@@ -299,11 +299,27 @@ void RyanMqttTestExitCritical(void)
 // !当测试程序出错时,并不会回收内存。交由父进程进行回收
 // !当测试程序出错时,并不会回收内存。交由父进程进行回收
 int main(void)
 int main(void)
 {
 {
+
+	// 多线程测试必须设置这个,否则会导致 heap-use-after-free, 原因如下
+	// 虽然也有办法解决,不过RyanMqtt目标为嵌入式场景,不想引入需要更多资源的逻辑,嵌入式场景目前想不到有这么频繁而且还是本机emqx的场景。
+
+	// 用户线程send -> emqx回复报文 -> mqtt线程recv。
+	// recv线程收到数据后,会释放用户线程send的sendbuf缓冲区。
+	// 但是在本机部署的emqx并且多核心同时运行,发送的数据量非常大的情况下会出现mqtt线程recv已经收到数据,但是用户线程send函数还没有返回。
+	cpu_set_t cpuset;
+	CPU_ZERO(&cpuset);
+	CPU_SET(0, &cpuset);
+	pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+	sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
+
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttError_e result = RyanMqttSuccessError;
 	vallocInit();
 	vallocInit();
 
 
 	pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
 	pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
 
 
+	result = RyanMqttPublicApiParamCheckTest();
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
+
 	result = RyanMqttSubTest();
 	result = RyanMqttSubTest();
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
 
 

+ 3 - 0
test/RyanMqttTest.h

@@ -5,6 +5,7 @@
 extern "C" {
 extern "C" {
 #endif
 #endif
 
 
+#define _GNU_SOURCE // 必须放在所有头文件之前
 #include <stdio.h>
 #include <stdio.h>
 #include <stdint.h>
 #include <stdint.h>
 #include <string.h>
 #include <string.h>
@@ -13,6 +14,7 @@ extern "C" {
 #include <unistd.h>
 #include <unistd.h>
 #include <semaphore.h>
 #include <semaphore.h>
 #include <pthread.h>
 #include <pthread.h>
+#include <sched.h>
 #include "valloc.h"
 #include "valloc.h"
 #define malloc  v_malloc
 #define malloc  v_malloc
 #define calloc  v_calloc
 #define calloc  v_calloc
@@ -82,6 +84,7 @@ extern RyanMqttError_e RyanMqttSubTest(void);
 extern RyanMqttError_e RyanMqttWildcardTest(void);
 extern RyanMqttError_e RyanMqttWildcardTest(void);
 extern RyanMqttError_e RyanMqttMultiThreadMultiClientTest(void);
 extern RyanMqttError_e RyanMqttMultiThreadMultiClientTest(void);
 extern RyanMqttError_e RyanMqttMultiThreadSafetyTest(void);
 extern RyanMqttError_e RyanMqttMultiThreadSafetyTest(void);
+extern RyanMqttError_e RyanMqttPublicApiParamCheckTest(void);
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }