Răsfoiți Sursa

refactor: 优化弱网下表现

RyanCW 3 luni în urmă
părinte
comite
1388e394ad

+ 5 - 1
.vscode/settings.json

@@ -47,6 +47,10 @@
         "inttypes.h": "c",
         "inttypes.h": "c",
         "memory": "c",
         "memory": "c",
         "stdbool.h": "c",
         "stdbool.h": "c",
-        "ryanmqttutil.h": "c"
+        "ryanmqttutil.h": "c",
+        "cstdlib": "c",
+        "stdatomic.h": "c",
+        "atomic": "c",
+        "cstdint": "c"
     },
     },
 }
 }

+ 1 - 1
RyanMqtt2.0发布说明及迁移指南.md

@@ -26,7 +26,7 @@ extern RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t
             RyanMqttUnSubscribeData_t unSubscribeManyData[]);
             RyanMqttUnSubscribeData_t unSubscribeManyData[]);
 
 
 // 带用户数据的发布
 // 带用户数据的发布
-extern RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen,
+extern RyanMqttError_e RyanMqttPublishWithUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen,
         char *payload, uint32_t payloadLen, RyanMqttQos_e qos,
         char *payload, uint32_t payloadLen, RyanMqttQos_e qos,
         RyanMqttBool_e retain, void *userData);
         RyanMqttBool_e retain, void *userData);
 
 

+ 11 - 2
coreMqtt/core_mqtt_serializer.c

@@ -924,14 +924,23 @@ static bool incomingPacketValid( uint8_t packetType )
     {
     {
         /* Valid incoming packet types. */
         /* Valid incoming packet types. */
         case MQTT_PACKET_TYPE_CONNACK:
         case MQTT_PACKET_TYPE_CONNACK:
-        case MQTT_PACKET_TYPE_PUBLISH:
         case MQTT_PACKET_TYPE_PUBACK:
         case MQTT_PACKET_TYPE_PUBACK:
         case MQTT_PACKET_TYPE_PUBREC:
         case MQTT_PACKET_TYPE_PUBREC:
         case MQTT_PACKET_TYPE_PUBCOMP:
         case MQTT_PACKET_TYPE_PUBCOMP:
         case MQTT_PACKET_TYPE_SUBACK:
         case MQTT_PACKET_TYPE_SUBACK:
         case MQTT_PACKET_TYPE_UNSUBACK:
         case MQTT_PACKET_TYPE_UNSUBACK:
         case MQTT_PACKET_TYPE_PINGRESP:
         case MQTT_PACKET_TYPE_PINGRESP:
-            status = true;
+            if( ( packetType & 0x0FU ) == 0U )
+            {
+                status = true;
+            }
+            break;
+
+        case MQTT_PACKET_TYPE_PUBLISH:
+            if( ( packetType & 0x06U ) != 0x06U )
+            {
+                status = true;
+            }
             break;
             break;
 
 
         case ( MQTT_PACKET_TYPE_PUBREL & 0xF0U ):
         case ( MQTT_PACKET_TYPE_PUBREL & 0xF0U ):

+ 29 - 24
mqttclient/RyanMqttClient.c

@@ -350,7 +350,7 @@ __exit:
 			msgMatchCriteria.topicLen = subscriptionList[i].topicFilterLength;
 			msgMatchCriteria.topicLen = subscriptionList[i].topicFilterLength;
 			msgMatchCriteria.packetId = packetId;
 			msgMatchCriteria.packetId = packetId;
 
 
-			RyanMqttMsgHandlerFindAndDestroyByPackId(client, &msgMatchCriteria, RyanMqttFalse);
+			RyanMqttMsgHandlerFindAndDestroyByPacketId(client, &msgMatchCriteria, RyanMqttFalse);
 		}
 		}
 	}
 	}
 
 
@@ -407,13 +407,13 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	}
 	}
 
 
 	// 申请 coreMqtt 支持的topic格式空间
 	// 申请 coreMqtt 支持的topic格式空间
-	MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
-	RyanMqttCheck(NULL != subscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
+	MQTTSubscribeInfo_t *unSubscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
+	RyanMqttCheck(NULL != unSubscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 	for (int32_t i = 0; i < count; i++)
 	for (int32_t i = 0; i < count; i++)
 	{
 	{
-		subscriptionList[i].qos = (MQTTQoS_t)RyanMqttQos0; // 无效数据,仅当占位符
-		subscriptionList[i].pTopicFilter = unSubscribeManyData[i].topic;
-		subscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
+		unSubscriptionList[i].qos = (MQTTQoS_t)RyanMqttSubFail; // 无效数据,仅当占位符
+		unSubscriptionList[i].pTopicFilter = unSubscribeManyData[i].topic;
+		unSubscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
 	}
 	}
 
 
 	// 序列化数据包
 	// 序列化数据包
@@ -422,19 +422,19 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 
 
 		// 获取数据包大小
 		// 获取数据包大小
 		MQTTStatus_t status =
 		MQTTStatus_t status =
-			MQTT_GetUnsubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
+			MQTT_GetUnsubscribePacketSize(unSubscriptionList, count, &remainingLength, &fixedBuffer.size);
 		RyanMqttAssert(MQTTSuccess == status);
 		RyanMqttAssert(MQTTSuccess == status);
 
 
 		// 申请数据包的空间
 		// 申请数据包的空间
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 		fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
 		RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
 		RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
-				  { platformMemoryFree(subscriptionList); });
+				  { platformMemoryFree(unSubscriptionList); });
 
 
 		// 序列化数据包
 		// 序列化数据包
 		packetId = RyanMqttGetNextPacketId(client);
 		packetId = RyanMqttGetNextPacketId(client);
-		status = MQTT_SerializeUnsubscribe(subscriptionList, count, packetId, remainingLength, &fixedBuffer);
+		status = MQTT_SerializeUnsubscribe(unSubscriptionList, count, packetId, remainingLength, &fixedBuffer);
 		RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
 		RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
