Explorar o código

refactor: 人工审查优化

RyanCW hai 4 meses
pai
achega
020ba245ed

+ 5 - 5
common/RyanMqttLog.c

@@ -9,8 +9,8 @@ void RyanMqttLogOutPut(char *lvl, uint8_t color, char *fileStr, uint32_t lineNum
 	uint16_t len = 0;
 
 	// 打印颜色、提示符、打印文件路径、行号
-	len += snprintf(dbgBuffer + len, sizeof(dbgBuffer) - len, "\033[%dm[%s] %s:%" PRIu32 " ", color, lvl, fileStr,
-			lineNum);
+	len += RyanMqttSnprintf(dbgBuffer + len, sizeof(dbgBuffer) - len, "\033[%dm[%s] %s:%" PRIu32 " ", color, lvl,
+				fileStr, lineNum);
 
 	// platformPrint(dbgBuffer, len);
 	// len = 0;
@@ -18,11 +18,11 @@ void RyanMqttLogOutPut(char *lvl, uint8_t color, char *fileStr, uint32_t lineNum
 	// 打印用户输入
 	va_list args;
 	va_start(args, fmt);
-	len += vsnprintf(dbgBuffer + len, sizeof(dbgBuffer) - len, fmt, args);
+	len += RyanMqttVsnprintf(dbgBuffer + len, sizeof(dbgBuffer) - len, fmt, args);
 	va_end(args);
 
 	// 打印颜色
-	len += snprintf(dbgBuffer + len, sizeof(dbgBuffer) - len, "\033[0m\r\n");
+	len += RyanMqttSnprintf(dbgBuffer + len, sizeof(dbgBuffer) - len, "\033[0m\r\n");
 
 	platformPrint(dbgBuffer, len);
 }
@@ -33,7 +33,7 @@ void RyanMqttLogOutPutRaw(char *const fmt, ...)
 
 	va_list args;
 	va_start(args, fmt);
-	uint16_t len = vsnprintf(dbgBuffer, sizeof(dbgBuffer), fmt, args);
+	uint16_t len = RyanMqttVsnprintf(dbgBuffer, sizeof(dbgBuffer), fmt, args);
 	va_end(args);
 
 	platformPrint(dbgBuffer, len);

+ 7 - 0
example/RyanMqttExample.c

@@ -149,6 +149,13 @@ static void mqttEventHandle(void *pclient, RyanMqttEventId_e event, const void *
 
 	case RyanMqttEventDestroyBefore: RyanMqttLog_i("销毁mqtt客户端前回调"); break;
 
+	case RyanMqttEventUnsubscribedData: {
+		RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
+		RyanMqttLog_i("接收到未匹配任何订阅主题的报文事件 topic: %.*s, packetId: %d, payload len: %d",
+			      msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen);
+		break;
+	}
+
 	default: break;
 	}
 }

+ 162 - 151
mqttclient/RyanMqttClient.c

@@ -30,32 +30,6 @@ static uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
 	return packetId;
 }
 
