Bladeren bron

feat: 优化多线程稳定性,启用更多编译时检查,启用fsanitize运行时检测

RyanCW 4 maanden geleden
bovenliggende
commit
d7d399cf8b

+ 3 - 1
.vscode/settings.json

@@ -44,6 +44,8 @@
         "ryanmqttpublic.h": "c",
         "ryanmqttlist.h": "c",
         "platformnetwork.h": "c",
-        "inttypes.h": "c"
+        "inttypes.h": "c",
+        "memory": "c",
+        "stdbool.h": "c"
     },
 }

+ 1 - 1
common/RyanMqttLog.c

@@ -2,7 +2,7 @@
 #include "RyanMqttPlatform.h"
 #include <inttypes.h>
 
-void RyanMqttLogOutPut(char *lvl, uint8_t color, char *fileStr, uint32_t lineNum, char *const fmt, ...)
+void RyanMqttLogOutPut(const char *lvl, uint8_t color, const char *fileStr, uint32_t lineNum, char *const fmt, ...)
 {
 	// RyanLogPrintf("\033[字背景颜色;字体颜色m  用户字符串 \033[0m" );
 	char dbgBuffer[256];

+ 2 - 1
common/RyanMqttLog.h

@@ -17,7 +17,8 @@
 #endif
 
 extern void RyanMqttLogOutPutRaw(char *const fmt, ...);
-extern void RyanMqttLogOutPut(char *lvl, uint8_t color, char *fileStr, uint32_t lineNum, char *const fmt, ...);
+extern void RyanMqttLogOutPut(const char *lvl, uint8_t color, const char *fileStr, uint32_t lineNum, char *const fmt,
+			      ...);
 
 /**
  * @brief log等级检索

+ 9 - 9
mqttclient/RyanMqttClient.c

@@ -1,4 +1,5 @@
 #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
+// #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
 // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
 
 #include "RyanMqttClient.h"
@@ -589,27 +590,27 @@ RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic
 	{
 		RyanMqttMsgHandler_t *msgHandler;
 		RyanMqttAckHandler_t *userAckHandler;
+		uint8_t packetType = (RyanMqttQos1 == qos) ? MQTT_PACKET_TYPE_PUBACK : MQTT_PACKET_TYPE_PUBREC;
 		// qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
 		result = RyanMqttMsgHandlerCreate(client, publishInfo.pTopicName, publishInfo.topicNameLength,
 						  RyanMqttMsgInvalidPacketId, qos, userData, &msgHandler);
 		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
 				  { platformMemoryFree(fixedBuffer.pBuffer); });
 
-		result = RyanMqttAckHandlerCreate(
-			client, (RyanMqttQos1 == qos) ? MQTT_PACKET_TYPE_PUBACK : MQTT_PACKET_TYPE_PUBREC, packetId,
-			fixedBuffer.size, fixedBuffer.pBuffer, msgHandler, &userAckHandler, RyanMqttTrue);
+		result = RyanMqttAckHandlerCreate(client, packetType, packetId, fixedBuffer.size, fixedBuffer.pBuffer,
+						  msgHandler, &userAckHandler, RyanMqttTrue);
 		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
 			platformMemoryFree(fixedBuffer.pBuffer);
 			RyanMqttMsgHandlerDestroy(client, msgHandler);
 		});
 
-		// 一定要先加再send,要不可能线程调度mqtt返回消息会比添加ack更快执行
+		// 一定要先加再send,send一定在mqtt broker回复前执行完,要不可能线程调度mqtt返回消息会比添加ack更快执行
 		RyanMqttAckListAddToUserAckList(client, userAckHandler);
 
 		result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
 		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
-			RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
-			RyanMqttAckHandlerDestroy(client, userAckHandler);
+			// userAck 必须通过这个执行,因为可能已经复制到mqtt内核空间了
+			RyanMqttClearAckSession(client, packetType, packetId);
 		});
 	}
 
@@ -952,7 +953,7 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
 	RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, RyanMqttLog_d);
-	RyanMqttCheck(clientConfig->recvTimeout <= clientConfig->keepaliveTimeoutS * 1000 / 2,
+	RyanMqttCheck(clientConfig->recvTimeout <= (uint32_t)clientConfig->keepaliveTimeoutS * 1000 / 2,
 		      RyanMqttParamInvalidError, RyanMqttLog_d);
 	RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
 
@@ -1080,9 +1081,8 @@ RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, uint8_t pack
 
 	// 删除pubrel记录
 	platformMutexLock(client->config.userData, &client->ackHandleLock);
-	result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
+	result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
-	RyanMqttAckListRemoveToAckList(client, ackHandler);
 
 __exit:
 	platformMutexUnLock(client->config.userData, &client->ackHandleLock);

+ 1 - 1
mqttclient/RyanMqttThread.c

@@ -318,7 +318,7 @@ static RyanMqttError_e RyanMqttConnectBroker(RyanMqttClient_t *client, RyanMqttC
 
 	// 等待报文
 	// mqtt规范 服务端接收到connect报文后,服务端发送给客户端的第一个报文必须是 CONNACK
-	MQTTPacketInfo_t pIncomingPacket;
+	MQTTPacketInfo_t pIncomingPacket = {0};
 	result = RyanMqttGetPacketInfo(client, &pIncomingPacket);
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttSerializePacketError, RyanMqttLog_d, {
 		platformNetworkClose(client->config.userData, &client->network);

+ 15 - 10
mqttclient/RyanMqttThreadProcessPacket.c

@@ -24,14 +24,13 @@ static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *c
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
 	// 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
-	result = RyanMqttAckListNodeFind(client, pIncomingPacket->type & 0xF0U, packetId, &ackHandler);
+	result = RyanMqttAckListNodeFind(client, pIncomingPacket->type & 0xF0U, packetId, &ackHandler, RyanMqttTrue);
 	RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
 		RyanMqttLog_i("packetType: %02x, packetId: %d", pIncomingPacket->type & 0xF0U, packetId);
 	});
 
 	RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
 
-	RyanMqttAckListRemoveToAckList(client, ackHandler);
 	RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
 	return result;
 }
@@ -54,10 +53,9 @@ static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client, MQT
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
 	// 删除pubrel记录
-	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, packetId, &ackHandler);
+	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, packetId, &ackHandler, RyanMqttTrue);
 	if (RyanMqttSuccessError == result)
 	{
-		RyanMqttAckListRemoveToAckList(client, ackHandler);
 		RyanMqttAckHandlerDestroy(client, ackHandler);
 	}
 
@@ -104,7 +102,7 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
 	// 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
-	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec);
+	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec, RyanMqttFalse);
 	if (RyanMqttSuccessError == result)
 	{
 		RyanMqttMsgHandler_t *msgHandler;
@@ -117,6 +115,7 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 						  ackHandlerPubrec->msgHandler->userData, &msgHandler);
 		RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 
+		// 期望收到pubcomp否则会重复发送pubrel
 		result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBCOMP, packetId,
 						  MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
 						  &ackHandler, RyanMqttFalse);
@@ -124,14 +123,19 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 				  { RyanMqttMsgHandlerDestroy(client, msgHandler); });
 		RyanMqttAckListAddToAckList(client, ackHandler);
 
-		// 清除pubrec记录
-		RyanMqttAckListRemoveToAckList(client, ackHandlerPubrec);
-		RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
+		// 清除pubrec记录,必须使用find函数进行重新查找才能确保线程安全
+		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec,
+						 RyanMqttTrue);
+		if (RyanMqttSuccessError == result)
+		{
+			RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
+		}
 	}
 	else
 	{
 		// 没有pubrec ,并且没有pubcomp,说明这个报文是非法报文,不进行mqtt服务器回复
-		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubrec);
+		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubrec,
+						 RyanMqttFalse);
 		RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttInvalidPacketError, RyanMqttLog_d);
 	}
 
@@ -214,7 +218,8 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 
 		// 收到 publish 就期望收到 PUBREL ,如果 PUBREL 报文已经存在说明不是首次收到 publish,不进行qos2 PUBREC
 		// 消息处理
-		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler);
+		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler,
+						 RyanMqttFalse);
 		if (RyanMqttSuccessError != result)
 		{
 			// 第一次收到 PUBREL 报文

+ 23 - 15
mqttclient/RyanMqttUtileAck.c

@@ -15,12 +15,12 @@
  * @param packet
  * @param msgHandler
  * @param pAckHandler
- * @param isPreallocatedPacket packet是否预分配
+ * @param packetAllocatedExternally packet 是外部分配的
  * @return RyanMqttError_e
  */
 RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
 					 uint16_t packetLen, uint8_t *packet, RyanMqttMsgHandler_t *msgHandler,
-					 RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e isPreallocatedPacket)
+					 RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e packetAllocatedExternally)
 {
 	RyanMqttAssert(NULL != client);
 	RyanMqttAssert(NULL != pAckHandler);
@@ -28,7 +28,7 @@ RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packe
 	uint32_t mallocSize = sizeof(RyanMqttAckHandler_t);
 
 	// 为非预分配的数据包分配额外空间
-	if (RyanMqttTrue != isPreallocatedPacket)
+	if (RyanMqttTrue != packetAllocatedExternally)
 	{
 		mallocSize += packetLen + 1;
 	}
@@ -37,17 +37,17 @@ RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packe
 	RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)platformMemoryMalloc(mallocSize);
 	RyanMqttCheck(NULL != ackHandler, RyanMqttNotEnoughMemError, RyanMqttLog_d);
 