-			platformMemoryFree(subscriptionList);
+			platformMemoryFree(unSubscriptionList);
 			platformMemoryFree(fixedBuffer.pBuffer);
 			platformMemoryFree(fixedBuffer.pBuffer);
 		});
 		});
 	}
 	}
@@ -443,20 +443,23 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 	for (int32_t i = 0; i < count; i++)
 	for (int32_t i = 0; i < count; i++)
 	{
 	{
 		// ?不判断是否订阅,统一都发送取消
 		// ?不判断是否订阅,统一都发送取消
-		RyanMqttMsgHandler_t msgMatchCriteria = {.topic = (char *)subscriptionList[i].pTopicFilter,
-							 .topicLen = subscriptionList[i].topicFilterLength};
+		RyanMqttMsgHandler_t msgMatchCriteria = {.topic = (char *)unSubscriptionList[i].pTopicFilter,
+							 .topicLen = unSubscriptionList[i].topicFilterLength};
+
+		platformMutexLock(client->config.userData, &client->msgHandleLock);
 		result =
 		result =
 			RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttFalse, &subMsgHandler, RyanMqttFalse);
 			RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttFalse, &subMsgHandler, RyanMqttFalse);
 		if (RyanMqttSuccessError == result)
 		if (RyanMqttSuccessError == result)
 		{
 		{
 			// !有线程安全问题,subMsgHandler是指针,但用户层只要不是特别的混乱重复取消订阅这里应该就问题,暂时不管成本太高
 			// !有线程安全问题,subMsgHandler是指针,但用户层只要不是特别的混乱重复取消订阅这里应该就问题,暂时不管成本太高
 			// 同步msg qos等级,之后unsub回调使用
 			// 同步msg qos等级,之后unsub回调使用
-			subscriptionList[i].qos = (MQTTQoS_t)subMsgHandler->qos;
+			unSubscriptionList[i].qos = (MQTTQoS_t)subMsgHandler->qos;
 		}
 		}
+		platformMutexUnLock(client->config.userData, &client->msgHandleLock);
 
 
-		result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
-						  subscriptionList[i].topicFilterLength, RyanMqttMsgInvalidPacketId,
-						  (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgHandler);
+		result = RyanMqttMsgHandlerCreate(client, unSubscriptionList[i].pTopicFilter,
+						  unSubscriptionList[i].topicFilterLength, packetId,
+						  (RyanMqttQos_e)unSubscriptionList[i].qos, NULL, &msgHandler);
 		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
 		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
 					  { goto __RyanMqttUnSubCreateAckErrorExit; });
 					  { goto __RyanMqttUnSubCreateAckErrorExit; });
 
 
@@ -473,7 +476,7 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
 
 
 __RyanMqttUnSubCreateAckErrorExit:
 __RyanMqttUnSubCreateAckErrorExit:
 		RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
 		RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
-		platformMemoryFree(subscriptionList);
+		platformMemoryFree(unSubscriptionList);
 		platformMemoryFree(fixedBuffer.pBuffer);
 		platformMemoryFree(fixedBuffer.pBuffer);
 		return RyanMqttNotEnoughMemError;
 		return RyanMqttNotEnoughMemError;
 	}
 	}
@@ -484,11 +487,11 @@ __RyanMqttUnSubCreateAckErrorExit:
 	RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
 	RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
 		RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
 		RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
 
 
-		platformMemoryFree(subscriptionList);
+		platformMemoryFree(unSubscriptionList);
 		platformMemoryFree(fixedBuffer.pBuffer);
 		platformMemoryFree(fixedBuffer.pBuffer);
 	});
 	});
 
 
-	platformMemoryFree(subscriptionList);
+	platformMemoryFree(unSubscriptionList);
 	platformMemoryFree(fixedBuffer.pBuffer);
 	platformMemoryFree(fixedBuffer.pBuffer);
 	return result;
 	return result;
 }
 }
@@ -508,9 +511,9 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
 	return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
 	return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
 }
 }
 
 
-RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen, char *payload,
-					   uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain,
-					   void *userData)
+RyanMqttError_e RyanMqttPublishWithUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen, char *payload,
+					    uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain,
+					    void *userData)
 {
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttError_e result = RyanMqttSuccessError;
 	uint16_t packetId;
 	uint16_t packetId;
@@ -595,7 +598,8 @@ RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 
 
 		result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
 		result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
-		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
+		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_e, {
+			RyanMqttLog_e("RyanMqttSendPacket failed, clear user ack session");
 			// userAck 必须通过这个执行,因为可能已经复制到mqtt内核空间了
 			// userAck 必须通过这个执行,因为可能已经复制到mqtt内核空间了
 			RyanMqttClearAckSession(client, packetType, packetId);
 			RyanMqttClearAckSession(client, packetType, packetId);
 		});
 		});