-static RyanMqttError_e setConfigValue(char **dest, char const *const rest)
-{
-	RyanMqttAssert(NULL != dest && NULL != rest);
-	int32_t restStrLen = (int32_t)RyanMqttStrlen(rest);
-
-	if (NULL != *dest)
-	{
-		if ((int32_t)RyanMqttStrlen(*dest) == restStrLen && 0 == RyanMqttStrcmp(*dest, rest))
-		{
-			return RyanMqttSuccessError;
-		}
-	}
-
-	char *tmp;
-	RyanMqttError_e result = RyanMqttStringCopy(&tmp, (char *)rest, restStrLen);
-	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
-
-	if (NULL != *dest)
-	{
-		platformMemoryFree(*dest);
-	}
-
-	*dest = tmp;
-	return RyanMqttSuccessError;
-}
-
 /**
  * @brief mqtt初始化
  *
@@ -118,6 +92,7 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
 	return RyanMqttSuccessError;
 
 __exit:
+	// 不能按空来判断,不是指针类型
 	if (criticalLockIsOk)
 	{
 		platformCriticalDestroy(client->config.userData, &client->criticalLock);
@@ -175,7 +150,7 @@ RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
 
 /**
  * @brief 启动mqtt客户端
- * !不要重复调用,重复调用会返回错误
+ * !不要重复调用
  *
  * @param client
  * @return RyanMqttError_e
@@ -240,6 +215,10 @@ RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e send
 	return RyanMqttSuccessError;
 }
 
+// todo 这里考虑要不要使用信号量来实现吧,会增加platform厚度,目前不想加。最好用自动重连
+// ?现在取巧使用线程的暂停和恢复实现,如果mqtt线程还没有暂停,用户就调用这个函数就会没有效果。
+// ?用户不用自动重连的话,也可以通过一直判断 client 的状态是不是为 RyanMqttDisconnectState 是的话可以调用
+// ?RyanMqttReconnect。 最推荐的是用自动重连
 /**
  * @brief 手动重连mqtt客户端
  * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
@@ -261,21 +240,6 @@ RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
 	return RyanMqttSuccessError;
 }
 
-static void RyanMqttClearSubSession(RyanMqttClient_t *client, uint16_t packetId, int32_t count,
-				    MQTTSubscribeInfo_t *subscriptionList)
-{
-	RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
-	// 清除msg链表
-	RyanMqttMsgHandler_t tempMsgHandler;
-	for (int32_t i = 0; i < count; i++)
-	{
-		tempMsgHandler.topic = (char *)subscriptionList[i].pTopicFilter;
-		tempMsgHandler.topicLen = subscriptionList[i].topicFilterLength;
-		tempMsgHandler.packetId = packetId;
-
-		RyanMqttMsgHandlerFindAndDestroyByPackId(client, &tempMsgHandler, RyanMqttFalse);
-	}
-}
 /**
  * @brief 订阅主题
  *
@@ -293,7 +257,6 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	RyanMqttMsgHandler_t *msgHandler;
 	RyanMqttMsgHandler_t *msgToListHandler;
 	RyanMqttAckHandler_t *userAckHandler;
-
 	MQTTFixedBuffer_t fixedBuffer;
 
 	// 校验参数合法性
@@ -301,6 +264,8 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	RyanMqttCheck(NULL != subscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
+
+	// 检查每个主题消息是否合法
 	for (int32_t i = 0; i < count; i++)
 	{
 		RyanMqttCheck(NULL != subscribeManyData[i].topic, RyanMqttParamInvalidError, RyanMqttLog_d);
@@ -311,7 +276,6 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	// 申请 coreMqtt 支持的topic格式空间
 	MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
 	RyanMqttCheck(NULL != subscriptionList, RyanMqttParamInvalidError, RyanMqttLog_d);
-	RyanMqttMemset(subscriptionList, 0, sizeof(MQTTSubscribeInfo_t) * count);
 	for (int32_t i = 0; i < count; i++)
 	{
 		subscriptionList[i].qos = (MQTTQoS_t)subscribeManyData[i].qos;
@@ -350,19 +314,21 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
 		result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
 						  subscriptionList[i].topicFilterLength, packetId,
 						  (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgHandler);
-		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
+					  { goto __RyanMqttSubCreateAckErrorExit; });
 
 		result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_SUBACK, packetId, 0, NULL, msgHandler,
 						  &userAckHandler, RyanMqttFalse);
 		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
 			RyanMqttMsgHandlerDestroy(client, msgHandler);
-			goto __exit;
+			goto __RyanMqttSubCreateAckErrorExit;
 		});
 
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 		continue;
 
-__exit:
+__RyanMqttSubCreateAckErrorExit:
+		// 创建 sub ack 数组时失败,查找所有同 packetId 的ack进行清除
 		RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
 
 		platformMemoryFree(subscriptionList);
@@ -370,18 +336,13 @@ __exit:
 		return RyanMqttNotEnoughMemError;
 	}
 
+	// ?创建msg包,3.8.4响应,允许服务端在发送 SUBACK 报文之前就开始发送与订阅匹配的 PUBLISH 报文。
 	for (int32_t i = 0; i < count; i++)
 	{
-		// ?创建msg包,3.8.4响应,允许服务端在发送 SUBACK 报文之前就开始发送与订阅匹配的 PUBLISH 报文。
 		result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
 						  subscriptionList[i].topicFilterLength, packetId,
 						  (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgToListHandler);
-		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
-			RyanMqttClearSubSession(client, packetId, i, subscriptionList);
-
-			platformMemoryFree(subscriptionList);
-			platformMemoryFree(fixedBuffer.pBuffer);
-		});
+		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 
 		// 将msg信息添加到订阅链表上
 		RyanMqttMsgHandlerAddToMsgList(client, msgToListHandler);
@@ -390,12 +351,26 @@ __exit:
 	// 发送订阅主题包
 	// 如果发送失败就清除ack链表,创建ack链表必须在发送前
 	result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
-	RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
-		RyanMqttClearSubSession(client, packetId, count, subscriptionList);
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 
-		platformMemoryFree(subscriptionList);
-		platformMemoryFree(fixedBuffer.pBuffer);
-	});
+__exit:
+	// 失败清除session
+	if (RyanMqttSuccessError != result)
+	{
+		// 清除ack链表
+		RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
+
+		// 清除msg链表
+		RyanMqttMsgHandler_t msgMatchCriteria;
+		for (int32_t i = 0; i < count; i++)
+		{
+			msgMatchCriteria.topic = (char *)subscriptionList[i].pTopicFilter;
+			msgMatchCriteria.topicLen = subscriptionList[i].topicFilterLength;
+			msgMatchCriteria.packetId = packetId;
+
+			RyanMqttMsgHandlerFindAndDestroyByPackId(client, &msgMatchCriteria, RyanMqttFalse);
+		}
+	}
 
 	platformMemoryFree(subscriptionList);
 	platformMemoryFree(fixedBuffer.pBuffer);
@@ -439,6 +414,8 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	RyanMqttCheck(NULL != unSubscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
+
+	// 检查有效性
 	for (int32_t i = 0; i < count; i++)
 	{
 		RyanMqttCheck(NULL != unSubscribeManyData[i].topic, RyanMqttParamInvalidError, RyanMqttLog_d);
@@ -447,10 +424,9 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	// 申请 coreMqtt 支持的topic格式空间
 	MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
 	RyanMqttCheck(NULL != subscriptionList, RyanMqttParamInvalidError, RyanMqttLog_d);
-	RyanMqttMemset(subscriptionList, 0, sizeof(MQTTSubscribeInfo_t) * count);
 	for (int32_t i = 0; i < count; i++)
 	{
-		subscriptionList[i].qos = (MQTTQoS_t)RyanMqttQos0; // 无效数据
+		subscriptionList[i].qos = (MQTTQoS_t)RyanMqttQos0; // 无效数据,仅当占位符
 		subscriptionList[i].pTopicFilter = unSubscribeManyData[i].topic;
 		subscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
 	}
@@ -478,34 +454,38 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 		});
 	}
 
-	// 查找当前主题是否已经订阅,没有订阅就取消发送
+	// 创建ack
 	for (int32_t i = 0; i < count; i++)
 	{
 		// ?不判断是否订阅,统一都发送取消
-		RyanMqttMsgHandler_t tempMsgHandler = {.topic = (char *)subscriptionList[i].pTopicFilter,
-						       .topicLen = subscriptionList[i].topicFilterLength};
-		result = RyanMqttMsgHandlerFind(client, &tempMsgHandler, RyanMqttFalse, &subMsgHandler);
+		RyanMqttMsgHandler_t msgMatchCriteria = {.topic = (char *)subscriptionList[i].pTopicFilter,
+							 .topicLen = subscriptionList[i].topicFilterLength};
+		result = RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttFalse, &subMsgHandler);
 		if (RyanMqttSuccessError == result)
 		{
+			// todo
+			// !有线程安全问题,subMsgHandler是指针,但用户层只要不是特别的混乱重复取消订阅这里应该就问题,暂时不管成本太高
+			// 同步msg qos等级,之后unsub回调使用
 			subscriptionList[i].qos = (MQTTQoS_t)subMsgHandler->qos;
 		}
 
 		result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
 						  subscriptionList[i].topicFilterLength, RyanMqttMsgInvalidPacketId,
 						  (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgHandler);
-		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
+					  { goto __RyanMqttUnSubCreateAckErrorExit; });
 
 		result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_UNSUBACK, packetId, 0, NULL, msgHandler,
 						  &userAckHandler, RyanMqttFalse);
 		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
 			RyanMqttMsgHandlerDestroy(client, msgHandler);
-			goto __exit;
+			goto __RyanMqttUnSubCreateAckErrorExit;
 		});
 
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 		continue;
 
-__exit:
+__RyanMqttUnSubCreateAckErrorExit:
 		RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
 		platformMemoryFree(subscriptionList);
 		platformMemoryFree(fixedBuffer.pBuffer);
@@ -582,7 +562,7 @@ RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic
 	fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 	RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, RyanMqttLog_d);
 
-	// qos0不需要packetId
+	// qos0不需要 packetId
 	if (RyanMqttQos0 == qos)
 	{
 		packetId = 0;
@@ -623,7 +603,7 @@ RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic
 			RyanMqttMsgHandlerDestroy(client, msgHandler);
 		});
 
-		// 一定要先加再send,要不可能返回消息会比这个更快执行呢
+		// 一定要先加再send,要不可能线程调度mqtt返回消息会比添加ack更快执行
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 
 		result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
@@ -769,12 +749,13 @@ RyanMqttError_e RyanMqttGetSubscribeSafe(RyanMqttClient_t *client, RyanMqttMsgHa
 			goto __exit;
 		}
 
-		RyanMqttMemcpy(msgHandlerArr[subscribeCount].topic, msgHandler->topic, msgHandler->topicLen);
-		msgHandlerArr[subscribeCount].topic[msgHandler->topicLen] = '\0';
 		msgHandlerArr[subscribeCount].qos = msgHandler->qos;
+		RyanMqttMemcpy(msgHandlerArr[subscribeCount].topic, msgHandler->topic, msgHandler->topicLen);
+		msgHandlerArr[subscribeCount].topic[msgHandler->topicLen] = '\0'; // 添加字符串结束符
 		subscribeCount++;
 	}
 	platformMutexUnLock(client->config.userData, &client->msgHandleLock);
+
 	*msgHandles = msgHandlerArr;
 	*subscribeNum = subscribeCount;
 
@@ -797,6 +778,7 @@ RyanMqttError_e RyanMqttSafeFreeSubscribeResources(RyanMqttMsgHandler_t *msgHand
 
 	for (int32_t i = 0; i < subscribeNum; i++)
 	{
+		// 不加null判断,因为如果是空,一定是用户程序内存访问越界了
 		platformMemoryFree(msgHandles[i].topic);
 	}
 
@@ -829,54 +811,119 @@ RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client, int32_t
 	return RyanMqttSuccessError;
 }
 
+static RyanMqttError_e RyanMqttConfigCopy(RyanMqttClientConfig_t *destConfig, RyanMqttClientConfig_t *srcConfig)
+{
+	RyanMqttAssert(NULL != destConfig && NULL != srcConfig);
+
+	RyanMqttError_e result = RyanMqttSuccessError;
+
+	RyanMqttMemset(destConfig, 0, sizeof(RyanMqttClientConfig_t));
+
+	result = RyanMqttStringCopy(&destConfig->clientId, srcConfig->clientId, RyanMqttStrlen(srcConfig->clientId));
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+
+	if (NULL != srcConfig->userName)
+	{
+		result = RyanMqttStringCopy(&destConfig->userName, srcConfig->userName,
+					    RyanMqttStrlen(srcConfig->userName));
+		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+	}
+
+	if (NULL != srcConfig->password)
+	{
+		result = RyanMqttStringCopy(&destConfig->password, srcConfig->password,
+					    RyanMqttStrlen(srcConfig->password));
+		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+	}
+
+	result = RyanMqttStringCopy(&destConfig->host, srcConfig->host, RyanMqttStrlen(srcConfig->host));
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+
+	result = RyanMqttStringCopy(&destConfig->taskName, srcConfig->taskName, RyanMqttStrlen(srcConfig->taskName));
+	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+
+	// ?不使用memset是因为下面 RyanMqttPurgeConfig 可能会误判导致错误释放空间
+	destConfig->port = srcConfig->port;
+	destConfig->mqttVersion = srcConfig->mqttVersion;
+
+	destConfig->taskPrio = srcConfig->taskPrio;
+	destConfig->taskStack = srcConfig->taskStack;
+
+	destConfig->autoReconnectFlag = srcConfig->autoReconnectFlag;
+	destConfig->cleanSessionFlag = srcConfig->cleanSessionFlag;
+
+	destConfig->ackHandlerRepeatCountWarning = srcConfig->ackHandlerRepeatCountWarning;
+	destConfig->ackHandlerCountWarning = srcConfig->ackHandlerCountWarning;
+
+	destConfig->recvTimeout = srcConfig->recvTimeout;
+	destConfig->sendTimeout = srcConfig->sendTimeout;
+	destConfig->ackTimeout = srcConfig->ackTimeout;
+	destConfig->keepaliveTimeoutS = srcConfig->keepaliveTimeoutS;
+	destConfig->reconnectTimeout = srcConfig->reconnectTimeout;
+	destConfig->mqttEventHandle = srcConfig->mqttEventHandle;
+	destConfig->userData = srcConfig->userData;
+
+__exit:
+	if (RyanMqttSuccessError != result)
+	{
+		RyanMqttPurgeConfig(destConfig);
+	}
+
+	return result;
+}
+
 /**
  * @brief 获取mqtt config
- * 使用完毕后,需要用户释放pclientConfig指针内容
+ * !非线程安全,多线程通过set和get数据可能会错乱甚至崩溃。
+ * !使用完毕后,需要用户手动调用 RyanMqttFreeConfigFromGet 释放指针空间
  *
  * @param client
  * @param pclientConfig
  * @return RyanMqttError_e
  */