+	ackHandler->packetAllocatedExternally = packetAllocatedExternally;
 	ackHandler->packetType = packetType;
 	ackHandler->repeatCount = 0;
 	ackHandler->packetId = packetId;
-	ackHandler->isPreallocatedPacket = isPreallocatedPacket;
 	ackHandler->packetLen = packetLen;
 	RyanMqttListInit(&ackHandler->list);
 	// 超时内没有响应将被销毁或重新发送
 	RyanMqttTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
 	ackHandler->msgHandler = msgHandler;
 
-	if (RyanMqttTrue != isPreallocatedPacket)
+	if (RyanMqttTrue != packetAllocatedExternally)
 	{
 		if (NULL != packet && packetLen > 0)
 		{
@@ -80,7 +80,7 @@ void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *a
 	RyanMqttMsgHandlerDestroy(client, ackHandler->msgHandler); // 释放msgHandler
 
 	// 释放用户预提供的缓冲区
-	if (RyanMqttTrue == ackHandler->isPreallocatedPacket)
+	if (RyanMqttTrue == ackHandler->packetAllocatedExternally)
 	{
 		// 不加null判断,因为如果是空,一定是用户程序内存访问越界了
 		platformMemoryFree(ackHandler->packet);
@@ -96,10 +96,11 @@ void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *a
  * @param packetType
  * @param packetId
  * @param pAckHandler
+ * @param removeOnMatch
  * @return RyanMqttError_e
  */
 RyanMqttError_e RyanMqttAckListNodeFind(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
-					RyanMqttAckHandler_t **pAckHandler)
+					RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e removeOnMatch)
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttList_t *curr, *next;
@@ -118,6 +119,10 @@ RyanMqttError_e RyanMqttAckListNodeFind(RyanMqttClient_t *client, uint8_t packet
 		{
 			*pAckHandler = ackHandler;
 			result = RyanMqttSuccessError;
+			if (RyanMqttTrue == removeOnMatch)
+			{
+				RyanMqttAckListRemoveToAckList(client, ackHandler);
+			}
 			goto __exit;
 		}
 	}
@@ -186,10 +191,11 @@ RyanMqttError_e RyanMqttAckListRemoveToAckList(RyanMqttClient_t *client, RyanMqt
  * @param packetType
  * @param packetId
  * @param pAckHandler
+ * @param removeOnMatch
  * @return RyanMqttError_e
  */
 RyanMqttError_e RyanMqttAckListNodeFindByUserAckList(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
-						     RyanMqttAckHandler_t **pAckHandler)
+						     RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e removeOnMatch)
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	RyanMqttList_t *curr, *next;
@@ -208,6 +214,10 @@ RyanMqttError_e RyanMqttAckListNodeFindByUserAckList(RyanMqttClient_t *client, u
 		{
 			*pAckHandler = ackHandler;
 			result = RyanMqttSuccessError;
+			if (RyanMqttTrue == removeOnMatch)
+			{
+				RyanMqttAckListRemoveToAckList(client, ackHandler);
+			}
 			goto __exit;
 		}
 	}
@@ -248,25 +258,23 @@ void RyanMqttClearAckSession(RyanMqttClient_t *client, uint8_t packetType, uint1
 	RyanMqttAckHandler_t *ackHandler;
 
 	// 清除所有ack链表
-	while (1)
+	do
 	{
-		result = RyanMqttAckListNodeFindByUserAckList(client, packetType, packetId, &ackHandler);
+		result = RyanMqttAckListNodeFindByUserAckList(client, packetType, packetId, &ackHandler, RyanMqttTrue);
 		if (RyanMqttSuccessError == result)
 		{
-			RyanMqttAckListRemoveToUserAckList(client, ackHandler);
 			RyanMqttAckHandlerDestroy(client, ackHandler);
 			continue;
 		}
 
-		// 还有可能已经被添加到ack链表了
-		result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
+		// 不可能同时在userAck和mqttAck,还有可能已经被添加到ack链表了
+		result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
 		if (RyanMqttSuccessError == result)
 		{
-			RyanMqttAckListRemoveToAckList(client, ackHandler);
 			RyanMqttAckHandlerDestroy(client, ackHandler);
 			continue;
 		}
 
 		break;
-	}
+	} while (1);
 }

+ 9 - 9
mqttclient/include/RyanMqttClient.h

@@ -40,15 +40,15 @@ typedef struct
 
 typedef struct
 {
-	uint8_t packetType;                  // 期望接收到的ack报文类型
-	uint16_t repeatCount;                // 当前ack超时重发次数
-	uint16_t packetId;                   // 报文标识符 系统生成,用户勿动
-	RyanMqttBool_e isPreallocatedPacket; // 是否是预分配的内存
-	uint32_t packetLen;                  // 报文长度
-	RyanMqttList_t list;                 // 链表节点,用户勿动
-	RyanMqttTimer_t timer;               // ack超时定时器,用户勿动
-	RyanMqttMsgHandler_t *msgHandler;    // msg信息
-	uint8_t *packet;                     // 没有收到期望ack,重新发送的原始报文,不要求必须是最后一位,但最好保持这样
+	RyanMqttBool_e packetAllocatedExternally; // packet 是外部分配的
+	uint8_t packetType;                       // 期望接收到的ack报文类型
+	uint16_t repeatCount;                     // 当前ack超时重发次数
+	uint16_t packetId;                        // 报文标识符 系统生成,用户勿动
+	uint32_t packetLen;                       // 报文长度
+	RyanMqttList_t list;                      // 链表节点,用户勿动
+	RyanMqttTimer_t timer;                    // ack超时定时器,用户勿动
+	RyanMqttMsgHandler_t *msgHandler;         // msg信息
+	uint8_t *packet; // 没有收到期望ack,重新发送的原始报文,不要求必须是最后一位,但最好保持这样
 } RyanMqttAckHandler_t;
 
 typedef struct

+ 4 - 3
mqttclient/include/RyanMqttUtil.h

@@ -50,14 +50,15 @@ extern RyanMqttError_e RyanMqttMsgHandlerRemoveToMsgList(RyanMqttClient_t *clien
 extern RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
 						uint16_t packetLen, uint8_t *packet, RyanMqttMsgHandler_t *msgHandler,
 						RyanMqttAckHandler_t **pAckHandler,
-						RyanMqttBool_e isPreallocatedPacket);
+						RyanMqttBool_e packetAllocatedExternally);
 extern void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
 extern RyanMqttError_e RyanMqttAckListNodeFind(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId,
-					       RyanMqttAckHandler_t **pAckHandler);
+					       RyanMqttAckHandler_t **pAckHandler, RyanMqttBool_e removeOnMatch);
 extern RyanMqttError_e RyanMqttAckListAddToAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
 extern RyanMqttError_e RyanMqttAckListRemoveToAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
 extern RyanMqttError_e RyanMqttAckListNodeFindByUserAckList(RyanMqttClient_t *client, uint8_t packetType,
-							    uint16_t packetId, RyanMqttAckHandler_t **pAckHandler);
+							    uint16_t packetId, RyanMqttAckHandler_t **pAckHandler,
+							    RyanMqttBool_e removeOnMatch);
 extern RyanMqttError_e RyanMqttAckListAddToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
 extern RyanMqttError_e RyanMqttAckListRemoveToUserAckList(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
 extern void RyanMqttClearAckSession(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId);

+ 21 - 12
platform/linux/platformNetwork.c

@@ -50,8 +50,9 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
 	};
 
 	// 传递的是ip地址,不用进行dns解析,某些情况下调用dns解析反而会错误