@@ -620,7 +624,8 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
 {
 {
 	RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
 
 
-	return RyanMqttPublishAndUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain, NULL);
+	return RyanMqttPublishWithUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain,
+					   NULL);
 }
 }
 
 
 /**
 /**

+ 19 - 14
mqttclient/RyanMqttThread.c

@@ -1,4 +1,5 @@
 #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
 #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
+// #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
 // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
 // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
 
 
 #include "RyanMqttThread.h"
 #include "RyanMqttThread.h"
@@ -34,16 +35,6 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
 
 
 	uint32_t timeRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
 	uint32_t timeRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
 
 
-	// 超过设置的 1.5 倍心跳周期,主动通知用户断开连接
-	if (0 == timeRemain)
-	{
-		RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
-		RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
-		RyanMqttLog_d("ErrorCode: %d, strError: %s", RyanMqttKeepaliveTimeout,
-			      RyanMqttStrError(RyanMqttKeepaliveTimeout));
-		return RyanMqttFailedError;
-	}
-
 	// 当剩余时间大于 recvtimeout 并且小于 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
 	// 当剩余时间大于 recvtimeout 并且小于 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
 	if (timeRemain > (uint32_t)(client->config.recvTimeout + 100))
 	if (timeRemain > (uint32_t)(client->config.recvTimeout + 100))
 	{
 	{
@@ -61,6 +52,16 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
 		}
 		}
 	}
 	}
 
 
+	// 超过设置的 1.5 倍心跳周期,主动通知用户断开连接
+	if (0 == timeRemain)
+	{
+		RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
+		RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
+		RyanMqttLog_d("ErrorCode: %d, strError: %s", RyanMqttKeepaliveTimeout,
+			      RyanMqttStrError(RyanMqttKeepaliveTimeout));
+		return RyanMqttFailedError;
+	}
+
 	// 发送mqtt心跳包
 	// 发送mqtt心跳包
 	{
 	{
 		// MQTT_PACKET_PINGREQ_SIZE
 		// MQTT_PACKET_PINGREQ_SIZE
@@ -81,7 +82,6 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
 	return RyanMqttSuccessError;
 	return RyanMqttSuccessError;
 }
 }
 
 
-// todo 也可以考虑有ack链表的时候recvTime可以短一些,有坑点
 // todo 也可以考虑将发送操作独立出去,异步发送,目前没有遇到性能瓶颈,需要超高性能的时候再考虑吧
 // todo 也可以考虑将发送操作独立出去,异步发送,目前没有遇到性能瓶颈,需要超高性能的时候再考虑吧
 /**
 /**
  * @brief 遍历ack链表,进行相应的处理
  * @brief 遍历ack链表,进行相应的处理
@@ -96,7 +96,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
 	RyanMqttList_t *curr, *next;
 	RyanMqttList_t *curr, *next;
 	RyanMqttAckHandler_t *ackHandler;
 	RyanMqttAckHandler_t *ackHandler;
 	RyanMqttTimer_t ackScanRemainTimer;
 	RyanMqttTimer_t ackScanRemainTimer;
-	uint32_t ackScanThrottleTime = 1000; // 最长一秒
+	uint32_t ackScanThrottleTime = 1000; // ack扫描节流最长一秒
 	RyanMqttAssert(NULL != client);
 	RyanMqttAssert(NULL != client);
 
 
 	// mqtt没有连接就退出
 	// mqtt没有连接就退出
@@ -166,7 +166,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
 		case MQTT_PACKET_TYPE_PUBCOMP: // 理论不会出现,冗余措施
 		case MQTT_PACKET_TYPE_PUBCOMP: // 理论不会出现,冗余措施
 		{
 		{
 			// 设置重发标志位
 			// 设置重发标志位
-			if (ackHandler->packet && 0 == ackHandler->repeatCount)
+			if (0 == ackHandler->repeatCount && ackHandler->packet)
 			{
 			{
 				MQTT_UpdateDuplicatePublishFlag(ackHandler->packet, true);
 				MQTT_UpdateDuplicatePublishFlag(ackHandler->packet, true);
 			}
 			}
@@ -192,7 +192,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
 		// 订阅 / 取消订阅超时就认为失败
 		// 订阅 / 取消订阅超时就认为失败
 		case MQTT_PACKET_TYPE_SUBACK: {
 		case MQTT_PACKET_TYPE_SUBACK: {
 			RyanMqttMsgHandler_t *msgMatchCriteria = ackHandler->msgHandler;
 			RyanMqttMsgHandler_t *msgMatchCriteria = ackHandler->msgHandler;
-			RyanMqttMsgHandlerFindAndDestroyByPackId(client, msgMatchCriteria, RyanMqttFalse);
+			RyanMqttMsgHandlerFindAndDestroyByPacketId(client, msgMatchCriteria, RyanMqttFalse);
 			RyanMqttEventMachine(client, RyanMqttEventSubscribedFailed, (void *)ackHandler->msgHandler);
 			RyanMqttEventMachine(client, RyanMqttEventSubscribedFailed, (void *)ackHandler->msgHandler);
 			RyanMqttAckListRemoveToAckList(client, ackHandler);
 			RyanMqttAckListRemoveToAckList(client, ackHandler);
 			RyanMqttAckHandlerDestroy(client, ackHandler); // 清除句柄
 			RyanMqttAckHandlerDestroy(client, ackHandler); // 清除句柄
@@ -220,6 +220,11 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
 	{
 	{
 		// 启动ack scan节流定时器
 		// 启动ack scan节流定时器
 		RyanMqttTimerCutdown(&client->ackScanThrottleTimer, ackScanThrottleTime);
 		RyanMqttTimerCutdown(&client->ackScanThrottleTimer, ackScanThrottleTime);
+		client->pendingAckFlag = RyanMqttFalse;
+	}
+	else
+	{
+		client->pendingAckFlag = RyanMqttTrue;
 	}
 	}
 }
 }
 
 

+ 126 - 66
mqttclient/RyanMqttThreadProcessPacket.c

@@ -1,5 +1,6 @@
 #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
 #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
-// #define RyanMqttLogLevel (RyanMqttLogLevelDebug)  // 日志打印等级
+// #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
+// #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
 
 
 #include "RyanMqttThread.h"
 #include "RyanMqttThread.h"
 #include "RyanMqttLog.h"
 #include "RyanMqttLog.h"
@@ -91,18 +92,26 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 
 
 	// 反序列化ack包
 	// 反序列化ack包
 	MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
 	MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
-	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
+	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_e);
+
+	// 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
+	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec, RyanMqttFalse);
+	if (RyanMqttSuccessError != result)
+	{
+		// 没有pubrec ,并且没有pubcomp,说明这个报文是非法报文,不进行mqtt服务器回复
+		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubrec,
+						 RyanMqttFalse);
+		RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttInvalidPacketError, RyanMqttLog_d);
+	}
 
 
-	// 每次收到 PUBREC 都返回ack,确保服务器可以认为数据包被发送了
 	uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
 	uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
 	MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
 	MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
 
 
 	// 序列化ack数据包
 	// 序列化ack数据包
 	status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREL, packetId);
 	status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREL, packetId);
-	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
+	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_e);
 
 
-	// 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
-	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec, RyanMqttFalse);
+	// 需要小心 result 被覆盖
 	if (RyanMqttSuccessError == result)
 	if (RyanMqttSuccessError == result)
 	{
 	{
 		RyanMqttMsgHandler_t *msgHandler;
 		RyanMqttMsgHandler_t *msgHandler;
@@ -131,13 +140,6 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 			RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
 			RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
 		}
 		}
 	}
 	}
-	else
-	{
-		// 没有pubrec ,并且没有pubcomp,说明这个报文是非法报文,不进行mqtt服务器回复
-		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubrec,
-						 RyanMqttFalse);
-		RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttInvalidPacketError, RyanMqttLog_d);
-	}
 
 
 	result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 	result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
@@ -216,8 +218,8 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 		// 上面代码不太可能出错,如果出错了就让服务器重新发送吧
 		// 上面代码不太可能出错,如果出错了就让服务器重新发送吧
 		RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 		RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
 
-		// 收到 publish 就期望收到 PUBREL ,如果 PUBREL 报文已经存在说明不是首次收到 publish,不进行qos2 PUBREC
-		// 消息处理
+		// 收到 publish 就期望收到 PUBREL .
+		// 如果 PUBREL 报文已经存在说明不是首次收到 publish,不进行qos2 PUBREC消息处理
 		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler,
 		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler,
 						 RyanMqttFalse);
 						 RyanMqttFalse);
 		if (RyanMqttSuccessError != result)
 		if (RyanMqttSuccessError != result)
@@ -237,6 +239,11 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 					  { RyanMqttMsgHandlerDestroy(client, msgHandler); });
 					  { RyanMqttMsgHandlerDestroy(client, msgHandler); });
 			RyanMqttAckListAddToAckList(client, ackHandler);
 			RyanMqttAckListAddToAckList(client, ackHandler);
 		}
 		}
+		else
+		{
+			// 不是第一次收到 publish 报文,不进行消息分发
+			RyanMqttLog_e("Not the first time to receive publish packet, packetId: %d", msgData.packetId);
+		}
 
 
 		// 无论是不是第一次收到,都回复 pub ack报文
 		// 无论是不是第一次收到,都回复 pub ack报文
 		result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 		result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
@@ -329,12 +336,13 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacke
 		// 处理非订阅ack
 		// 处理非订阅ack
 		if (MQTT_PACKET_TYPE_SUBACK != ackHandler->packetType)
 		if (MQTT_PACKET_TYPE_SUBACK != ackHandler->packetType)
 		{
 		{
+			RyanMqttLog_e("packetType error: %02x", ackHandler->packetType);
 			goto __next;
 			goto __next;
 		}
 		}
 
 
 		platformMutexLock(client->config.userData, &client->msgHandleLock);
 		platformMutexLock(client->config.userData, &client->msgHandleLock);
 		// 查找同名订阅但是packetid不一样的进行删除,保证订阅主题列表只有一个最新的
 		// 查找同名订阅但是packetid不一样的进行删除,保证订阅主题列表只有一个最新的
-		RyanMqttMsgHandlerFindAndDestroyByPackId(client, ackHandler->msgHandler, RyanMqttTrue);
+		RyanMqttMsgHandlerFindAndDestroyByPacketId(client, ackHandler->msgHandler, RyanMqttTrue);
 
 
 		// 到这里就可以保证没有同名订阅了
 		// 到这里就可以保证没有同名订阅了
 		// 查找之前记录的topic句柄,根据服务器授权Qos进行更新
 		// 查找之前记录的topic句柄,根据服务器授权Qos进行更新
@@ -415,6 +423,7 @@ static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client, MQTTPac
 			continue;
 			continue;
 		}
 		}
 
 
+		// 必须先判断packetId是否相等,再判断类型
 		if (MQTT_PACKET_TYPE_UNSUBACK != ackHandler->packetType)
 		if (MQTT_PACKET_TYPE_UNSUBACK != ackHandler->packetType)
 		{
 		{
 			goto __next;
 			goto __next;
@@ -466,51 +475,101 @@ RyanMqttError_e RyanMqttGetPacketInfo(RyanMqttClient_t *client, MQTTPacketInfo_t
 {
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttAssert(NULL != client);
 	RyanMqttAssert(NULL != client);
+	uint8_t pBuffer[5]; // MQTT 固定报头最大 5 字节
+	uint8_t needReadSize = 2;
+	size_t readIndex = 0;
+	MQTTStatus_t status;
 
 
-	NetworkContext_t pNetworkContext = {.client = client};
-	// todo 可以考虑增加包大小限制,目前不准备加
-	MQTTStatus_t status =
-		MQTT_GetIncomingPacketTypeAndLength(coreMqttTransportRecv, &pNetworkContext, pIncomingPacket);
+	// // todo 可以考虑增加包大小限制,目前不准备加,错误需要更复杂的实现
+	// MQTTStatus_t status =
+	// 	MQTT_GetIncomingPacketTypeAndLength(coreMqttTransportRecv, &pNetworkContext, pIncomingPacket);
 
 
-	// 先同步用户接口的ack链表
-	RyanMqttSyncUserAckHandle(client);
-
-	if (MQTTSuccess == status)
+	do
 	{
 	{
-		// 申请断开连接数据包的空间
-		if (pIncomingPacket->remainingLength > 0)
+		// 第一次直接读取 2 个字节
+		result = RyanMqttRecvPacket(client, pBuffer + readIndex, needReadSize);
+		if (RyanMqttRecvPacketTimeOutError == result)
 		{
 		{
-			pIncomingPacket->pRemainingData = platformMemoryMalloc(pIncomingPacket->remainingLength);
-			RyanMqttCheck(NULL != pIncomingPacket->pRemainingData, RyanMqttNotEnoughMemError,
-				      RyanMqttLog_d);
+			goto __next; // 超时直接退出
+		}
+		else if (RyanMqttSuccessError != result)
+		{
+			RyanMqttLog_e("读取固定报头失败");
+			goto __next;
 		}
 		}
-	}
-	else if (MQTTNoDataAvailable == status)
-	{
-		return RyanMqttRecvPacketTimeOutError;
-	}
-	else
-	{
-		RyanMqttLog_e("获取包长度失败");
-		return RyanMqttFailedError;
-	}
 
 
-	// 3.读取mqtt载荷数据并放到读取缓冲区
-	if (pIncomingPacket->remainingLength > 0)
-	{
-		result = RyanMqttRecvPacket(client, pIncomingPacket->pRemainingData, pIncomingPacket->remainingLength);
-		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
-	}
+		readIndex += needReadSize;
 
 
-	return result;
+		// 尝试解析
+		status = MQTT_ProcessIncomingPacketTypeAndLength(pBuffer, &readIndex, pIncomingPacket);
+		if (MQTTNeedMoreBytes == status)
+		{
+			needReadSize = 3; // 最多还需要 3 个字节
+
+			// // 冗余,理论上不可能发生的
+			// if (sizeof(pBuffer) - readIndex > 0)
+			// {
+			// 	needReadSize = sizeof(pBuffer) - readIndex;F
+			// }
+			// else
+			// {
+			// 	result = RyanMqttFailedError;
+			// 	goto __next;
+			// }
+			continue;
+		}
 
 
-__exit:
-	if (NULL != pIncomingPacket->pRemainingData)
-	{
-		platformMemoryFree(pIncomingPacket->pRemainingData);
-		pIncomingPacket->pRemainingData = NULL;
-	}
+		if (MQTTSuccess != status)
+		{
+			RyanMqttLog_e("解析固定报头失败 %d", status);
+			result = RyanMqttFailedError;
+			goto __next;
+		}
+
+		if (pIncomingPacket->remainingLength <= 0)
+		{
+			break; // 不包含可变长度报文
+		}
+
+		// 申请 payload 的空间
+		pIncomingPacket->pRemainingData = platformMemoryMalloc(pIncomingPacket->remainingLength);
+		RyanMqttCheckCode(NULL != pIncomingPacket->pRemainingData, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
+			result = RyanMqttNotEnoughMemError;
+			goto __next;
+		});
+
+		// 如果固定报头解析时已经多读了 payload 的一部分
+		uint8_t alreadyRead = readIndex - pIncomingPacket->headerLength;
+		for (uint8_t i = 0; i < alreadyRead; i++)
+		{
+			pIncomingPacket->pRemainingData[i] = *(pBuffer + pIncomingPacket->headerLength + i);
+		}
+		// // ? memcpy可能性能更高
+		// if (alreadyRead > 0)
+		// {
+		// 	RyanMqttMemcpy(pIncomingPacket->pRemainingData, (char *)pBuffer + pIncomingPacket->headerLength,
+		// 		       alreadyRead);
+		// }
+
+		// 读取剩余 payload
+		if (alreadyRead < pIncomingPacket->remainingLength)
+		{
+			result = RyanMqttRecvPacket(client, pIncomingPacket->pRemainingData + alreadyRead,
+						    pIncomingPacket->remainingLength - alreadyRead);
+			// 返回 result 没错
+			RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
+				platformMemoryFree(pIncomingPacket->pRemainingData);
+				pIncomingPacket->pRemainingData = NULL;
+				goto __next;
+			});
+		}
+
+		break;
+	} while (1);
 
 
+__next:
+	// 先同步用户接口的ack链表
+	RyanMqttSyncUserAckHandle(client);
 	return result;
 	return result;
 }
 }
 
 
@@ -548,15 +607,10 @@ RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
 		result = RyanMqttPublishPacketHandler(client, &pIncomingPacket);
 		result = RyanMqttPublishPacketHandler(client, &pIncomingPacket);
 		break;
 		break;
 
 
-	case MQTT_PACKET_TYPE_CONNACK: // 连接报文确认
-	{
-		// ?客户端已处于连接状态时又收到CONNACK报文,应该视为严重错误,断开连接
-		RyanMqttLog_e("收到 CONNACK 时已连接,正在断开连接");
-		RyanMqttConnectStatus_e connectState = RyanMqttConnectProtocolError;
-		RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
-		result = RyanMqttHaveRescourceError;
-	}
-	break;
+	case MQTT_PACKET_TYPE_PINGRESP: // 心跳响应
+		RyanMqttRefreshKeepaliveTime(client);
+		result = RyanMqttSuccessError;
+		break;
 
 
 	case MQTT_PACKET_TYPE_PUBACK:  // 客户端发送QoS 1消息,服务端发布收到确认
 	case MQTT_PACKET_TYPE_PUBACK:  // 客户端发送QoS 1消息,服务端发布收到确认
 	case MQTT_PACKET_TYPE_PUBCOMP: // 发送QOS2 发布完成
 	case MQTT_PACKET_TYPE_PUBCOMP: // 发送QOS2 发布完成
@@ -570,7 +624,7 @@ RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
 	case (MQTT_PACKET_TYPE_PUBREL & 0xF0U): // 客户端接收QOS2 已经发布PUBREC,等待服务器发布释放 pubrel
 	case (MQTT_PACKET_TYPE_PUBREL & 0xF0U): // 客户端接收QOS2 已经发布PUBREC,等待服务器发布释放 pubrel
 		result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
 		result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
 
 
-		// !RyanMqttGetPacketInfo 检查报文type错误时不会惊醒返回,所以下面逻辑暂时没用
+		// !RyanMqttGetPacketInfo 检查报文type错误时不会进行返回,所以下面逻辑暂时没用
 		// // PUBREL 控制报文固定报头的第 3,2,1,0
 		// // PUBREL 控制报文固定报头的第 3,2,1,0
 		// // 位必须被设置为0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接
 		// // 位必须被设置为0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接
 		// if (pIncomingPacket.type & 0x02U)
 		// if (pIncomingPacket.type & 0x02U)
@@ -595,10 +649,16 @@ RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
 		result = RyanMqttUnSubackHandler(client, &pIncomingPacket);
 		result = RyanMqttUnSubackHandler(client, &pIncomingPacket);
 		break;
 		break;
 
 
-	case MQTT_PACKET_TYPE_PINGRESP: // 心跳响应
-		RyanMqttRefreshKeepaliveTime(client);
-		result = RyanMqttSuccessError;
-		break;
+	case MQTT_PACKET_TYPE_CONNACK: // 连接报文确认
+	{
+		// ?没必要这么严格,考虑兼容性多一些吧
+		// // ?客户端已处于连接状态时又收到CONNACK报文,应该视为严重错误,断开连接
+		// RyanMqttLog_e("收到 CONNACK 时已连接,正在断开连接");
+		// RyanMqttConnectStatus_e connectState = RyanMqttConnectProtocolError;
+		// RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
+		// result = RyanMqttHaveRescourceError;
+	}
+	break;
 
 
 	default:
 	default:
 		RyanMqttLog_w("Unhandled packet type: 0x%02X", pIncomingPacket.type & 0xF0U);
 		RyanMqttLog_w("Unhandled packet type: 0x%02X", pIncomingPacket.type & 0xF0U);

+ 43 - 27
mqttclient/RyanMqttUtil.c

@@ -5,6 +5,10 @@
 #include "RyanMqttLog.h"
 #include "RyanMqttLog.h"
 #include "RyanMqttThread.h"
 #include "RyanMqttThread.h"
 
 
+#ifdef RyanMqttLinuxTestEnable
+#include "RyanMqttTest.h"
+#endif
+
 /**
 /**
  * @brief 字符串拷贝,需要手动释放内存
  * @brief 字符串拷贝,需要手动释放内存
  *
  *
@@ -34,33 +38,6 @@ RyanMqttError_e RyanMqttDupString(char **dest, const char *src, uint32_t strLen)
 	return RyanMqttSuccessError;
 	return RyanMqttSuccessError;
 }
 }
 
 
-/**
- * @brief RyanMqtt 针对 coreMqtt 特殊场景专用
- *
- * @param pNetworkContext
- * @param pBuffer
- * @param bytesToRecv
- * @return int32_t
- */
-int32_t coreMqttTransportRecv(NetworkContext_t *pNetworkContext, void *pBuffer, size_t bytesToRecv)
-{
-	RyanMqttAssert(NULL != pNetworkContext);
-	RyanMqttAssert(NULL != pNetworkContext->client);
-	RyanMqttAssert(NULL != pBuffer);
-	RyanMqttAssert(bytesToRecv > 0);
-
-	RyanMqttError_e result = RyanMqttRecvPacket(pNetworkContext->client, pBuffer, bytesToRecv);
-
-	switch (result)
-	{
-	case RyanMqttRecvPacketTimeOutError: return 0;
-	case RyanMqttSuccessError: return (int32_t)bytesToRecv;
-
-	case RyanSocketFailedError:
-	default: return -1;
-	}
-}
-
 /**
 /**
  * @brief mqtt读取报文,此函数仅Mqtt线程进行调用
  * @brief mqtt读取报文,此函数仅Mqtt线程进行调用
  *
  *
@@ -79,6 +56,12 @@ RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, uint8_t *recvBuf, u
 	RyanMqttAssert(NULL != recvBuf);
 	RyanMqttAssert(NULL != recvBuf);
 	RyanMqttAssert(0 != recvLen);
 	RyanMqttAssert(0 != recvLen);
 
 
+	// 如果需要处理ack,就缩短读取超时时间,避免阻塞太久
+	if (RyanMqttTrue == client->pendingAckFlag)
+	{
+		timeOut = 100;
+	}
+
 	RyanMqttTimerCutdown(&timer, timeOut);
 	RyanMqttTimerCutdown(&timer, timeOut);
 
 
 	while ((offset < recvLen) && (timeOut > 0))
 	while ((offset < recvLen) && (timeOut > 0))
@@ -113,6 +96,22 @@ RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, uint8_t *recvBuf, u
 		return RyanMqttRecvPacketTimeOutError;
 		return RyanMqttRecvPacketTimeOutError;
 	}
 	}
 
 
+#ifdef RyanMqttLinuxTestEnable
+	RyanMqttTestEnableCritical();
+	if (RyanMqttTrue == isEnableRandomNetworkFault)
+	{
+		randomCount++;
+		if (randomCount >= RyanRand(10, 100))
+		{
+			randomCount = 0;
+			RyanMqttTestExitCritical();
+			// printf("模拟接收超时\r\n");
+			return RyanMqttRecvPacketTimeOutError;
+		}
+	}
+	RyanMqttTestExitCritical();
+#endif
+
 	return RyanMqttSuccessError;
 	return RyanMqttSuccessError;
 }
 }
 
 
@@ -134,6 +133,23 @@ RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, uint8_t *sendBuf, u
 	RyanMqttAssert(NULL != sendBuf);
 	RyanMqttAssert(NULL != sendBuf);
 	RyanMqttAssert(0 != sendLen);
 	RyanMqttAssert(0 != sendLen);
 
 
+#ifdef RyanMqttLinuxTestEnable
+	RyanMqttTestEnableCritical();
+	if (RyanMqttTrue == isEnableRandomNetworkFault)
+	{
+		sendRandomCount++;
+		if (sendRandomCount >= RyanRand(1, 10))
+		{
+			sendRandomCount = 0;
+			RyanMqttTestExitCritical();
+
+			// printf("模拟发送超时\r\n");
+			return RyanMqttSendPacketTimeOutError;
+		}
+	}
+	RyanMqttTestExitCritical();
+#endif
+
 	RyanMqttTimerCutdown(&timer, timeOut);
 	RyanMqttTimerCutdown(&timer, timeOut);
 
 
 	platformMutexLock(client->config.userData, &client->sendLock); // 获取互斥锁
 	platformMutexLock(client->config.userData, &client->sendLock); // 获取互斥锁

+ 1 - 1
mqttclient/RyanMqttUtileAck.c

@@ -30,7 +30,7 @@ RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packe
 	// 为非预分配的数据包分配额外空间
 	// 为非预分配的数据包分配额外空间
 	if (RyanMqttTrue != packetAllocatedExternally)
 	if (RyanMqttTrue != packetAllocatedExternally)
 	{
 	{
-		mallocSize += packetLen + 1;
+		mallocSize += packetLen;
 	}
 	}
 
 
 	// 为非预分配包申请额外空间
 	// 为非预分配包申请额外空间

+ 4 - 4
mqttclient/RyanMqttUtileMsg.c

@@ -220,7 +220,7 @@ RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, const char *t
 	RyanMqttAssert(NULL != client);
 	RyanMqttAssert(NULL != client);
 	RyanMqttAssert(NULL != topic);
 	RyanMqttAssert(NULL != topic);
 	RyanMqttAssert(NULL != pMsgHandler);
 	RyanMqttAssert(NULL != pMsgHandler);
-	RyanMqttAssert(RyanMqttQos0 == qos || RyanMqttQos1 == qos || RyanMqttQos2 == qos);
+	RyanMqttAssert(RyanMqttQos0 == qos || RyanMqttQos1 == qos || RyanMqttQos2 == qos || RyanMqttSubFail == qos);
 
 
 	uint32_t mallocSize = sizeof(RyanMqttMsgHandler_t) + topicLen + 1;
 	uint32_t mallocSize = sizeof(RyanMqttMsgHandler_t) + topicLen + 1;
 	RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)platformMemoryMalloc(mallocSize);
 	RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)platformMemoryMalloc(mallocSize);
@@ -233,7 +233,7 @@ RyanMqttError_e RyanMqttMsgHandlerCreate(RyanMqttClient_t *client, const char *t
 	msgHandler->userData = userData;
 	msgHandler->userData = userData;
 	msgHandler->topic = (char *)msgHandler + sizeof(RyanMqttMsgHandler_t);
 	msgHandler->topic = (char *)msgHandler + sizeof(RyanMqttMsgHandler_t);
 	RyanMqttMemcpy(msgHandler->topic, topic, topicLen);
 	RyanMqttMemcpy(msgHandler->topic, topic, topicLen);
-	msgHandler->topic[topicLen] = '\0';
+	msgHandler->topic[topicLen] = '\0'; // 兼容旧版本
 
 
 	*pMsgHandler = msgHandler;
 	*pMsgHandler = msgHandler;
 	return RyanMqttSuccessError;
 	return RyanMqttSuccessError;
@@ -309,8 +309,8 @@ __exit:
  * @param msgMatchCriteria
  * @param msgMatchCriteria
  * @param skipSamePacketId
  * @param skipSamePacketId
  */
  */