-/* RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client,
-RyanMqttClientConfig_t **pclientConfig)
+RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
 {
-    RyanMqttError_e result = RyanMqttSuccessError;
-    RyanMqttClientConfig_t *clientConfig = NULL;
-
-    RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
-    RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
-
-    RyanMqttCheck(NULL != client->config, RyanMqttNoRescourceError);
-
-    clientConfig = (RyanMqttClientConfig_t
-*)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t)); RyanMqttCheck(NULL !=
-clientConfig, RyanMqttNotEnoughMemError);
-
-    RyanMqttMemcpy(clientConfig, client->config, sizeof(RyanMqttClientConfig_t));
-
-    result = setConfigValue(&clientConfig->clientId, client->config->clientId);
-    RyanMqttCheck(RyanMqttSuccessError == result, result);
+	RyanMqttError_e result = RyanMqttSuccessError;
+	RyanMqttClientConfig_t *clientConfig;
 
-    result = setConfigValue(&clientConfig->userName, client->config->userName);
-    RyanMqttCheck(RyanMqttSuccessError == result, result);
+	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
+	RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
 
-    result = setConfigValue(&clientConfig->password, client->config->password);
-    RyanMqttCheck(RyanMqttSuccessError == result, result);
+	clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
+	RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 
-    result = setConfigValue(&clientConfig->host, client->config->host);
-    RyanMqttCheck(RyanMqttSuccessError == result, result);
+	result = RyanMqttConfigCopy(clientConfig, &client->config);
+	RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 
-    result = setConfigValue(&clientConfig->port, client->config->port);
-    RyanMqttCheck(RyanMqttSuccessError == result, result);
+	if (RyanMqttSuccessError == result)
+	{
+		*pclientConfig = clientConfig;
+	}
+	else
+	{
+		*pclientConfig = NULL;
+		platformMemoryFree(clientConfig);
+	}
 
-    result = setConfigValue(&clientConfig->taskName, client->config->taskName);
-    RyanMqttCheck(RyanMqttSuccessError == result, result);
+	return result;
+}
 
-    *pclientConfig = clientConfig;
+/**
+ * @brief 释放通过 RyanMqttGetConfig 获取的配置信息
+ *
+ * @param clientConfig
+ * @return RyanMqttError_e
+ */
+RyanMqttError_e RyanMqttFreeConfigFromGet(RyanMqttClientConfig_t *clientConfig)
+{
+	RyanMqttError_e result = RyanMqttSuccessError;
+	RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
 
-    return RyanMqttSuccessError;
+	RyanMqttPurgeConfig(clientConfig);
+	platformMemoryFree(clientConfig);
+	return result;
 }
- */
 
 //  todo 增加更多校验,比如判断心跳包和recv的关系
 /**
@@ -889,7 +936,7 @@ clientConfig, RyanMqttNotEnoughMemError);
  * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
  *
  * !项目中用户不应频繁调用此函数
- * !
+ *
  * 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
  *
  * @param client
@@ -901,6 +948,7 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
 	RyanMqttError_e result = RyanMqttSuccessError;
 
 	RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
+	RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, RyanMqttLog_d);
@@ -908,55 +956,17 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
 		      RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
 
-	result = setConfigValue(&client->config.clientId, clientConfig->clientId);
-	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
-
-	if (NULL == clientConfig->userName)
-	{
-		client->config.userName = NULL;
-	}
-	else
-	{
-		result = setConfigValue(&client->config.userName, clientConfig->userName);
-		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
-	}
+	RyanMqttClientConfig_t tempConfig;
+	result = RyanMqttConfigCopy(&tempConfig, clientConfig);
+	RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 
-	if (NULL == clientConfig->password)
-	{
-		client->config.password = NULL;
-	}
-	else
+	if (RyanMqttSuccessError == result)
 	{
-		result = setConfigValue(&client->config.password, clientConfig->password);
-		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
+		// todo !因为这里是非线程安全的
+		RyanMqttPurgeConfig(&client->config);
+		client->config = tempConfig;
 	}
 
-	result = setConfigValue(&client->config.host, clientConfig->host);
-	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
-
-	result = setConfigValue(&client->config.taskName, clientConfig->taskName);
-	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
-
-	client->config.port = clientConfig->port;
-	client->config.taskPrio = clientConfig->taskPrio;
-	client->config.taskStack = clientConfig->taskStack;
-	client->config.mqttVersion = clientConfig->mqttVersion;
-	client->config.ackHandlerRepeatCountWarning = clientConfig->ackHandlerRepeatCountWarning;
-	client->config.ackHandlerCountWarning = clientConfig->ackHandlerCountWarning;
-	client->config.autoReconnectFlag = clientConfig->autoReconnectFlag;
-	client->config.cleanSessionFlag = clientConfig->cleanSessionFlag;
-	client->config.reconnectTimeout = clientConfig->reconnectTimeout;
-	client->config.recvTimeout = clientConfig->recvTimeout;
-	client->config.sendTimeout = clientConfig->sendTimeout;
-	client->config.ackTimeout = clientConfig->ackTimeout;
-	client->config.keepaliveTimeoutS = clientConfig->keepaliveTimeoutS;
-	client->config.mqttEventHandle = clientConfig->mqttEventHandle;
-	client->config.userData = clientConfig->userData;
-
-	return RyanMqttSuccessError;
-
-__exit:
-	RyanMqttDestroy(client);
 	return result;
 }
 
@@ -990,6 +1000,8 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
 	}
 
 	platformMutexLock(client->config.userData, &client->userSessionLock);
+
+	// 之前如果设置过遗嘱就进行资源释放,否则申请空间
 	if (NULL == client->lwtOptions)
 	{
 		client->lwtOptions = (lwtOptions_t *)platformMemoryMalloc(sizeof(lwtOptions_t));
@@ -1010,7 +1022,6 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
 			platformMemoryFree(client->lwtOptions->payload);
 		}
 	}
-
 	RyanMqttMemset(client->lwtOptions, 0, sizeof(lwtOptions_t));
 
 	if (payloadLen > 0)

+ 21 - 35
mqttclient/RyanMqttThread.c

@@ -44,8 +44,8 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
 		return RyanMqttFailedError;
 	}
 
-	// 当剩余时间小于 recvtimeout 时强制发送心跳包
-	if (timeRemain > client->config.recvTimeout)
+	// 当剩余时间大于 recvtimeout 并且小于 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
+	if (timeRemain - client->config.recvTimeout > 100)
 	{
 		// 当没有到达 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
 		if (timeRemain > client->config.keepaliveTimeoutS * 1000 * (RyanMqttKeepAliveMultiplier - 0.9))
@@ -81,7 +81,7 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
 }
 
 // todo 也可以考虑有ack链表的时候recvTime可以短一些,有坑点
-// todo 也可以考虑将发送操作独立出去,异步发送,目前没有遇到性能瓶颈,需要超高性能的时候再考虑吧
+// todo 也可以考虑将发送操作独立出去,异步发送,目前没有遇到性能瓶颈,需要超高性能的时候再考虑吧
 /**
  * @brief 遍历ack链表,进行相应的处理
  *
@@ -131,7 +131,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
 			continue;
 		}
 
-		// 超过最大处理时间,直接跳出处理函数
+		// 超过最大处理时间,直接跳出处理函数,等待下次再处理
 		if (0 == RyanMqttTimerRemain(&ackScanRemainTimer))
 		{
 			break;
@@ -144,6 +144,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
 		uint32_t ackRemainTime = RyanMqttTimerRemain(&ackHandler->timer);
 		if (0 != ackRemainTime)
 		{
+			// 如果ack剩余时间小于节流时间,就把ack剩余时间更新到节流上
 			if (ackRemainTime < ackScanThrottleTime)
 			{
 				ackScanThrottleTime = ackRemainTime;
@@ -222,7 +223,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
  * @param client
  * @return RyanMqttError_e
  */
-static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client, RyanMqttConnectStatus_e *connectState)
+static RyanMqttError_e RyanMqttConnectBroker(RyanMqttClient_t *client, RyanMqttConnectStatus_e *connectState)
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	MQTTStatus_t status;
@@ -236,7 +237,7 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client, RyanMqttConnect
 
 	RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectError, RyanMqttLog_d);
 