-	if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr) == 1)
+	if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr))
 	{
+		buf = NULL;
 	}
 	// 解析域名信息
 	else
@@ -71,7 +72,8 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
 
 		if (0 != gethostbyname_r(host, &hostinfo, buf, dnsBufferSize, &phost, &h_errnop))
 		{
-			RyanMqttLog_w("平台可能不支持 gethostbyname_r 函数, 再次尝试使用 gethostbyname 获取域名信息");
+			RyanMqttLog_w(
+				(char *)"平台可能不支持 gethostbyname_r 函数, 再次尝试使用 gethostbyname 获取域名信息");
 
 			// 非线程安全版本,请根据实际情况选择使用
 			// NOLINTNEXTLINE(concurrency-mt-unsafe)
@@ -103,6 +105,8 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
 		goto __exit;
 	}
 
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wanalyzer-fd-leak"
 	// 绑定套接字到主机地址和端口号
 	if (0 != connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)))
 	{
@@ -110,6 +114,7 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
 		result = RyanMqttSocketConnectFailError;
 		goto __exit;
 	}
+#pragma GCC diagnostic pop
 
 __exit:
 	if (NULL != buf)
@@ -163,11 +168,13 @@ int32_t platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetw
 	{
 		int32_t rt_errno = errno; // 似乎RT 5.0.0以上版本需要使用 rt_get_errno
 		// 下列表示没问题,但需要退出接收
-		if (EAGAIN == rt_errno ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+		if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+#if EAGAIN != EWOULDBLOCK
 		    EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
-		    EINTR == rt_errno ||       // 操作被信号中断
-		    ETIME == rt_errno ||       // 计时器过期(部分平台)
-		    ETIMEDOUT == rt_errno)     // 超时(通用)
+#endif
+		    EINTR == rt_errno ||   // 操作被信号中断
+		    ETIME == rt_errno ||   // 计时器过期(部分平台)
+		    ETIMEDOUT == rt_errno) // 超时(通用)
 		{
 			return 0;
 		}
@@ -217,13 +224,15 @@ int32_t platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetw
 
 	if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
 	{
-		int32_t rt_errno = errno;      // 似乎5.0.0以上版本需要使用 rt_get_errno
-					       // 下列表示没问题,但需要退出发送
-		if (EAGAIN == rt_errno ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+		int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
+					  // 下列表示没问题,但需要退出发送
+		if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+#if EAGAIN != EWOULDBLOCK
 		    EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
-		    EINTR == rt_errno ||       // 操作被信号中断
-		    ETIME == rt_errno ||       // 计时器过期(部分平台)
-		    ETIMEDOUT == rt_errno)     // 超时(通用)
+#endif
+		    EINTR == rt_errno ||   // 操作被信号中断
+		    ETIME == rt_errno ||   // 计时器过期(部分平台)
+		    ETIMEDOUT == rt_errno) // 超时(通用)
 		{
 			return 0;
 		}

+ 5 - 1
platform/linux/platformSystem.c