-void RyanMqttMsgHandlerFindAndDestroyByPackId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
-					      RyanMqttBool_e skipSamePacketId)
+void RyanMqttMsgHandlerFindAndDestroyByPacketId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
+						RyanMqttBool_e skipSamePacketId)
 {
 {
 	RyanMqttList_t *curr, *next;
 	RyanMqttList_t *curr, *next;
 	RyanMqttMsgHandler_t *msgHandler;
 	RyanMqttMsgHandler_t *msgHandler;

+ 27 - 25
mqttclient/include/RyanMqttClient.h

@@ -40,25 +40,25 @@ typedef struct
 
 
 typedef struct
 typedef struct
 {
 {
-	RyanMqttBool_e packetAllocatedExternally; // packet 是外部分配的
-	uint8_t packetType;                       // 期望接收到的ack报文类型
-	uint16_t repeatCount;                     // 当前ack超时重发次数
-	uint16_t packetId;                        // 报文标识符 系统生成,用户勿动
-	uint32_t packetLen;                       // 报文长度
-	RyanMqttList_t list;                      // 链表节点,用户勿动
-	RyanMqttTimer_t timer;                    // ack超时定时器,用户勿动
-	RyanMqttMsgHandler_t *msgHandler;         // msg信息
+	RyanMqttBool_e packetAllocatedExternally: 1; // packet 是外部分配的
+	uint8_t packetType;                          // 期望接收到的ack报文类型
+	uint16_t repeatCount;                        // 当前ack超时重发次数
+	uint16_t packetId;                           // 报文标识符 系统生成,用户勿动
+	uint32_t packetLen;                          // 报文长度
+	RyanMqttList_t list;                         // 链表节点,用户勿动
+	RyanMqttTimer_t timer;                       // ack超时定时器,用户勿动
+	RyanMqttMsgHandler_t *msgHandler;            // msg信息
 	uint8_t *packet; // 没有收到期望ack,重新发送的原始报文,不要求必须是最后一位,但最好保持这样
 	uint8_t *packet; // 没有收到期望ack,重新发送的原始报文,不要求必须是最后一位,但最好保持这样
 } RyanMqttAckHandler_t;
 } RyanMqttAckHandler_t;
 
 
 typedef struct
 typedef struct
 {
 {
-	RyanMqttBool_e lwtFlag; // 遗嘱标志位
-	RyanMqttBool_e retain;  // 遗嘱保留标志位
-	RyanMqttQos_e qos;      // 遗嘱qos等级
-	uint32_t payloadLen;    // 消息长度
-	char *topic;            // 遗嘱主题
-	char *payload;          // 遗嘱消息
+	RyanMqttBool_e lwtFlag: 1; // 遗嘱标志位
+	RyanMqttBool_e retain: 1;  // 遗嘱保留标志位
+	RyanMqttQos_e qos;         // 遗嘱qos等级
+	uint32_t payloadLen;       // 消息长度
+	char *topic;               // 遗嘱主题
+	char *payload;             // 遗嘱消息
 } lwtOptions_t;
 } lwtOptions_t;
 
 
 typedef struct
 typedef struct
@@ -87,8 +87,8 @@ typedef struct
 	uint16_t taskPrio;  // mqtt线程优先级
 	uint16_t taskPrio;  // mqtt线程优先级
 	uint16_t taskStack; // 线程栈大小
 	uint16_t taskStack; // 线程栈大小
 
 
-	RyanMqttBool_e autoReconnectFlag; // 自动重连标志位
-	RyanMqttBool_e cleanSessionFlag;  // 清除会话标志位
+	RyanMqttBool_e autoReconnectFlag: 1; // 自动重连标志位
+	RyanMqttBool_e cleanSessionFlag: 1;  // 清除会话标志位
 
 
 	// ack重发超过这个数值后触发事件回调,根据实际硬件选择。典型值为 5
 	// ack重发超过这个数值后触发事件回调,根据实际硬件选择。典型值为 5
 	uint16_t ackHandlerRepeatCountWarning;
 	uint16_t ackHandlerRepeatCountWarning;
@@ -109,11 +109,12 @@ typedef struct
 
 
 typedef struct
 typedef struct
 {
 {
-	RyanMqttBool_e destroyFlag;  // 销毁标志位
-	uint16_t ackHandlerCount;    // 等待ack的记录个数
-	uint16_t packetId;           // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
-	uint32_t eventFlag;          // 事件标志位
-	RyanMqttState_e clientState; // mqtt客户端的状态
+	RyanMqttBool_e destroyFlag: 1;    // 销毁标志位
+	RyanMqttBool_e pendingAckFlag: 1; // 需要处理ack, 缩短recv超时时间,避免阻塞太久
+	uint16_t ackHandlerCount;         // 等待ack的记录个数
+	uint16_t packetId;                // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
+	uint32_t eventFlag;               // 事件标志位
+	RyanMqttState_e clientState;      // mqtt客户端的状态
 
 
 	// 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
 	// 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
 	RyanMqttList_t msgHandlerList;
 	RyanMqttList_t msgHandlerList;
@@ -141,12 +142,13 @@ extern RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client);
 extern RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag);
 extern RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag);
 extern RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client);
 extern RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client);
 
 