-	// connect 信息
+	// 填充 connect 信息
 	{
 		connectInfo.pClientIdentifier = client->config.clientId;
 		connectInfo.clientIdentifierLength = RyanMqttStrlen(client->config.clientId);
@@ -329,7 +330,8 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client, RyanMqttConnect
 		uint16_t packetId;
 		bool sessionPresent; // 会话位
 
-		// 反序列化ack包,MQTTSuccess 和 MQTTServerRefused都会返回正确的connectState
+		// 反序列化ack包,MQTTSuccess 和 MQTTServerRefused 都会返回正确的connectState
+		// MQTTServerRefused 表示连接被拒绝,但是可以获取原因思什么
 		status = MQTT_DeserializeAck(&pIncomingPacket, &packetId, &sessionPresent);
 		if (MQTTSuccess != status && MQTTServerRefused != status)
 		{
@@ -337,6 +339,7 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client, RyanMqttConnect
 		}
 		else
 		{
+			// 获取连接返回值
 			*connectState = pIncomingPacket.pRemainingData[1];
 			if (RyanMqttConnectAccepted != *connectState)
 			{
@@ -379,8 +382,7 @@ void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, v
 	{
 	case RyanMqttEventConnected: // 第一次连接成功
 		RyanMqttRefreshKeepaliveTime(client);
-		RyanMqttAckListScan(client,
-				    RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
+		RyanMqttAckListScan(client, RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
 		RyanMqttSetClientState(client, RyanMqttConnectState);
 		break;
 
@@ -440,26 +442,7 @@ void RyanMqttThread(void *argument)
 			platformNetworkDestroy(client->config.userData, &client->network);
 
 			// 清除config信息
-			if (NULL != client->config.clientId)
-			{
-				platformMemoryFree(client->config.clientId);
-			}
-			if (NULL != client->config.userName)
-			{
-				platformMemoryFree(client->config.userName);
-			}
-			if (NULL != client->config.password)
-			{
-				platformMemoryFree(client->config.password);
-			}
-			if (NULL != client->config.host)
-			{
-				platformMemoryFree(client->config.host);
-			}
-			if (NULL != client->config.taskName)
-			{
-				platformMemoryFree(client->config.taskName);
-			}
+			RyanMqttPurgeConfig(&client->config);
 
 			// 清除遗嘱相关配置
 			if (NULL != client->lwtOptions)
@@ -505,16 +488,19 @@ void RyanMqttThread(void *argument)
 		// 客户端状态变更状态机
 		switch (RyanMqttGetClientState(client))
 		{
-
 		case RyanMqttStartState: // 开始状态状态
 		case RyanMqttReconnectState: {
 			RyanMqttLog_d("开始连接");
 			RyanMqttConnectStatus_e connectState;
-			RyanMqttError_e result = RyanMqttConnect(client, &connectState);
-			RyanMqttEventMachine(client,
-					     RyanMqttSuccessError == result ? RyanMqttEventConnected
-									    : RyanMqttEventDisconnected,
-					     (void *)&connectState);
+			RyanMqttError_e result = RyanMqttConnectBroker(client, &connectState);
+			if (RyanMqttSuccessError == result)
+			{
+				RyanMqttEventMachine(client, RyanMqttEventConnected, (void *)&connectState);
+			}
+			else
+			{
+				RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
+			}
 		}
 		break;
 

+ 83 - 56
mqttclient/RyanMqttThreadProcessPacket.c

@@ -69,6 +69,7 @@ static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client, MQT
 	status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBCOMP, packetId);
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
+	// ?这里没法判断packetid是否非法,只能每次都回复咯
 	// 每次收到PUBREL都返回消息
 	result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
@@ -94,7 +95,7 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 	MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
-	// 每次收到PUBREC都返回ack,确保服务器可以认为数据包被发送了
+	// 每次收到 PUBREC 都返回ack,确保服务器可以认为数据包被发送了
 	uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
 	MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
 
@@ -109,33 +110,29 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 		RyanMqttMsgHandler_t *msgHandler;
 		RyanMqttAckHandler_t *ackHandler;
 
-		// 查找ack链表是否存在pubcomp报文,不存在表示首次接收到
-		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandler);
-		if (RyanMqttSuccessError != result)
-		{
-			// 首次收到消息
-			result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
-							  ackHandlerPubrec->msgHandler->topicLen,
-							  RyanMqttMsgInvalidPacketId, ackHandlerPubrec->msgHandler->qos,
-							  ackHandlerPubrec->msgHandler->userData, &msgHandler);
-			RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
+		// 首次收到消息,创建 pubcomp ack
+		result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
+						  ackHandlerPubrec->msgHandler->topicLen, RyanMqttMsgInvalidPacketId,
+						  ackHandlerPubrec->msgHandler->qos,
+						  ackHandlerPubrec->msgHandler->userData, &msgHandler);
+		RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 
-			result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBCOMP, packetId,
-							  MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
-							  &ackHandler, RyanMqttFalse);
-			RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
-					  { RyanMqttMsgHandlerDestroy(client, msgHandler); });
-			RyanMqttAckListAddToAckList(client, ackHandler);
+		result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBCOMP, packetId,
+						  MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
+						  &ackHandler, RyanMqttFalse);
+		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
+				  { RyanMqttMsgHandlerDestroy(client, msgHandler); });
+		RyanMqttAckListAddToAckList(client, ackHandler);
 
-			RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
-			RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
-		}
-		// 出现pubrec和pubcomp同时存在的情况,清除pubrec。理论上不会出现(冗余措施)
-		else
-		{
-			RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
-			RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
-		}
+		// 清除pubrec记录
+		RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
+		RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
+	}
+	else
+	{
+		// 没有pubrec ,并且没有pubcomp,说明这个报文是非法报文,不进行mqtt服务器回复
+		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubrec);
+		RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttInvalidPacketError, RyanMqttLog_d);
 	}
 
 	result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
@@ -174,11 +171,13 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 		msgData.retained = publishInfo.retain;
 		msgData.dup = publishInfo.dup;
 
-		// 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
-		RyanMqttMsgHandler_t tempMsgHandler = {.topic = msgData.topic, .topicLen = msgData.topicLen};
-		result = RyanMqttMsgHandlerFind(client, &tempMsgHandler, RyanMqttTrue, &msgHandler);
-		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
-				  { RyanMqttLog_w("主题不匹配: %.*s", msgData.topicLen, msgData.topic); });
+		// 查看订阅列表是否包含此消息主题,进行通配符匹配
+		RyanMqttMsgHandler_t msgMatchCriteria = {.topic = msgData.topic, .topicLen = msgData.topicLen};
+		result = RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttTrue, &msgHandler);
+		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
+			RyanMqttLog_w("主题不匹配: %.*s", msgData.topicLen, msgData.topic);
+			RyanMqttEventMachine(client, RyanMqttEventUnsubscribedData, (void *)&msgData);
+		});
 	}
 
 	switch (msgData.qos)
@@ -210,14 +209,18 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 
 		// !序列化ack数据包,必须先执行,因为创建ack需要用到这个报文
 		MQTTStatus_t status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREC, packetId);
+		// 上面代码不太可能出错,如果出错了就让服务器重新发送吧
 		RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
-		// 上面代码不太可能出错,出错后就让服务器重新发送吧
-		// 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish,
-		// 不进行qos2 PUBREC消息处理
+		// 收到 publish 就期望收到 PUBREL ,如果 PUBREL 报文已经存在说明不是首次收到 publish,不进行qos2 PUBREC
+		// 消息处理
 		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler);
 		if (RyanMqttSuccessError != result)
 		{
+			// 第一次收到 PUBREL 报文
+			RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
+
+			// 期望下一次收到 PUBREL 报文
 			result = RyanMqttMsgHandlerCreate(client, msgData.topic, msgData.topicLen,
 							  RyanMqttMsgInvalidPacketId, msgData.qos, NULL, &msgHandler);
 			RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
@@ -228,10 +231,9 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 			RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
 					  { RyanMqttMsgHandlerDestroy(client, msgHandler); });
 			RyanMqttAckListAddToAckList(client, ackHandler);
-
-			RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
 		}
 
+		// 无论是不是第一次收到,都回复 pub ack报文
 		result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 		RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 	}
@@ -258,14 +260,15 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacke
 	RyanMqttList_t *curr, *next;
 	RyanMqttAssert(NULL != client);
 