@@ -75,7 +75,11 @@ RyanMqttError_e platformThreadInit(void *userData, platformThread_t *platformThr
 	pthread_attr_setstacksize(&attr, stackSize);
 	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 设置为分离状态
 
-	int ret = pthread_create(&platformThread->thread, &attr, (void *)entry, param);
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wcast-function-type"
+	int ret = pthread_create(&platformThread->thread, &attr, (void *(*)(void *))entry, param);
+#pragma GCC diagnostic pop
+
 	if (0 != ret)
 	{
 		return RyanMqttNoRescourceError;

+ 7 - 0
platform/linux/platformSystem.h

@@ -13,7 +13,14 @@ extern "C" {
 #include <unistd.h>
 #include <stdlib.h>
 #include <time.h>
+
+#ifdef RyanMqttLinuxTestEnable
 #include "valloc.h"
+#define malloc  v_malloc
+#define calloc  v_calloc
+#define free    v_free
+#define realloc v_realloc
+#endif
 
 #define platformAssert(EX) assert(EX)
 #define RyanMqttMemset     memset

+ 52 - 36
platform/linux/valloc/valloc.c

@@ -3,6 +3,9 @@
 #include <string.h>
 #include <pthread.h>
 #include <unistd.h>
+#include "valloc.h"
+
+#define HEADER_SIZE sizeof(int)
 
 static pthread_mutex_t mutex;
 static int count = 0;
@@ -10,90 +13,104 @@ static int use = 0;
 
 void *v_malloc(size_t size)
 {
-	void *p;
-	p = malloc(size ? size + sizeof(int) : 0);
+	if (size == 0)
+	{
+		return NULL;
+	}
+
+	void *p = malloc(size + HEADER_SIZE);
 	if (!p)
 	{
 		return NULL;
 	}
 
-	pthread_mutex_lock(&mutex); // 互斥锁上锁
+	*(int *)p = (int)size;
+
+	pthread_mutex_lock(&mutex);
 	count++;
-	*(int *)p = size;
-	use += size;
-	pthread_mutex_unlock(&mutex); // 互斥锁解锁
-	return (void *)((char *)p + sizeof(int));
+	use += (int)size;
+	pthread_mutex_unlock(&mutex);
+
+	return (char *)p + HEADER_SIZE;
 }
 
 void *v_calloc(size_t num, size_t size)
 {
-	void *p;
-	p = v_malloc(num * size);
-	if (!p)
+	size_t total = num * size;
+	void *p = v_malloc(total);
+	if (p)
 	{
-		return NULL;
+		memset(p, 0, total);
 	}
-	memset(p, 0, num * size);
 	return p;
 }
 
 void v_free(void *block)
 {
-	void *p;
 	if (!block)
 	{
 		return;
 	}
-	p = (void *)((char *)block - sizeof(int));
 
-	pthread_mutex_lock(&mutex); // 互斥锁上锁
-	use -= *(int *)p;
+	void *p = (char *)block - HEADER_SIZE;
+	int size = *(int *)p;
+
+	pthread_mutex_lock(&mutex);
 	count--;
-	pthread_mutex_unlock(&mutex); // 互斥锁解锁
+	use -= size;
+	pthread_mutex_unlock(&mutex);
 
 	free(p);
 }
 
 void *v_realloc(void *block, size_t size)
 {
-	void *p;
-	int s = 0;
+	if (size == 0)
+	{
+		v_free(block);
+		return NULL;
+	}
+
+	int old_size = 0;
+	void *raw = NULL;
+
 	if (block)
 	{
-		block = (void *)((char *)block - sizeof(int));
-		s = *(int *)block;
+		raw = (char *)block - HEADER_SIZE;
+		old_size = *(int *)raw;
 	}
-	p = realloc(block, size ? size + sizeof(int) : 0);
+
+	void *p = realloc(raw, size + HEADER_SIZE);
 	if (!p)
 	{
 		return NULL;
 	}
 
-	pthread_mutex_lock(&mutex); // 互斥锁上锁
+	*(int *)p = (int)size;
+
+	pthread_mutex_lock(&mutex);
 	if (!block)
 	{
 		count++;
 	}
-	*(int *)p = size;
-	use += (size - s);
-	pthread_mutex_unlock(&mutex); // 互斥锁解锁
+	use += (int)(size - old_size);
+	pthread_mutex_unlock(&mutex);
 
-	return (void *)((char *)p + sizeof(int));
+	return (char *)p + HEADER_SIZE;
 }
 
-int v_mcheck(int *_count, int *_use)
+int v_mcheck(int *dstCount, int *dstUse)
 {
-	pthread_mutex_lock(&mutex); // 互斥锁上锁
-	if (_count)
+	pthread_mutex_lock(&mutex);
+	if (dstCount)
 	{
-		*_count = count;
+		*dstCount = count;
 	}
-	if (_use)
+	if (dstUse)
 	{
-		*_use = use;
+		*dstUse = use;
 	}
-	pthread_mutex_unlock(&mutex); // 互斥锁解锁
-
+	pthread_mutex_unlock(&mutex);
 	return 0;
 }
 
@@ -106,6 +123,5 @@ void displayMem(void)
 
 void vallocInit(void)
 {
-	/* 初始化互斥锁 */
 	pthread_mutex_init(&mutex, NULL);
 }

+ 8 - 14
platform/linux/valloc/valloc.h

@@ -12,24 +12,18 @@
 #define __valloc_H
 
 #ifdef __cplusplus
-extern "C"
-{
+extern "C" {
 #endif
 
 #include <stdlib.h>
 
-    void *v_malloc(size_t size);
-    void *v_calloc(size_t num, size_t size);
-    void v_free(void *block);
-    void *v_realloc(void *block, size_t size);
-    int v_mcheck(int *_count, int *_use);
-    void displayMem(void);
-    void vallocInit(void);
-
-#define malloc v_malloc
-#define calloc v_calloc
-#define free v_free
-#define realloc v_realloc
+extern void *v_malloc(size_t size);
+extern void *v_calloc(size_t num, size_t size);
+extern void v_free(void *block);
+extern void *v_realloc(void *block, size_t size);
+extern int v_mcheck(int *dstCount, int *dstUse);
+extern void displayMem(void);
+extern void vallocInit(void);
 
 #ifdef __cplusplus
 }

+ 15 - 11
platform/openLuat/platformNetwork.c

@@ -50,7 +50,7 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
 	};
 
 	// 传递的是ip地址,不用进行dns解析,某些情况下调用dns解析反而会错误
-	if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr) == 1)
+	if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr))
 	{
 	}
 	// 解析域名信息
@@ -164,11 +164,13 @@ int32_t platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetw
 	{
 		int32_t rt_errno = errno; // 似乎RT 5.0.0以上版本需要使用 rt_get_errno
 		// 下列表示没问题,但需要退出接收
-		if (EAGAIN == rt_errno ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+		if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+#if EAGAIN != EWOULDBLOCK
 		    EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
-		    EINTR == rt_errno ||       // 操作被信号中断
-		    ETIME == rt_errno ||       // 计时器过期(部分平台)
-		    ETIMEDOUT == rt_errno)     // 超时(通用)
+#endif
+		    EINTR == rt_errno ||   // 操作被信号中断
+		    ETIME == rt_errno ||   // 计时器过期(部分平台)
+		    ETIMEDOUT == rt_errno) // 超时(通用)
 		{
 			return 0;
 		}
@@ -218,13 +220,15 @@ int32_t platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetw
 
 	if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
 	{
-		int32_t rt_errno = errno;      // 似乎5.0.0以上版本需要使用 rt_get_errno
-					       // 下列表示没问题,但需要退出发送
-		if (EAGAIN == rt_errno ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+		int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
+					  // 下列表示没问题,但需要退出发送
+		if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+#if EAGAIN != EWOULDBLOCK
 		    EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
-		    EINTR == rt_errno ||       // 操作被信号中断
-		    ETIME == rt_errno ||       // 计时器过期(部分平台)
-		    ETIMEDOUT == rt_errno)     // 超时(通用)
+#endif
+		    EINTR == rt_errno ||   // 操作被信号中断
+		    ETIME == rt_errno ||   // 计时器过期(部分平台)
+		    ETIMEDOUT == rt_errno) // 超时(通用)
 		{
 			return 0;
 		}

+ 13 - 9
platform/rtthread/platformNetwork.c

@@ -52,7 +52,7 @@ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platfo
 	// 传递的是ip地址,不用进行dns解析,某些情况下调用dns解析反而会错误
 	// RT-Thread平台下lwip和netdev都是通过宏定义方式定义 inet_pton 和 inet_addr,所以这里没有问题
 #ifdef inet_pton