-extern RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen,
-						  char *payload, uint32_t payloadLen, RyanMqttQos_e qos,
-						  RyanMqttBool_e retain, void *userData);
-// !推荐使用 RyanMqttPublishAndUserData , RyanMqttPublish不能正确处理topic结尾为0的情况
+extern RyanMqttError_e RyanMqttPublishWithUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen,
+						   char *payload, uint32_t payloadLen, RyanMqttQos_e qos,
+						   RyanMqttBool_e retain, void *userData);
+// !推荐使用 RyanMqttPublishWithUserData , RyanMqttPublish不能正确处理topic结尾为0的情况
 extern RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
 extern RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
 				       RyanMqttQos_e qos, RyanMqttBool_e retain);
 				       RyanMqttQos_e qos, RyanMqttBool_e retain);
+#define RyanMqttPublishAndUserData RyanMqttPublishWithUserData // 兼容旧版本
 
 
 // !推荐使用 RyanMqttSubscribeMany , RyanMqttSubscribe不能正确处理topic结尾为0的情况
 // !推荐使用 RyanMqttSubscribeMany , RyanMqttSubscribe不能正确处理topic结尾为0的情况
 extern RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos);
 extern RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos);

+ 1 - 6
mqttclient/include/RyanMqttUtil.h

@@ -17,11 +17,6 @@ extern "C" {
 // 定义枚举类型
 // 定义枚举类型
 
 
 // 定义结构体类型
 // 定义结构体类型
-struct NetworkContext
-{
-	RyanMqttClient_t *client;
-};
-int32_t coreMqttTransportRecv(NetworkContext_t *pNetworkContext, void *pBuffer, size_t bytesToRecv);
 
 
 /* extern variables-----------------------------------------------------------*/
 /* extern variables-----------------------------------------------------------*/
 
 
@@ -42,7 +37,7 @@ extern void RyanMqttMsgHandlerDestroy(RyanMqttClient_t *client, RyanMqttMsgHandl
 extern RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
 extern RyanMqttError_e RyanMqttMsgHandlerFind(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
 					      RyanMqttBool_e isTopicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler,
 					      RyanMqttBool_e isTopicMatchedFlag, RyanMqttMsgHandler_t **pMsgHandler,
 					      RyanMqttBool_e removeOnMatch);
 					      RyanMqttBool_e removeOnMatch);
-extern void RyanMqttMsgHandlerFindAndDestroyByPackId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
+extern void RyanMqttMsgHandlerFindAndDestroyByPacketId(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgMatchCriteria,
 						     RyanMqttBool_e skipSamePacketId);
 						     RyanMqttBool_e skipSamePacketId);
 extern RyanMqttError_e RyanMqttMsgHandlerAddToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
 extern RyanMqttError_e RyanMqttMsgHandlerAddToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
 extern RyanMqttError_e RyanMqttMsgHandlerRemoveToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
 extern RyanMqttError_e RyanMqttMsgHandlerRemoveToMsgList(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);