-	// 反序列化ack包,MQTTSuccess和MQTTServerRefused都是成功的
+	// 反序列化ack包,MQTTSuccess 和 MQTTServerRefused 都是成功的
+	// coreMqtt 检测到qos等级为 0x80 就会返回 MQTTServerRefused
 	MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
 	RyanMqttCheck(MQTTSuccess == status || MQTTServerRefused == status, RyanMqttDeserializePacketError,
 		      RyanMqttLog_d);
 
 	// 检查ack的msgCount和返回消息的msgCount是否一致
 	{
-		// MQTT_DeserializeAck会保证 pIncomingPacket->remainingLength >= 3
+		// MQTT_DeserializeAck 会保证 pIncomingPacket->remainingLength >= 3
 		uint32_t statusCount = pIncomingPacket->remainingLength - sizeof(uint16_t);
 		uint32_t ackMsgCount = 0;
 
@@ -274,7 +277,6 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacke
 		RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
 		{
 			msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
-
 			if (packetId == msgHandler->packetId)
 			{
 				ackMsgCount++;
@@ -284,12 +286,14 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacke
 
 		// 服务器回复的ack数和记录的ack数不一致就清除所有ack
 		RyanMqttCheckCode(ackMsgCount == statusCount, RyanMqttNoRescourceError, RyanMqttLog_d, {
+			// 清除所有ack
 			RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
+
+			// 清除所有msg
 			platformMutexLock(client->config.userData, &client->msgHandleLock);
 			RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
 			{
 				msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
-
 				if (packetId == msgHandler->packetId)
 				{
 					RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
@@ -312,22 +316,26 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacke
 	{
 		ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
 
-		if (packetId != ackHandler->packetId || MQTT_PACKET_TYPE_SUBACK != ackHandler->packetType)
+		if (packetId != ackHandler->packetId)
 		{
 			continue;
 		}
 
-		// 查找同名订阅并删除,保证订阅主题列表只有一个最新的
+		// 处理非订阅ack
+		if (MQTT_PACKET_TYPE_SUBACK != ackHandler->packetType)
+		{
+			goto __next;
+		}
+
+		// 查找同名订阅但是packetid不一样的进行删除,保证订阅主题列表只有一个最新的
 		RyanMqttMsgHandlerFindAndDestroyByPackId(client, ackHandler->msgHandler, RyanMqttTrue);
 
 		// 到这里就可以保证没有同名订阅了
 		// 查找之前记录的topic句柄,根据服务器授权Qos进行更新
+		// 几乎不可能查找不到,可以查找到 ackHandler 就一定有 msgHandler
 		RyanMqttError_e result =
 			RyanMqttMsgHandlerFind(client, ackHandler->msgHandler, RyanMqttFalse, &msgHandler);
-
-		// 几乎不可能,可以查找到 ackHandler 就一定有 msgHandler
-		// 没有的话可以打印一条信息吧
-		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { continue; });
+		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __next; });
 
 		// 解析服务端授权 QoS(0,1,2)或失败(0x80)
 		subscriptionQos = pStatusStart[ackMsgIndex];
@@ -358,6 +366,7 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacke
 			break;
 		}
 
+__next:
 		RyanMqttAckListRemoveToAckList(client, ackHandler);
 		RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
 	}
@@ -386,20 +395,24 @@ static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client, MQTTPac
 	MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
-	// todo 这里效率非常低,订阅属于用的少的功能,暂时可以接受
+	// todo 这里效率低,取消订阅属于用的少的功能,暂时可以接受
 	platformMutexLock(client->config.userData, &client->ackHandleLock);
 	RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
 	{
 		ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
 
-		if ((packetId != ackHandler->packetId) || (MQTT_PACKET_TYPE_UNSUBACK != ackHandler->packetType))
+		if (packetId != ackHandler->packetId)
 		{
 			continue;
 		}
 
+		if (MQTT_PACKET_TYPE_UNSUBACK != ackHandler->packetType)
+		{
+			goto __next;
+		}
+
 		// 查找当前主题是否已经订阅,进行取消订阅
 		result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler, RyanMqttFalse, &subMsgHandler);
-
 		if (RyanMqttSuccessError == result)
 		{
 			ackHandler->msgHandler->qos = subMsgHandler->qos;
@@ -410,6 +423,7 @@ static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client, MQTTPac
 		// mqtt事件回调
 		RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
 
+__next:
 		RyanMqttAckListRemoveToAckList(client, ackHandler);
 		RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
 	}
@@ -515,6 +529,7 @@ RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
 	RyanMqttLog_d("pIncomingPacket.type: %x ", pIncomingPacket.type & 0xF0U);
 
 	// 控制报文类型
+	// QoS2 使用官方推荐的方法B
 	// 发送者QoS2动作 发布PUBLISH报文 -> 等待PUBREC报文 -> 发送PUBREL报文 -> 等待PUBCOMP报文
 	// 接收者QoS2动作 等待PUBLISH报文 -> 发送PUBREC报文 -> 等待PUBREL报文 -> 发送PUBCOMP报文
 	switch (pIncomingPacket.type & 0xF0U)
@@ -525,7 +540,7 @@ RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
 
 	case MQTT_PACKET_TYPE_CONNACK: // 连接报文确认
 	{
-		// 客户端已处于连接状态时又收到CONNACK报文,应该视为严重错误,断开连接
+		// ?客户端已处于连接状态时又收到CONNACK报文,应该视为严重错误,断开连接
 		RyanMqttLog_e("收到 CONNACK 时已连接,正在断开连接");
 		RyanMqttConnectStatus_e connectState = RyanMqttConnectProtocolError;
 		RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
@@ -542,12 +557,24 @@ RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
 		result = RyanMqttPubrecPacketHandler(client, &pIncomingPacket);
 		break;
 
-	case (MQTT_PACKET_TYPE_PUBREL & 0xF0U): // 客户端接收QOS2 已经发布PUBREC,等待服务器发布释放
-		if (pIncomingPacket.type & 0x02U)
-		{ // PUBREL 控制报文固定报头的第 3,2,1,0 位必须被设置为
-		  // 0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接
-			result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
-		}
+	case (MQTT_PACKET_TYPE_PUBREL & 0xF0U): // 客户端接收QOS2 已经发布PUBREC,等待服务器发布释放 pubrel
+		result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
+
+		// !RyanMqttGetPacketInfo 检查报文type错误时不会惊醒返回,所以下面逻辑暂时没用
+		// // PUBREL 控制报文固定报头的第 3,2,1,0
+		// // 位必须被设置为0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接
+		// if (pIncomingPacket.type & 0x02U)
+		// {
+		// 	result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
+		// }
+		// else
+		// {
+		// 	RyanMqttLog_e("PUBREL 控制报文固定报头的第 3,2,1,0 "
+		// 		      "位必须被设置为0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接");
+		// 	RyanMqttConnectStatus_e connectState = RyanMqttConnectInvalidPacketError;
+		// 	RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
+		// 	result = RyanMqttInvalidPacketError;
+		// }
 		break;
 
 	case MQTT_PACKET_TYPE_SUBACK: // 订阅确认

+ 30 - 0
mqttclient/RyanMqttUtil.c

@@ -243,6 +243,36 @@ void RyanMqttPurgeSession(RyanMqttClient_t *client)
 	platformMutexUnLock(client->config.userData, &client->userSessionLock);
 }
 
+void RyanMqttPurgeConfig(RyanMqttClientConfig_t *clientConfig)
+{
+	RyanMqttAssert(NULL != clientConfig);
+
+	if (clientConfig->clientId)
+	{
+		platformMemoryFree(clientConfig->clientId);
+	}
+
+	if (clientConfig->userName)
+	{
+		platformMemoryFree(clientConfig->userName);
+	}
+
+	if (clientConfig->password)
+	{
+		platformMemoryFree(clientConfig->password);
+	}
+
+	if (clientConfig->host)
+	{
+		platformMemoryFree(clientConfig->host);
+	}
+
+	if (clientConfig->taskName)
+	{
+		platformMemoryFree(clientConfig->taskName);
+	}
+}
+
 /**
  * @brief 初始化计时器
  *

+ 13 - 30
mqttclient/RyanMqttUtileAck.c

@@ -15,7 +15,7 @@
  * @param packet
  * @param msgHandler
  * @param pAckHandler
- * @param isPreallocatedPacket
+ * @param isPreallocatedPacket packet是否预分配
  * @return RyanMqttError_e
  */
 RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
@@ -36,16 +36,15 @@ RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packe
 	// 为非预分配包申请额外空间
 	RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)platformMemoryMalloc(mallocSize);
 	RyanMqttCheck(NULL != ackHandler, RyanMqttNotEnoughMemError, RyanMqttLog_d);
-	RyanMqttMemset(ackHandler, 0, mallocSize);
 
+	ackHandler->packetType = packetType;
+	ackHandler->repeatCount = 0;
+	ackHandler->packetId = packetId;
+	ackHandler->isPreallocatedPacket = isPreallocatedPacket;
+	ackHandler->packetLen = packetLen;
 	RyanMqttListInit(&ackHandler->list);
 	// 超时内没有响应将被销毁或重新发送
 	RyanMqttTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
-
-	ackHandler->isPreallocatedPacket = isPreallocatedPacket;
-	ackHandler->packetId = packetId;
-	ackHandler->packetLen = packetLen;
-	ackHandler->packetType = packetType;
 	ackHandler->msgHandler = msgHandler;
 
 	if (RyanMqttTrue != isPreallocatedPacket)
@@ -81,8 +80,9 @@ void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *a
 	RyanMqttMsgHandlerDestroy(client, ackHandler->msgHandler); // 释放msgHandler
 
 	// 释放用户预提供的缓冲区
-	if (RyanMqttTrue == ackHandler->isPreallocatedPacket && NULL != ackHandler->packet)
+	if (RyanMqttTrue == ackHandler->isPreallocatedPacket)
 	{
+		// 不加null判断,因为如果是空,一定是用户程序内存访问越界了
 		platformMemoryFree(ackHandler->packet);
 	}
 
@@ -139,21 +139,18 @@ RyanMqttError_e RyanMqttAckListAddToAckList(RyanMqttClient_t *client, RyanMqttAc
 {
 	RyanMqttAssert(NULL != client);
 	RyanMqttAssert(NULL != ackHandler);
-	RyanMqttBool_e isAckCountWarning = RyanMqttFalse;
+	uint16_t tmpAckHandlerCount;
 
 	platformMutexLock(client->config.userData, &client->ackHandleLock);
 	// 将ack节点添加到链表尾部
 	RyanMqttListAddTail(&ackHandler->list, &client->ackHandlerList);
 	client->ackHandlerCount++;
-	if (client->ackHandlerCount >= client->config.ackHandlerCountWarning)
-	{
-		isAckCountWarning = RyanMqttTrue;
-	}
+	tmpAckHandlerCount = client->ackHandlerCount;
 	platformMutexUnLock(client->config.userData, &client->ackHandleLock);
 
-	if (RyanMqttTrue == isAckCountWarning)
+	if (tmpAckHandlerCount >= client->config.ackHandlerCountWarning)
 	{
-		RyanMqttEventMachine(client, RyanMqttEventAckCountWarning, (void *)&client->ackHandlerCount);
+		RyanMqttEventMachine(client, RyanMqttEventAckCountWarning, (void *)&tmpAckHandlerCount);
 	}
 
 	return RyanMqttSuccessError;
@@ -183,7 +180,7 @@ RyanMqttError_e RyanMqttAckListRemoveToAckList(RyanMqttClient_t *client, RyanMqt
 }
 
 /**
- * @brief 检查链表中是否存在ack句柄
+ * @brief 检查用户层链表中是否存在ack句柄
  *
  * @param client
  * @param packetType
@@ -221,13 +218,6 @@ __exit:
 	return result;
 }
 
-/**
- * @brief 添加等待ack到链表
- *
- * @param client
- * @param ackHandler
- * @return RyanMqttError_e
- */
 RyanMqttError_e RyanMqttAckListAddToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
 {
 	RyanMqttAssert(NULL != client);
@@ -240,13 +230,6 @@ RyanMqttError_e RyanMqttAckListAddToUserAckList(RyanMqttClient_t *client, RyanMq
 	return RyanMqttSuccessError;
 }
 
-/**
- * @brief 从链表移除ack
- *
- * @param client
- * @param ackHandler
- * @return RyanMqttError_e
- */
 RyanMqttError_e RyanMqttAckListRemoveToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
 {
 	RyanMqttAssert(NULL != client);

+ 71 - 56
mqttclient/RyanMqttUtileMsg.c

@@ -165,6 +165,45 @@ static RyanMqttBool_e RyanMqttMatchTopic(const char *topic, const uint16_t topic
 	return matchFound;
 }
 
+/**
+ * @brief 检查消息处理程序的主题是否与给定主题匹配
+ *
+ * @param msgHandler
+ * @param topic
+ * @param topicLen
+ * @param isTopicMatchedFlag
+ * @return RyanMqttBool_e
+ */
+static RyanMqttBool_e RyanMqttMsgTopicIsMatch(RyanMqttMsgHandler_t *msgHandler, const char *topic, uint16_t topicLen,
+					      RyanMqttBool_e isTopicMatchedFlag)
+{
+	// 不进行通配符匹配
+	if (RyanMqttTrue != isTopicMatchedFlag)
+	{
+		// 不相等跳过
+		if (topicLen != msgHandler->topicLen)
+		{
+			return RyanMqttFalse;
+		}
+
+		// 主题名称不相等且没有使能通配符匹配
+		if (0 != RyanMqttStrncmp(topic, msgHandler->topic, topicLen))
+		{
+			return RyanMqttFalse;
+		}
+	}
+	else
+	{
+		// 进行通配符匹配
+		if (RyanMqttTrue != RyanMqttMatchTopic(topic, topicLen, msgHandler->topic, msgHandler->topicLen))
+		{
+			return RyanMqttFalse;
+		}
+	}
+
+	return RyanMqttTrue;
+}
+
 /**
  * @brief 创建msg句柄
  *
@@ -186,23 +225,22 @@ RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, const char *t
 	uint32_t mallocSize = sizeof(RyanMqttMsgHandler_t) + topicLen + 1;
 	RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)platformMemoryMalloc(mallocSize);
 	RyanMqttCheck(NULL != msgHandler, RyanMqttNotEnoughMemError, RyanMqttLog_d);
-	RyanMqttMemset(msgHandler, 0, mallocSize);
 
-	// 初始化链表
-	RyanMqttListInit(&msgHandler->list);
 	msgHandler->packetId = packetId;
-	msgHandler->qos = qos;
 	msgHandler->topicLen = topicLen;
+	msgHandler->qos = qos;
+	RyanMqttListInit(&msgHandler->list); // 初始化链表
 	msgHandler->userData = userData;
 	msgHandler->topic = (char *)msgHandler + sizeof(RyanMqttMsgHandler_t);
-	RyanMqttMemcpy(msgHandler->topic, topic, topicLen); // 将packet数据保存到ack中
+	RyanMqttMemcpy(msgHandler->topic, topic, topicLen);
+	msgHandler->topic[topicLen] = '\0';
 
 	*pMsgHandler = msgHandler;
 	return RyanMqttSuccessError;
 }
 
 /**
- * @brief 销毁msg 句柄
+ * @brief 销毁 msg 句柄
  *
  * @param msgHandler
  */
@@ -213,56 +251,26 @@ void RyanMqttMsgHandlerDestroy(RyanMqttClient_t *client, RyanMqttMsgHandler_t *m
 	platformMemoryFree(msgHandler);
 }
 
-RyanMqttBool_e RyanMqttMsgIsMatch(RyanMqttMsgHandler_t *msgHandler, const char *topic, uint16_t topicLen,
-				  RyanMqttBool_e topicMatchedFlag)
-{
-	// 不进行通配符匹配
-	if (RyanMqttTrue != topicMatchedFlag)
-	{
-		// 不相等跳过
-		if (topicLen != msgHandler->topicLen)
-		{
-			return RyanMqttFalse;
-		}
-
-		// 主题名称不相等且没有使能通配符匹配
-		if (0 != strncmp(topic, msgHandler->topic, topicLen))
-		{
-			return RyanMqttFalse;
-		}
-	}
-	else
-	{
-		// 进行通配符匹配
-		if (RyanMqttTrue != RyanMqttMatchTopic(topic, topicLen, msgHandler->topic, msgHandler->topicLen))
-		{
-			return RyanMqttFalse;
-		}
-	}
-
-	return RyanMqttTrue;
-}
-
 /**
  * @brief 查找msg句柄
  *
  * @param client
  * @param topic
  * @param topicLen
- * @param topicMatchedFlag
+ * @param isTopicMatchedFlag
  * @param pMsgHandler
  * @return RyanMqttError_e
  */
-RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHandler_t *findMsgHandler,
-				       RyanMqttBool_e topicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler)
+RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
+				       RyanMqttBool_e isTopicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler)
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttList_t *curr, *next;
 	RyanMqttMsgHandler_t *msgHandler;
 
 	RyanMqttAssert(NULL != client);
-	RyanMqttAssert(NULL != findMsgHandler);
-	RyanMqttAssert(NULL != findMsgHandler->topic && 0 != findMsgHandler->topicLen);
+	RyanMqttAssert(NULL != msgMatchCriteria);
+	RyanMqttAssert(NULL != msgMatchCriteria->topic && 0 != msgMatchCriteria->topicLen);
 	RyanMqttAssert(NULL != pMsgHandler);
 
 	platformMutexLock(client->config.userData, &client->msgHandleLock);
@@ -270,8 +278,8 @@ RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHand
 	{
 		msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
 
-		if (RyanMqttFalse ==
-		    RyanMqttMsgIsMatch(msgHandler, findMsgHandler->topic, findMsgHandler->topicLen, topicMatchedFlag))
+		if (RyanMqttFalse == RyanMqttMsgTopicIsMatch(msgHandler, msgMatchCriteria->topic,
+							     msgMatchCriteria->topicLen, isTopicMatchedFlag))
 		{
 			continue;
 		}
@@ -289,38 +297,46 @@ __exit:
 	return result;
 }
 
-void RyanMqttMsgHandlerFindAndDestroyByPackId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *findMsgHandler,
-					      RyanMqttBool_e isSkipMatchingId)
+/**
+ * @brief 查找同名进行删除,并根据skipSamePacketId查找packetid一样和不一样的进行删除后
+ * packetid不一样保证msg列表只有一个, packetid一样清除所有同名同packetid msg。用于清空批量订阅和取消订阅
+ *
+ * @param client
+ * @param msgMatchCriteria
+ * @param skipSamePacketId
+ */
+void RyanMqttMsgHandlerFindAndDestroyByPackId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
+					      RyanMqttBool_e skipSamePacketId)
 {
 	RyanMqttList_t *curr, *next;
 	RyanMqttMsgHandler_t *msgHandler;
 
 	RyanMqttAssert(NULL != client);
-	RyanMqttAssert(NULL != findMsgHandler);
-	RyanMqttAssert(NULL != findMsgHandler->topic && 0 != findMsgHandler->topicLen);
+	RyanMqttAssert(NULL != msgMatchCriteria);
+	RyanMqttAssert(NULL != msgMatchCriteria->topic && 0 != msgMatchCriteria->topicLen);
 
 	platformMutexLock(client->config.userData, &client->msgHandleLock);
 	RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
 	{
 		msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
 
-		if (RyanMqttFalse == isSkipMatchingId)
+		if (RyanMqttFalse == skipSamePacketId)
 		{
-			if (msgHandler->packetId != findMsgHandler->packetId)
+			if (msgHandler->packetId != msgMatchCriteria->packetId)
 			{
 				continue;
 			}
 		}
 		else
 		{
-			if (msgHandler->packetId == findMsgHandler->packetId)
+			if (msgHandler->packetId == msgMatchCriteria->packetId)
 			{
 				continue;
 			}
 		}
 
-		if (RyanMqttFalse ==
-		    RyanMqttMsgIsMatch(msgHandler, findMsgHandler->topic, findMsgHandler->topicLen, RyanMqttFalse))
+		if (RyanMqttFalse == RyanMqttMsgTopicIsMatch(msgHandler, msgMatchCriteria->topic,
+							     msgMatchCriteria->topicLen, RyanMqttFalse))
 		{
 			continue;
 		}
@@ -329,8 +345,8 @@ void RyanMqttMsgHandlerFindAndDestroyByPackId(RyanMqttClient_t *client, RyanMqtt
 		RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
 		RyanMqttMsgHandlerDestroy(client, msgHandler);
 
-		// ?理论上最多只会有一个同名订阅,或许也不好说? 比如订阅的时侯数组内部有同名的?场景太少,可以不考虑
-		break;
+		// ?理论上最多只会有一个同名订阅,或许也不好说? 比如订阅的时侯数组内部有同名的?
+		// break;
 	}
 	platformMutexUnLock(client->config.userData, &client->msgHandleLock);
 }
@@ -348,8 +364,7 @@ RyanMqttError_e RyanMqttMsgHandlerAddToMsgList(RyanMqttClient_t *client, RyanMqt
 	RyanMqttAssert(NULL != msgHandler);
 
 	platformMutexLock(client->config.userData, &client->msgHandleLock);
-	RyanMqttListAddTail(&msgHandler->list,
-			    &client->msgHandlerList); // 将msgHandler节点添加到链表尾部
+	RyanMqttListAddTail(&msgHandler->list, &client->msgHandlerList); // 将msgHandler节点添加到链表尾部
 	platformMutexUnLock(client->config.userData, &client->msgHandleLock);
 
 	return RyanMqttSuccessError;

+ 23 - 20
mqttclient/include/RyanMqttClient.h

@@ -34,8 +34,8 @@ typedef struct
 	uint16_t topicLen;   // 主题长度
 	RyanMqttQos_e qos;   // qos等级
 	RyanMqttList_t list; // 链表节点,用户勿动
-	char *topic;         // 主题
 	void *userData;      // 用户自定义数据
+	char *topic;         // 主题,不要求必须是最后一位,但最好保持这样
 } RyanMqttMsgHandler_t;
 
 typedef struct
@@ -48,7 +48,7 @@ typedef struct
 	RyanMqttList_t list;                 // 链表节点,用户勿动
 	RyanMqttTimer_t timer;               // ack超时定时器,用户勿动
 	RyanMqttMsgHandler_t *msgHandler;    // msg信息
-	uint8_t *packet;                     // 没有收到期望ack,重新发送的原始报文
+	uint8_t *packet;                     // 没有收到期望ack,重新发送的原始报文,不要求必须是最后一位,但最好保持这样
 } RyanMqttAckHandler_t;
 
 typedef struct
@@ -76,32 +76,33 @@ typedef struct
 
 typedef struct
 {
-	char *clientId;                   // 客户端ID
-	char *userName;                   // 用户名
-	char *password;                   // 密码
-	char *host;                       // mqtt服务器地址
-	char *taskName;                   // 线程名字
+	char *clientId;      // 客户端ID
+	char *userName;      // 用户名
+	char *password;      // 密码
+	char *host;          // mqtt服务器地址
+	uint16_t port;       // mqtt服务器端口
+	uint8_t mqttVersion; // mqtt版本 3.1.1是4, 3.1是3
+
+	char *taskName;     // 线程名字
+	uint16_t taskPrio;  // mqtt线程优先级
+	uint16_t taskStack; // 线程栈大小
+
 	RyanMqttBool_e autoReconnectFlag; // 自动重连标志位
 	RyanMqttBool_e cleanSessionFlag;  // 清除会话标志位
-	uint8_t mqttVersion;              // mqtt版本 3.1.1是4, 3.1是3
-	uint16_t port;                    // mqtt服务器端口
 
-	// ack重发超过这个数值后触发事件回调,根据实际硬件选择。典型值为 *
-	// ackTimeout ~= 300秒
+	// ack重发超过这个数值后触发事件回调,根据实际硬件选择。典型值为 5
 	uint16_t ackHandlerRepeatCountWarning;
-	uint16_t taskPrio;  // mqtt线程优先级
-	uint16_t taskStack; // 线程栈大小
-
-	// mqtt等待接收命令超时时间, 根据实际硬件选择。推荐 > ackTimeout && <= (keepaliveTimeoutS / 2)
-	uint16_t recvTimeout;
-	uint16_t sendTimeout;       // mqtt发送给命令超时时间, 根据实际硬件选择。
-	uint16_t ackTimeout;        // mqtack等待命令超时时间, 典型值为5 - 30
-	uint16_t keepaliveTimeoutS; // mqtt心跳时间间隔秒
 
 	// 等待ack的警告数.每次添加ack,ack总数大于或等于该值将触发事件回调,根据实际硬件选择。典型值是32
 	uint16_t ackHandlerCountWarning;
 
-	uint16_t reconnectTimeout;           // mqtt重连间隔时间
+	// mqtt等待接收命令超时时间, 根据实际硬件选择。推荐 > ackTimeout && <= (keepaliveTimeoutS / 2)
+	uint16_t recvTimeout;       // mqtt接收命令超时时间, 根据实际硬件选择。单位ms
+	uint16_t sendTimeout;       // mqtt发送命令超时时间, 根据实际硬件选择。单位ms
+	uint16_t ackTimeout;        // mqtt ack 等待回复的超时时间, 典型值为5 - 60秒。单位ms
+	uint16_t keepaliveTimeoutS; // mqtt心跳时间间隔。单位S
+	uint16_t reconnectTimeout;  // mqtt重连间隔时间。单位S
+
 	RyanMqttEventHandle mqttEventHandle; // mqtt事件回调函数
 	void *userData;                      // 用户自定义数据,用户需要保证指针指向内容的持久性
 } RyanMqttClientConfig_t;
@@ -162,6 +163,8 @@ extern RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client,
 
 extern RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client);
 extern RyanMqttError_e RyanMqttGetKeepAliveRemain(RyanMqttClient_t *client, uint32_t *keepAliveRemain);
+extern RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig);
+extern RyanMqttError_e RyanMqttFreeConfigFromGet(RyanMqttClientConfig_t *clientConfig);
 extern RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig);
 extern RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen,
 				      RyanMqttQos_e qos, RyanMqttBool_e retain);