-	if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr) == 1)
+	if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr))
 	{
 	}
 #elif defined(inet_addr)
@@ -180,11 +180,13 @@ int32_t platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetw
 		}
 
 		// 下列表示没问题,但需要退出接收
-		if (EAGAIN == rt_errno ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+		if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+#if EAGAIN != EWOULDBLOCK
 		    EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
-		    EINTR == rt_errno ||       // 操作被信号中断
-		    ETIME == rt_errno ||       // 计时器过期(部分平台)
-		    ETIMEDOUT == rt_errno)     // 超时(通用)
+#endif
+		    EINTR == rt_errno ||   // 操作被信号中断
+		    ETIME == rt_errno ||   // 计时器过期(部分平台)
+		    ETIMEDOUT == rt_errno) // 超时(通用)
 		{
 			return 0;
 		}
@@ -241,11 +243,13 @@ int32_t platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetw
 		}
 
 		// 下列表示没问题,但需要退出发送
-		if (EAGAIN == rt_errno ||      // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+		if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
+#if EAGAIN != EWOULDBLOCK
 		    EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
-		    EINTR == rt_errno ||       // 操作被信号中断
-		    ETIME == rt_errno ||       // 计时器过期(部分平台)
-		    ETIMEDOUT == rt_errno)     // 超时(通用)
+#endif
+		    EINTR == rt_errno ||   // 操作被信号中断
+		    ETIME == rt_errno ||   // 计时器过期(部分平台)
+		    ETIMEDOUT == rt_errno) // 超时(通用)
 		{
 			return 0;
 		}

+ 3 - 24
test/RyanMqttMultiThreadMultiClientTest.c

@@ -19,8 +19,6 @@ typedef struct
 	volatile int runningThreads;
 	volatile int totalPublished;
 	volatile int totalReceived;
-	pthread_mutex_t statsMutex;
-	pthread_cond_t completionCond;
 	volatile int testComplete;
 } MultiThreadTestControl_t;
 
@@ -82,7 +80,8 @@ static void *concurrentPublishThread(void *arg)
 
 	// 订阅主题
 	RyanMqttSnprintf(topic, sizeof(topic), "test/thread/%d", testData->threadIndex);
-	result = RyanMqttSubscribe(testData->client, topic, testData->threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
+	result = RyanMqttSubscribe(testData->client, topic, RyanMqttQos2);
+	// result = RyanMqttSubscribe(testData->client, topic, testData->threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
 	if (RyanMqttSuccessError != result)
 	{
 		RyanMqttLog_e("Thread %d: Failed to subscribe", testData->threadIndex);
@@ -101,7 +100,7 @@ static void *concurrentPublishThread(void *arg)
 			RyanMqttLog_e("Thread %d: Failed to publish message %d", testData->threadIndex, i);
 		}
 
-		delay(i % 2 ? 2 : 1);
+		delay_us(900);
 	}
 
 	// 等待消息处理完成
@@ -128,14 +127,6 @@ cleanup:
 		RyanMqttTestDestroyClient(testData->client);
 	}
 
-	pthread_mutex_lock(&g_testControl.statsMutex);
-	g_testControl.runningThreads--;
-	if (g_testControl.runningThreads == 0)
-	{
-		pthread_cond_signal(&g_testControl.completionCond);
-	}
-	pthread_mutex_unlock(&g_testControl.statsMutex);
-
 	return NULL;
 }
 
@@ -149,8 +140,6 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 
 	// 初始化测试控制结构
 	RyanMqttMemset(&g_testControl, 0, sizeof(g_testControl));
-	pthread_mutex_init(&g_testControl.statsMutex, NULL);
-	pthread_cond_init(&g_testControl.completionCond, NULL);
 	g_testControl.runningThreads = CONCURRENT_CLIENTS;
 
 	// 创建测试线程
@@ -167,14 +156,6 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 		}
 	}
 
-	// 等待所有线程完成
-	pthread_mutex_lock(&g_testControl.statsMutex);
-	while (g_testControl.runningThreads > 0)
-	{
-		pthread_cond_wait(&g_testControl.completionCond, &g_testControl.statsMutex);
-	}
-	pthread_mutex_unlock(&g_testControl.statsMutex);
-
 	// 等待线程结束
 	for (int i = 0; i < CONCURRENT_CLIENTS; i++)
 	{
@@ -203,8 +184,6 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 	}
 
 cleanup:
-	pthread_mutex_destroy(&g_testControl.statsMutex);
-	pthread_cond_destroy(&g_testControl.completionCond);
 
 	return result;
 }

+ 60 - 53
test/RyanMqttMultiThreadSafetyTest.c

@@ -16,12 +16,9 @@ typedef struct
 // 多线程测试控制结构
 typedef struct
 {
-	int32_t runningThreads;
 	int32_t totalPublished;
 	int32_t totalReceived;
 	int32_t threadIndex;
-	pthread_mutex_t statsMutex;
-	pthread_cond_t completionCond;
 	int32_t testComplete;
 	RyanMqttClient_t *client;
 } MultiThreadTestControl_t;
@@ -29,23 +26,69 @@ typedef struct
 static MultiThreadTestControl_t g_testControl = {0};
 static ThreadTestData_t g_threadTestData[CONCURRENT_CLIENTS + 1] = {0};
 