+ 4 - 2
xmake.lua

@@ -2,7 +2,7 @@ add_rules("plugin.compile_commands.autoupdate", {outputdir = ".vscode"})
 target("RyanMqtt",function()
 target("RyanMqtt",function()
     set_kind("binary")
     set_kind("binary")
 
 
-    add_syslinks("pthread")
+    add_syslinks("pthread","rt")
     set_toolchains("gcc")  -- 确保使用 GCC
     set_toolchains("gcc")  -- 确保使用 GCC
     -- set_toolchains("clang-20")  
     -- set_toolchains("clang-20")  
     set_languages("gnu99") -- 启用 GNU 扩展
     set_languages("gnu99") -- 启用 GNU 扩展
@@ -88,11 +88,13 @@ target("RyanMqtt",function()
     add_includedirs('./platform/linux', {public = true})
     add_includedirs('./platform/linux', {public = true})
     add_includedirs('./platform/linux/valloc', {public = true})
     add_includedirs('./platform/linux/valloc', {public = true})
 
 
-    add_files('./test/*.c', {public = true})
     add_files('./common/*.c', {public = true})
     add_files('./common/*.c', {public = true})
     add_files('./coreMqtt/*.c', {public = true})
     add_files('./coreMqtt/*.c', {public = true})
     add_files('./mqttclient/*.c', {public = true})
     add_files('./mqttclient/*.c', {public = true})
     add_files('./platform/linux/*.c', {public = true})
     add_files('./platform/linux/*.c', {public = true})
     add_files('./platform/linux/valloc/*.c', {public = true})
     add_files('./platform/linux/valloc/*.c', {public = true})
 
 
+    add_includedirs('./test', {public = true})
+    add_files('./test/*.c', {public = true})
+
 end)
 end)