+ 12 - 4
mqttclient/include/RyanMqttPlatform.h

@@ -13,22 +13,30 @@ extern "C" {
 #define RyanMqttMemset memset
 #endif
 
-#ifndef RyanMqttMemset
+#ifndef RyanMqttStrlen
 #define RyanMqttStrlen strlen
 #endif
 
-#ifndef RyanMqttMemset
+#ifndef RyanMqttMemcpy
 #define RyanMqttMemcpy memcpy
 #endif
 
-#ifndef RyanMqttMemset
-#define RyanMqttStrcmp strcmp
+#ifndef RyanMqttStrncmp
+#define RyanMqttStrncmp strncmp
 #endif
 
 #ifndef platformAssert
 #define platformAssert assert
 #endif
 
+#ifndef RyanMqttSnprintf
+#define RyanMqttSnprintf snprintf
+#endif
+
+#ifndef RyanMqttVsnprintf
+#define RyanMqttVsnprintf vsnprintf
+#endif
+
 // RyanMqttT内部imer接口
 typedef struct
 {

+ 11 - 0
mqttclient/include/RyanMqttPublic.h

@@ -178,6 +178,15 @@ typedef enum
 	 */
 	RyanMqttEventData = RyanMqttBit14,
 
+	/**
+	 * @brief 接收到未匹配任何订阅主题的报文事件
+	 *
+	 * 此事件触发时,报文的主题不在当前客户端订阅的 topic 列表中,
+	 *
+	 * @eventData RyanMqttMsgData_t*
+	 */
+	RyanMqttEventUnsubscribedData = RyanMqttBit15,
+
 	RyanMqttEventAnyId = UINT32_MAX,
 } RyanMqttEventId_e;
 
@@ -200,6 +209,7 @@ typedef enum
 	RyanMqttSendBufToShortError,        // MQTT 缓冲区太短
 	RyanMqttNotEnoughMemError,          // MQTT 内存不足
 	RyanMqttFailedError,                // 失败
+	RyanMqttInvalidPacketError,         // 收到非法的报文
 	RyanMqttSuccessError = 0x0000,      // 成功
 	RyanMqttErrorForceInt32 = INT32_MAX // 强制编译器使用int32_t类型
 } RyanMqttError_e;