+static bool safeParseTopic(const char *topic, uint32_t topicLen, int *threadId)
+{
+	const char *prefix = "testThread/";
+	const char *suffix = "/tttt";
+
+	size_t prefix_len = strlen(prefix);
+	size_t suffix_len = strlen(suffix);
+
+	if (topicLen <= prefix_len + suffix_len)
+	{
+		return false;
+	}
+	if (strncmp(topic, prefix, prefix_len) != 0)
+	{
+		return false;
+	}
+	if (topicLen < prefix_len + suffix_len)
+	{
+		return false;
+	}
+	if (strncmp(topic + topicLen - suffix_len, suffix, suffix_len) != 0)
+	{
+		return false;
+	}
+
+	size_t num_len = topicLen - prefix_len - suffix_len;
+	if (num_len == 0 || num_len >= 16)
+	{
+		return false;
+	}
+
+	char num_buf[16] = {0};
+	memcpy(num_buf, topic + prefix_len, num_len);
+
+	char *endptr = NULL;
+	long val = strtol(num_buf, &endptr, 10);
+	if (*endptr != '\0')
+	{
+		return false;
+	}
+
+	*threadId = (int)val;
+	return true;
+}
+
 // 多线程事件处理函数
 static void multiThreadEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
 {
-
 	switch (event)
 	{
 	case RyanMqttEventPublished: {
 		RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
 		// RyanMqttLog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
 
-		int thread_id;
+		int threadId;
 
 		// NOLINTNEXTLINE(cert-err34-c)
-		if (1 == sscanf(msgHandler->topic, "testThread/%d/tttt", &thread_id))
+		// if (1 == sscanf(msgHandler->topic, "testThread/%d/tttt", &threadId))
+		if (safeParseTopic(msgHandler->topic, msgHandler->topicLen, &threadId))
 		{
-			ThreadTestData_t *testData = &g_threadTestData[thread_id];
+
 			RyanMqttTestEnableCritical();
+			ThreadTestData_t *testData = &g_threadTestData[threadId];
 			testData->publishedCount += 1;
 			g_testControl.totalPublished += 1;
 			RyanMqttTestExitCritical();
@@ -59,13 +102,14 @@ static void multiThreadEventHandle(void *pclient, RyanMqttEventId_e event, const
 		// RyanMqttLog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
 		// 	      msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
 
-		int thread_id;
+		int threadId;
 
-		// NOLINTNEXTLINE(cert-err34-c)
-		if (1 == sscanf(msgData->topic, "testThread/%d/tttt", &thread_id))
+		// 非线程安全
+		// if (1 == sscanf(msgData->topic, "testThread/%d/tttt", &threadId))
+		if (safeParseTopic(msgData->topic, msgData->topicLen, &threadId))
 		{
-			ThreadTestData_t *testData = &g_threadTestData[thread_id];
 			RyanMqttTestEnableCritical();
+			ThreadTestData_t *testData = &g_threadTestData[threadId];
 			testData->receivedCount += 1;
 			g_testControl.totalReceived += 1;
 			RyanMqttTestExitCritical();
@@ -94,7 +138,8 @@ static void *concurrentPublishThread(void *arg)
 
 	// 订阅主题
 	RyanMqttSnprintf(topic, sizeof(topic), "testThread/%d/tttt", threadIndex);
-	result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
+	result = RyanMqttSubscribe(g_testControl.client, topic, RyanMqttQos2);
+	// result = RyanMqttSubscribe(g_testControl.client, topic, threadIndex % 2 ? RyanMqttQos2 : RyanMqttQos1);
 	if (RyanMqttSuccessError != result)
 	{
 		RyanMqttLog_e("Thread %d: Failed to subscribe", threadIndex);
@@ -124,7 +169,7 @@ static void *concurrentPublishThread(void *arg)
 			}
 		}
 
-		delay_us(700); // 电脑配置不一样需要的时间也就不一样
+		delay_us(900); // 电脑配置不一样需要的时间也就不一样
 	}
 
 	// 等待消息处理完成
@@ -146,14 +191,6 @@ static void *concurrentPublishThread(void *arg)
 cleanup:
 	delay(50); // 让mqtt线程运行
 
-	pthread_mutex_lock(&g_testControl.statsMutex);
-	g_testControl.runningThreads--;
-	if (g_testControl.runningThreads == 0)
-	{
-		pthread_cond_signal(&g_testControl.completionCond);
-	}
-	pthread_mutex_unlock(&g_testControl.statsMutex);
-
 	return NULL;
 }
 
@@ -166,9 +203,6 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 
 	// 初始化测试控制结构
 	RyanMqttMemset(&g_testControl, 0, sizeof(g_testControl));
-	pthread_mutex_init(&g_testControl.statsMutex, NULL);
-	pthread_cond_init(&g_testControl.completionCond, NULL);
-	g_testControl.runningThreads = CONCURRENT_CLIENTS;
 
 	// 初始化客户端
 	result =
@@ -179,21 +213,6 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 	for (int i = 0; i < CONCURRENT_CLIENTS; i++)
 	{
 
-		struct sched_param param;
-
-		// 初始化线程属性
-		pthread_attr_init(&g_threadTestData[i].attr);
-
-		// 设置线程为显式调度属性(否则可能忽略优先级)
-		pthread_attr_setinheritsched(&g_threadTestData[i].attr, PTHREAD_EXPLICIT_SCHED);
-
-		// 设置调度策略,例如 SCHED_FIFO 或 SCHED_RR(实时策略)
-		pthread_attr_setschedpolicy(&g_threadTestData[i].attr, SCHED_FIFO);
-
-		// 设置优先级(范围依赖于调度策略)
-		param.sched_priority = 1; // 实时优先级范围通常是 1 ~ 99
-		pthread_attr_setschedparam(&g_threadTestData[i].attr, &param);
-
 		if (pthread_create(&g_threadTestData[i].threadId, NULL, concurrentPublishThread, NULL) != 0)
 		{
 			RyanMqttLog_e("Failed to create thread %d", i);
@@ -202,23 +221,12 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 		}
 	}
 
-	// 等待所有线程完成
-	pthread_mutex_lock(&g_testControl.statsMutex);
-	while (g_testControl.runningThreads > 0)
-	{
-		pthread_cond_wait(&g_testControl.completionCond, &g_testControl.statsMutex);
-	}
-	pthread_mutex_unlock(&g_testControl.statsMutex);
-
 	// 等待线程结束
 	for (int i = 0; i < CONCURRENT_CLIENTS; i++)
 	{
 		pthread_join(g_threadTestData[i].threadId, NULL);
-		pthread_attr_destroy(&g_threadTestData[i].attr);
 	}
 
-	RyanMqttTestDestroyClient(g_testControl.client);
-
 	// 统计结果
 	RyanMqttLog_i("Multi-client test results:");
 	RyanMqttLog_i("  Total published: %d", g_testControl.totalPublished);
@@ -240,10 +248,9 @@ static RyanMqttError_e multiClientConcurrentTest(void)
 		result = RyanMqttFailedError;
 	}
 
-cleanup:
-	pthread_mutex_destroy(&g_testControl.statsMutex);
-	pthread_cond_destroy(&g_testControl.completionCond);
+	RyanMqttTestDestroyClient(g_testControl.client);
 
+cleanup:
 	return result;
 }
 

+ 6 - 6
test/RyanMqttPubTest.c

@@ -115,14 +115,14 @@ static RyanMqttError_e RyanMqttPublishTest(RyanMqttQos_e qos, int32_t count, uin
 		// NOLINTNEXTLINE(cert-err33-c)
 		time(&timeStampNow);
 
-		pubStr = (char *)malloc(2048);
+		pubStr = (char *)platformMemoryMalloc(2048);
 		RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
 		RyanMqttMemset(pubStr, 0, 2048);
 
 		// NOLINTNEXTLINE(cert-msc32-c,cert-msc51-cpp)
 		srand(timeStampNow);
 		// NOLINTNEXTLINE(concurrency-mt-unsafe,cert-msc30-c,cert-msc50-cpp)
-		for (int32_t i = 0; i < rand() % 250 + 1 + 100; i++)
+		for (uint32_t i = 0; i < (uint32_t)(rand() % 250 + 1 + 100); i++)
 		{
 			// NOLINTNEXTLINE(concurrency-mt-unsafe,cert-msc30-c,cert-msc50-cpp)
 			snprintf(pubStr + 4 * i, 2048 - 4 * i, "%04d", rand());
@@ -183,7 +183,7 @@ static RyanMqttError_e RyanMqttPublishTest(RyanMqttQos_e qos, int32_t count, uin
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
 
 __exit:
-	free(pubStr);
+	platformMemoryFree(pubStr);
 	pubStr = NULL;
 	RyanMqttLog_i("mqtt 发布测试,销毁mqtt客户端");
 	RyanMqttTestDestroyClient(client);
@@ -243,14 +243,14 @@ static RyanMqttError_e RyanMqttPublishHybridTest(int32_t count, uint32_t delayms
 		// NOLINTNEXTLINE(cert-err33-c)
 		time(&timeStampNow);
 
-		pubStr = (char *)malloc(2048);
+		pubStr = (char *)platformMemoryMalloc(2048);
 		RyanMqttCheck(NULL != pubStr, RyanMqttNotEnoughMemError, RyanMqttLog_e);
 		RyanMqttMemset(pubStr, 0, 2048);
 
 		// NOLINTNEXTLINE(cert-msc32-c,cert-msc51-cpp)
 		srand(timeStampNow);
 		// NOLINTNEXTLINE(concurrency-mt-unsafe,cert-msc30-c,cert-msc50-cpp)
-		for (int32_t i = 0; i < rand() % 250 + 1 + 100; i++)
+		for (uint32_t i = 0; i < (uint32_t)(rand() % 250 + 1 + 100); i++)
 		{
 			// NOLINTNEXTLINE(concurrency-mt-unsafe,cert-msc30-c,cert-msc50-cpp)
 			snprintf(pubStr + 4 * i, 2048 - 4 * i, "%04d", rand());
@@ -310,7 +310,7 @@ static RyanMqttError_e RyanMqttPublishHybridTest(int32_t count, uint32_t delayms
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
 
 __exit:
-	free(pubStr);
+	platformMemoryFree(pubStr);
 	pubStr = NULL;
 	RyanMqttLog_i("mqtt 发布测试,销毁mqtt客户端");
 	RyanMqttTestDestroyClient(client);

+ 15 - 9
test/RyanMqttSubTest.c

@@ -5,6 +5,11 @@ static int32_t subTestCount = 0;
 
 static RyanMqttSubscribeData_t *topicIsSubscribeArr(char *topic)
 {
+	if (NULL == topic)
+	{
+		return NULL;
+	}
+
 	for (int32_t i = 0; i < subTestCount; i++)
 	{
 		if (0 == RyanMqttStrncmp(topic, subscribeManyData[i].topic, RyanMqttStrlen(topic)))
@@ -158,7 +163,8 @@ static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count)
 
 	// 生成需要订阅的主题数据
 	{
-		subscribeManyData = (RyanMqttSubscribeData_t *)malloc(sizeof(RyanMqttSubscribeData_t) * count);
+		subscribeManyData =
+			(RyanMqttSubscribeData_t *)platformMemoryMalloc(sizeof(RyanMqttSubscribeData_t) * count);
 		if (NULL == subscribeManyData)
 		{
 			RyanMqttLog_e("内存不足");
@@ -169,7 +175,7 @@ static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count)
 		for (int32_t i = 0; i < count; i++)
 		{
 			subscribeManyData[i].qos = i % 3;
-			char *topic = (char *)malloc(64);
+			char *topic = (char *)platformMemoryMalloc(64);
 			if (NULL == topic)
 			{
 				RyanMqttLog_e("内存不足");
@@ -207,7 +213,7 @@ static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count)
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });
 
 	// 测试取消所有订阅消息
-	unSubscribeManyData = malloc(sizeof(RyanMqttUnSubscribeData_t) * count);
+	unSubscribeManyData = platformMemoryMalloc(sizeof(RyanMqttUnSubscribeData_t) * count);
 	if (NULL == unSubscribeManyData)
 	{
 		RyanMqttLog_e("内存不足");
@@ -216,7 +222,7 @@ static RyanMqttError_e RyanMqttSubscribeHybridTest(int32_t count)
 	}
 	for (int32_t i = 0; i < count; i++)
 	{
-		char *topic = (char *)malloc(64);
+		char *topic = (char *)platformMemoryMalloc(64);
 		if (NULL == topic)
 		{
 			RyanMqttLog_e("内存不足");
@@ -283,23 +289,23 @@ __exit:
 	{
 		if (NULL != subscribeManyData && NULL != subscribeManyData[i].topic)
 		{
-			free(subscribeManyData[i].topic);
+			platformMemoryFree(subscribeManyData[i].topic);
 		}
 
-		if (NULL != subscribeManyData && NULL != unSubscribeManyData[i].topic)
+		if (NULL != unSubscribeManyData && NULL != unSubscribeManyData[i].topic)
 		{
-			free(unSubscribeManyData[i].topic);
+			platformMemoryFree(unSubscribeManyData[i].topic);
 		}
 	}
 
 	if (NULL != subscribeManyData)
 	{
-		free(subscribeManyData);
+		platformMemoryFree(subscribeManyData);
 	}
 
 	if (NULL != unSubscribeManyData)
 	{
-		free(unSubscribeManyData);
+		platformMemoryFree(unSubscribeManyData);
 	}
 
 	RyanMqttLog_i("mqtt 订阅测试,销毁mqtt客户端");

+ 9 - 11
test/RyanMqttTest.c

@@ -92,7 +92,7 @@ void mqttEventBaseHandle(void *pclient, RyanMqttEventId_e event, const void *eve
 	{
 		// 这里选择直接丢弃该消息
 		RyanMqttAckHandler_t *ackHandler = (RyanMqttAckHandler_t *)eventData;
-		RyanMqttLog_w("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d",
+		RyanMqttLog_e("ack重发次数超过警戒值回调 packetType: %d, packetId: %d, topic: %s, qos: %d",
 			      ackHandler->packetType, ackHandler->packetId, ackHandler->msgHandler->topic,
 			      ackHandler->msgHandler->qos);
 		RyanMqttDiscardAckHandler(client, ackHandler->packetType, ackHandler->packetId);
@@ -127,10 +127,6 @@ void mqttEventBaseHandle(void *pclient, RyanMqttEventId_e event, const void *eve
 	}
 }
 
-void RyanMqttTestUserDataEvent(void *pclient, RyanMqttEventId_e event, const void *eventData)
-{
-}
-
 RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncFlag, RyanMqttBool_e autoReconnectFlag,
 				 uint16_t keepaliveTimeoutS, RyanMqttEventHandle mqttEventCallback, void *userData)
 {
@@ -145,7 +141,7 @@ RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncF
 	RyanMqttSnprintf(aaa, sizeof(aaa), "%s%d", RyanMqttClientId, count);
 
 	struct RyanMqttTestEventUserData *eventUserData =
-		(struct RyanMqttTestEventUserData *)malloc(sizeof(struct RyanMqttTestEventUserData));
+		(struct RyanMqttTestEventUserData *)platformMemoryMalloc(sizeof(struct RyanMqttTestEventUserData));
 	if (NULL == eventUserData)
 	{
 		RyanMqttLog_e("内存不足");
@@ -171,7 +167,7 @@ RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncF
 					     .taskStack = 4096,
 					     .mqttVersion = 4,
 					     .ackHandlerRepeatCountWarning = 6,
-					     .ackHandlerCountWarning = 2000,
+					     .ackHandlerCountWarning = 60000,
 					     .autoReconnectFlag = autoReconnectFlag,
 					     .cleanSessionFlag = RyanMqttTrue,
 					     .reconnectTimeout = 3000,
@@ -246,8 +242,9 @@ RyanMqttError_e RyanMqttTestDestroyClient(RyanMqttClient_t *client)
 		sem_destroy(&eventUserData->sem);
 	}
 
-	free(eventUserData);
-	delay(3);
+	// RyanMqttEventDestroyBefore 后mqtt客户端还要执行清除操作
+	delay(10);
+	platformMemoryFree(eventUserData);
 	return RyanMqttSuccessError;
 }
 
@@ -288,16 +285,17 @@ void printfArrStr(uint8_t *buf, uint32_t len, char *userData)
 	RyanMqttLog_raw("\r\n");
 }
 
-void RyanMqttTestEnableCritical()
+void RyanMqttTestEnableCritical(void)
 {
 	pthread_spin_lock(&spin);
 }
 
-void RyanMqttTestExitCritical()
+void RyanMqttTestExitCritical(void)
 {
 	pthread_spin_unlock(&spin);
 }
 
+// todo 增加session测试
 // !当测试程序出错时,并不会回收内存。交由父进程进行回收
 int main(void)
 {

+ 9 - 4
test/RyanMqttTest.h

@@ -9,9 +9,15 @@ extern "C" {
 #include <stdint.h>
 #include <string.h>
 #include <stdlib.h>
+#include <stdbool.h>
 #include <unistd.h>
 #include <semaphore.h>
 #include <pthread.h>
+#include "valloc.h"
+#define malloc  v_malloc
+#define calloc  v_calloc
+#define free    v_free
+#define realloc v_realloc
 
 #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
 
@@ -40,9 +46,8 @@ extern "C" {
 			RyanMqttLog_e("内存泄漏");                                                                     \
 			while (1)                                                                                      \
 			{                                                                                              \
-				int area = 0, use = 0;                                                                 \
 				v_mcheck(&area, &use);                                                                 \
-				RyanMqttLog_w("|||----------->>> area = %d, size = %d", area, use);                    \
+				RyanMqttLog_e("|||----------->>> area = %d, size = %d", area, use);                    \
 				delay(3000);                                                                           \
 			}                                                                                              \
 		}                                                                                                      \
@@ -59,8 +64,8 @@ struct RyanMqttTestEventUserData
 };
 /* extern variables-----------------------------------------------------------*/
 
-extern void RyanMqttTestEnableCritical();
-extern void RyanMqttTestExitCritical();
+extern void RyanMqttTestEnableCritical(void);
+extern void RyanMqttTestExitCritical(void);
 extern RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncFlag,
 					RyanMqttBool_e autoReconnectFlag, uint16_t keepaliveTimeoutS,
 					RyanMqttEventHandle mqttEventCallback, void *userData);

+ 66 - 10
xmake.lua

@@ -6,24 +6,80 @@ target("RyanMqtt",function()
     set_toolchains("gcc")  -- 确保使用 GCC
     -- set_toolchains("clang-20")  
     set_languages("gnu99") -- 启用 GNU 扩展
-    set_warnings("everything") -- 启用全部警告 -Wall -Wextra -Weffc++ / -Weverything
+
+    add_defines("PKG_USING_RYANMQTT_IS_ENABLE_ASSERT") -- 开启assert
+    add_defines("RyanMqttLinuxTestEnable") -- linux测试
 
     -- set_optimize("smallest") -- -Os
     -- set_optimize("faster") -- -O2
     -- set_optimize("fastest") -- -O3
     set_optimize("aggressive") -- -Ofast
 
-    add_defines("PKG_USING_RYANMQTT_IS_ENABLE_ASSERT") -- 开启assert
+    -- 启用全部警告
+    set_warnings("everything") -- -Wall -Wextra -Weffc++ / -Weverything
+
+    -- 链接器选项:生成 map 文件
     add_ldflags("-Wl,-Map=$(buildir)/RyanMqtt.map")
+
+    -- Sanitizer 检测项(运行时错误)
+    add_ldflags(
+        "-fsanitize=address", -- 内存越界、释放后使用
+        "-fsanitize=leak", -- 内存泄漏
+        "-fsanitize=undefined", -- 未定义行为(除零、溢出、无效移位等)
+        "-fsanitize=pointer-compare", -- 无效指针比较
+        "-fsanitize=pointer-subtract", -- 无效指针相减
+        "-fsanitize=bounds", -- 数组越界
+        "-fsanitize=float-divide-by-zero", -- 浮点除零
+        "-fsanitize=float-cast-overflow", -- 浮点转整数溢出
+        -- "-fsanitize=alignment", -- 检测未对齐的内存访问
+        "-fno-sanitize=alignment", -- 某些平台不兼容
+        {force = true}
+    )
+
+    -- 编译器警告与静态分析(开发期错误检测)
     add_cxflags(
-                "-pedantic",  
-                "-Wall",
-                "-Wextra",
-                "-Wno-unused-parameter",
-                -- clang的
-                -- "-Wno-gnu-zero-variadic-macro-arguments",
-                -- "-Wno-c23-extensions",
-                {force=true})
+        "-flto", -- 链接时优化(可选)
+        "-pedantic",  
+        "-Wall",
+        "-Wextra",
+        "-fanalyzer", -- 启用 gcc 静态分析器
+        "-Wno-unused-parameter",
+        "-Wfloat-equal", -- 浮点直接比较
+        "-Wshadow", -- 局部变量遮蔽
+        "-Wcast-align", -- 类型转换对齐问题
+        "-Wpointer-arith", -- 指针运算
+        "-Warray-bounds=2", -- 数组越界访问(编译期可分析到的)
+        "-Wstringop-overflow=2", -- memcpy / strcpy 等可能的溢出
+        "-Wstringop-truncation", -- 字符串截断风险
+        "-Walloc-zero", -- malloc(0) 等分配 0 字节的情况
+        "-Wfree-nonheap-object", -- 释放非堆对象
+        -- "-Wconversion", -- 隐式类型转换可能导致精度丢失/溢出
+        "-Wnull-dereference", -- 空指针解引用
+        "-Wlogical-op", -- 逻辑运算符误用
+        "-Wstrict-overflow=5", -- 有符号溢出优化假设
+        "-Wmissing-prototypes", -- 未在头文件声明的全局函数
+        "-Wmissing-declarations", -- 未声明的全局变量/函数
+        "-Wredundant-decls", -- 重复声明
+        "-Wunreachable-code", -- 不可达代码
+        "-Wunsafe-loop-optimizations", -- 循环优化可能引入的问题
+        "-Wtype-limits", -- 比较恒真/恒假的表达式(如 unsigned < 0)
+        "-Wshift-overflow=2", -- 移位导致的溢出
+        "-Wshift-negative-value", -- 对负数进行移位
+        "-Wdiv-by-zero", -- 除以零(编译期可分析到的)
+        "-Wformat-security", -- 格式化字符串安全问题
+        "-Wdisabled-optimization", -- 被禁用的优化
+        -- "-Wwrite-strings", -- 将字符串字面量视为 const char[],防止被修改
+        "-Wreturn-local-addr", -- 返回局部变量地址
+        "-Wuse-after-free", -- 释放后使用(编译期可分析到的)
+        "-Wdangling-pointer=2", -- 悬空指针(GCC 12+)
+        "-fstack-protector-strong", -- 栈保护
+
+        -- clang 的兼容项(可选)
+        -- "-Wno-gnu-zero-variadic-macro-arguments",
+        -- "-Wno-c23-extensions",
+        {force = true}
+    )
+
     
     --加入代码和头文件
     add_includedirs('./common', {public = true})