@@ -223,6 +233,7 @@ typedef enum
 	RyanMqttConnectTimeout,                     // 超时断开
 	RyanMqttConnectFirstPackNotConnack,         // 发送connect后接受到的第一个报文不是connack
 	RyanMqttConnectProtocolError,               // 多次收到connack
+	RyanMqttConnectInvalidPacketError,          // 收到不符合MQTT3.1.1协议的报文,并且要求关闭客户端的
 	RyanMqttConnectStatusForceInt32 = INT32_MAX // 强制编译器使用int32_t类型
 } RyanMqttConnectStatus_e;
 

+ 5 - 6
mqttclient/include/RyanMqttUtil.h

@@ -29,21 +29,20 @@ extern void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e sta
 extern RyanMqttState_e RyanMqttGetClientState(RyanMqttClient_t *client);
 extern RyanMqttError_e RyanMqttStringCopy(char **dest, const char *rest, uint32_t strLen);
 extern void RyanMqttPurgeSession(RyanMqttClient_t *client);
+extern void RyanMqttPurgeConfig(RyanMqttClientConfig_t *clientConfig);
 
 extern RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, uint8_t *buf, uint32_t length);
 extern RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, uint8_t *buf, uint32_t length);
 
 // msg
-extern RyanMqttBool_e RyanMqttMsgIsMatch(RyanMqttMsgHandler_t *msgHandler, const char *topic, uint16_t topicLen,
-					 RyanMqttBool_e topicMatchedFlag);
 extern RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, const char *topic, uint16_t topicLen,
 						uint16_t packetId, RyanMqttQos_e qos, void *userData,
 						RyanMqttMsgHandler_t **pMsgHandler);
 extern void RyanMqttMsgHandlerDestroy(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
-extern RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHandler_t *findMsgHandler,
-					      RyanMqttBool_e topicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler);
-extern void RyanMqttMsgHandlerFindAndDestroyByPackId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *findMsgHandler,
-						     RyanMqttBool_e isSkipMatchingId);
+extern RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
+					      RyanMqttBool_e isTopicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler);
+extern void RyanMqttMsgHandlerFindAndDestroyByPackId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
+						     RyanMqttBool_e skipSamePacketId);
 extern RyanMqttError_e RyanMqttMsgHandlerAddToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
 extern RyanMqttError_e RyanMqttMsgHandlerRemoveToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
 

+ 3 - 1
platform/linux/platformSystem.h

@@ -19,7 +19,9 @@ extern "C" {
 #define RyanMqttMemset     memset
 #define RyanMqttStrlen     strlen
 #define RyanMqttMemcpy     memcpy
-#define RyanMqttStrcmp     strcmp
+#define RyanMqttStrncmp    strncmp
+#define RyanMqttSnprintf   snprintf
+#define RyanMqttVsnprintf  vsnprintf
 
 typedef struct
 {

+ 3 - 1
platform/openLuat/platformSystem.h

@@ -17,7 +17,9 @@ extern "C" {
 #define RyanMqttMemset     memset
 #define RyanMqttStrlen     strlen
 #define RyanMqttMemcpy     memcpy
-#define RyanMqttStrcmp     strcmp
+#define RyanMqttStrncmp    strncmp
+#define RyanMqttSnprintf   snprintf
+#define RyanMqttVsnprintf  vsnprintf
 
 typedef struct
 {

+ 3 - 1
platform/rtthread/platformSystem.h

@@ -14,7 +14,9 @@ extern "C" {
 #define RyanMqttMemset     rt_memset
 #define RyanMqttStrlen     rt_strlen
 #define RyanMqttMemcpy     rt_memcpy
-#define RyanMqttStrcmp     rt_strcmp
+#define RyanMqttStrncmp    rt_strncmp
+#define RyanMqttSnprintf   rt_snprintf
+#define RyanMqttVsnprintf  rt_vsnprintf
 
 typedef struct
 {

+ 2 - 2
test/RyanMqttMultiThreadMultiClientTest.c

@@ -81,7 +81,7 @@ static void *concurrentPublishThread(void *arg)
 	}
 
 	// 订阅主题
-	snprintf(topic, sizeof(topic), "test/thread/%d", testData->threadIndex);
+	RyanMqttSnprintf(topic, sizeof(topic), "test/thread/%d", testData->threadIndex);
 	result = RyanMqttSubscribe(testData->client, topic, testData->threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
 	if (RyanMqttSuccessError != result)
 	{
@@ -92,7 +92,7 @@ static void *concurrentPublishThread(void *arg)
 	// 发布消息
 	for (int i = 0; i < MESSAGES_PER_THREAD; i++)
 	{
-		snprintf(payload, sizeof(payload), "Message %d from thread %d", i, testData->threadIndex);
+		RyanMqttSnprintf(payload, sizeof(payload), "Message %d from thread %d", i, testData->threadIndex);
 
 		result = RyanMqttPublish(testData->client, topic, payload, RyanMqttStrlen(payload),
 					 i % 2 ? RyanMqttQos2 : RyanMqttQos1, RyanMqttFalse);

+ 2 - 2
test/RyanMqttMultiThreadSafetyTest.c

@@ -93,7 +93,7 @@ static void *concurrentPublishThread(void *arg)
 	ThreadTestData_t *testData = &g_threadTestData[threadIndex];
 
 	// 订阅主题
-	snprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
+	RyanMqttSnprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
 	result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
 	if (RyanMqttSuccessError != result)
 	{
@@ -104,7 +104,7 @@ static void *concurrentPublishThread(void *arg)
 	// 发布消息
 	for (int i = 0; i < MESSAGES_PER_THREAD; i++)
 	{
-		snprintf(payload, sizeof(payload), "M %d %d", i, threadIndex);
+		RyanMqttSnprintf(payload, sizeof(payload), "M %d %d", i, threadIndex);
 		RyanMqttQos_e qos = (RyanMqttQos_e)(i % 3);
 
 		result = RyanMqttPublish(g_testControl.client, topic, payload, RyanMqttStrlen(payload), qos,

+ 1 - 1
test/RyanMqttPubTest.c

@@ -87,7 +87,7 @@ static RyanMqttError_e RyanMqttPublishTest(RyanMqttQos_e qos, int32_t count, uin
 	// 等待订阅主题成功
 	result = RyanMqttSubscribe(client, RyanMqttPubTestSubTopic, qos);
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
-    // !不等待topic订阅成功,检查是否正常接收消息
+	// !不等待topic订阅成功,检查是否正常接收消息
 	// for (int32_t i = 0;; i++)
 	// {
 	// 	int32_t subscribeTotal = 0;

+ 3 - 3
test/RyanMqttSubTest.c

@@ -7,7 +7,7 @@ static RyanMqttSubscribeData_t *topicIsSubscribeArr(char *topic)
 {
 	for (int32_t i = 0; i < subTestCount; i++)
 	{
-		if (0 == RyanMqttStrcmp(topic, subscribeManyData[i].topic))
+		if (0 == RyanMqttStrncmp(topic, subscribeManyData[i].topic, RyanMqttStrlen(topic)))
 		{
 			return &subscribeManyData[i];
 		}
@@ -176,7 +176,7 @@ static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count)
 				result = RyanMqttNotEnoughMemError;
 				goto __exit;
 			}
-			snprintf(topic, 64, "test/subscribe/%d", i);
+			RyanMqttSnprintf(topic, 64, "test/subscribe/%d", i);
 			subscribeManyData[i].topic = topic;
 			subscribeManyData[i].topicLen = RyanMqttStrlen(topic);
 		}
@@ -223,7 +223,7 @@ static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count)
 			result = RyanMqttNotEnoughMemError;
 			goto __exit;
 		}
-		snprintf(topic, 64, "test/subscribe/%d", i);
+		RyanMqttSnprintf(topic, 64, "test/subscribe/%d", i);
 		unSubscribeManyData[i].topic = topic;
 		unSubscribeManyData[i].topicLen = RyanMqttStrlen(topic);
 	}

+ 14 - 1
test/RyanMqttTest.c

@@ -116,6 +116,13 @@ void mqttEventBaseHandle(void *pclient, RyanMqttEventId_e event, const void *eve
 		}
 		break;
 
+	case RyanMqttEventUnsubscribedData: {
+		RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
+		RyanMqttLog_i("接收到未匹配任何订阅主题的报文事件 topic: %.*s, packetId: %d, payload len: %d",
+			      msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen);
+		break;
+	}
+
 	default: break;
 	}
 }
@@ -135,7 +142,7 @@ RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncF
 	count++;
 	RyanMqttTestExitCritical();
 
-	snprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
+	RyanMqttSnprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
 
 	struct RyanMqttTestEventUserData *eventUserData =
 		(struct RyanMqttTestEventUserData *)malloc(sizeof(struct RyanMqttTestEventUserData));
@@ -191,6 +198,12 @@ RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncF
 	result = RyanMqttSetConfig(*client, &mqttConfig);
 	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
 
+	// 获取config测试
+	RyanMqttClientConfig_t *mqttConfig22;
+	result = RyanMqttGetConfig(*client, &mqttConfig22);
+	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_e);
+	RyanMqttFreeConfigFromGet(mqttConfig22);
+
 	// 设置遗嘱消息
 	result = RyanMqttSetLwt(*client, "pub/lwt/test", "this is will", RyanMqttStrlen("this is will"), RyanMqttQos2,
 				0);

+ 3 - 4
xmake.lua

@@ -4,8 +4,8 @@ target("RyanMqtt",function()
 
     add_syslinks("pthread")
     set_toolchains("gcc")  -- 确保使用 GCC
-    -- set_toolchains("clang-20")  -- 确保使用 GCC
-    set_languages("gnu99") -- 关键!启用 GNU 扩展
+    -- set_toolchains("clang-20")  
+    set_languages("gnu99") -- 启用 GNU 扩展
     set_warnings("everything") -- 启用全部警告 -Wall -Wextra -Weffc++ / -Weverything
 
     -- set_optimize("smallest") -- -Os
@@ -14,7 +14,7 @@ target("RyanMqtt",function()
     set_optimize("aggressive") -- -Ofast
 
     add_defines("PKG_USING_RYANMQTT_IS_ENABLE_ASSERT") -- 开启assert
-    add_ldflags("-Wl,-Map=$(buildir)/RyanMqtt.map") 
+    add_ldflags("-Wl,-Map=$(buildir)/RyanMqtt.map")
     add_cxflags(
                 "-pedantic",  
                 "-Wall",
@@ -39,5 +39,4 @@ target("RyanMqtt",function()
     add_files('./platform/linux/*.c', {public = true})
     add_files('./platform/linux/valloc/*.c', {public = true})
 
-
 end)