Selaa lähdekoodia

refactor: 优化

RyanCW 4 kuukautta sitten
vanhempi
sitoutus
3b07e7a492

+ 2 - 1
.vscode/settings.json

@@ -46,6 +46,7 @@
         "platformnetwork.h": "c",
         "inttypes.h": "c",
         "memory": "c",
-        "stdbool.h": "c"
+        "stdbool.h": "c",
+        "ryanmqttutil.h": "c"
     },
 }

+ 12 - 12
mqttclient/RyanMqttClient.c

@@ -145,7 +145,7 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
 	RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNoRescourceError, RyanMqttLog_d,
 			  { RyanMqttSetClientState(client, RyanMqttInitState); });
 
-	return RyanMqttSuccessError;
+	return result;
 }
 
 /**
@@ -217,7 +217,7 @@ RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
 	result = platformThreadStart(client->config.userData, &client->mqttThread);
 	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
 
-	return RyanMqttSuccessError;
+	return result;
 }
 
 /**
@@ -601,7 +601,7 @@ RyanMqttError_e RyanMqttPublishAndUserData(RyanMqttClient_t *client, char *topic
 		});
 	}
 
-	return RyanMqttSuccessError;
+	return result;
 }
 
 /**
@@ -834,27 +834,27 @@ static RyanMqttError_e RyanMqttClientConfigDeepCopy(RyanMqttClientConfig_t *dest
 	destConfig->host = NULL;
 	destConfig->taskName = NULL;
 
-	result = RyanMqttStringCopy(&destConfig->clientId, srcConfig->clientId, RyanMqttStrlen(srcConfig->clientId));
+	result = RyanMqttDupString(&destConfig->clientId, srcConfig->clientId, RyanMqttStrlen(srcConfig->clientId));
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 
 	if (NULL != srcConfig->userName)
 	{
-		result = RyanMqttStringCopy(&destConfig->userName, srcConfig->userName,
-					    RyanMqttStrlen(srcConfig->userName));
+		result = RyanMqttDupString(&destConfig->userName, srcConfig->userName,
+					   RyanMqttStrlen(srcConfig->userName));
 		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 	}
 
 	if (NULL != srcConfig->password)
 	{
-		result = RyanMqttStringCopy(&destConfig->password, srcConfig->password,
-					    RyanMqttStrlen(srcConfig->password));
+		result = RyanMqttDupString(&destConfig->password, srcConfig->password,
+					   RyanMqttStrlen(srcConfig->password));
 		RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 	}
 
-	result = RyanMqttStringCopy(&destConfig->host, srcConfig->host, RyanMqttStrlen(srcConfig->host));
+	result = RyanMqttDupString(&destConfig->host, srcConfig->host, RyanMqttStrlen(srcConfig->host));
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 
-	result = RyanMqttStringCopy(&destConfig->taskName, srcConfig->taskName, RyanMqttStrlen(srcConfig->taskName));
+	result = RyanMqttDupString(&destConfig->taskName, srcConfig->taskName, RyanMqttStrlen(srcConfig->taskName));
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 
 __exit:
@@ -1022,7 +1022,7 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
 
 	if (payloadLen > 0)
 	{
-		result = RyanMqttStringCopy(&client->lwtOptions->payload, payload, payloadLen);
+		result = RyanMqttDupString(&client->lwtOptions->payload, payload, payloadLen);
 		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 	}
 	else
@@ -1030,7 +1030,7 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
 		client->lwtOptions->payload = NULL;
 	}
 
-	result = RyanMqttStringCopy(&client->lwtOptions->topic, topicName, RyanMqttStrlen(topicName));
+	result = RyanMqttDupString(&client->lwtOptions->topic, topicName, RyanMqttStrlen(topicName));
 	RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
 
 	client->lwtOptions->lwtFlag = RyanMqttTrue;

+ 5 - 2
mqttclient/RyanMqttThread.c

@@ -166,7 +166,10 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFla
 		case MQTT_PACKET_TYPE_PUBCOMP: // 理论不会出现,冗余措施
 		{
 			// 设置重发标志位
-			MQTT_UpdateDuplicatePublishFlag(ackHandler->packet, true);
+			if (ackHandler->packet && 0 == ackHandler->repeatCount)
+			{
+				MQTT_UpdateDuplicatePublishFlag(ackHandler->packet, true);
+			}
 
 			// 重发数据事件回调
 			RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
@@ -406,9 +409,9 @@ void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, v
 	switch (eventId)
 	{
 	case RyanMqttEventConnected: // 第一次连接成功
+		RyanMqttSetClientState(client, RyanMqttConnectState);
 		RyanMqttRefreshKeepaliveTime(client);
 		RyanMqttAckListScan(client, RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
-		RyanMqttSetClientState(client, RyanMqttConnectState);
 		break;
 
 	case RyanMqttEventDisconnected: // 断开连接事件

+ 7 - 5
mqttclient/RyanMqttUtil.c

@@ -9,15 +9,17 @@
  * @brief 字符串拷贝,需要手动释放内存
  *
  * @param dest
- * @param rest
+ * @param src
  * @param strLen
  * @return RyanMqttError_e
  */
-RyanMqttError_e RyanMqttStringCopy(char **dest, const char *rest, uint32_t strLen)
+RyanMqttError_e RyanMqttDupString(char **dest, const char *src, uint32_t strLen)
 {
 	RyanMqttAssert(NULL != dest);
-	RyanMqttAssert(NULL != rest);
-	// RyanMqttCheck(0 != strLen, RyanMqttFailedError, RyanMqttLog_d);
+	RyanMqttAssert(NULL != src);
+	RyanMqttCheck(0 != strLen, RyanMqttFailedError, RyanMqttLog_d);
+
+	*dest = NULL;
 
 	char *s = (char *)platformMemoryMalloc(strLen + 1);
 	if (NULL == s)
@@ -25,7 +27,7 @@ RyanMqttError_e RyanMqttStringCopy(char **dest, const char *rest, uint32_t strLe
 		return RyanMqttNotEnoughMemError;
 	}
 
-	RyanMqttMemcpy(s, rest, strLen);
+	RyanMqttMemcpy(s, src, strLen);
 	s[strLen] = '\0';
 
 	*dest = s;

+ 16 - 15
mqttclient/include/RyanMqttPlatform.h

@@ -43,23 +43,23 @@ extern "C" {
  * @addtogroup mqtt_constants
  * @{
  */
-#define MQTT_PACKET_TYPE_CONNECT        ( ( uint8_t ) 0x10U )  /**< @brief CONNECT (client-to-server). */
-#define MQTT_PACKET_TYPE_CONNACK        ( ( uint8_t ) 0x20U )  /**< @brief CONNACK (server-to-client). */
-#define MQTT_PACKET_TYPE_PUBLISH        ( ( uint8_t ) 0x30U )  /**< @brief PUBLISH (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBACK         ( ( uint8_t ) 0x40U )  /**< @brief PUBACK (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBREC         ( ( uint8_t ) 0x50U )  /**< @brief PUBREC (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBREL         ( ( uint8_t ) 0x62U )  /**< @brief PUBREL (bidirectional). */
-#define MQTT_PACKET_TYPE_PUBCOMP        ( ( uint8_t ) 0x70U )  /**< @brief PUBCOMP (bidirectional). */
-#define MQTT_PACKET_TYPE_SUBSCRIBE      ( ( uint8_t ) 0x82U )  /**< @brief SUBSCRIBE (client-to-server). */
-#define MQTT_PACKET_TYPE_SUBACK         ( ( uint8_t ) 0x90U )  /**< @brief SUBACK (server-to-client). */
-#define MQTT_PACKET_TYPE_UNSUBSCRIBE    ( ( uint8_t ) 0xA2U )  /**< @brief UNSUBSCRIBE (client-to-server). */
-#define MQTT_PACKET_TYPE_UNSUBACK       ( ( uint8_t ) 0xB0U )  /**< @brief UNSUBACK (server-to-client). */
-#define MQTT_PACKET_TYPE_PINGREQ        ( ( uint8_t ) 0xC0U )  /**< @brief PINGREQ (client-to-server). */
-#define MQTT_PACKET_TYPE_PINGRESP       ( ( uint8_t ) 0xD0U )  /**< @brief PINGRESP (server-to-client). */
-#define MQTT_PACKET_TYPE_DISCONNECT     ( ( uint8_t ) 0xE0U )  /**< @brief DISCONNECT (client-to-server). */
+#define MQTT_PACKET_TYPE_CONNECT     ((uint8_t)0x10U) /**< @brief CONNECT (client-to-server). */
+#define MQTT_PACKET_TYPE_CONNACK     ((uint8_t)0x20U) /**< @brief CONNACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PUBLISH     ((uint8_t)0x30U) /**< @brief PUBLISH (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBACK      ((uint8_t)0x40U) /**< @brief PUBACK (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBREC      ((uint8_t)0x50U) /**< @brief PUBREC (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBREL      ((uint8_t)0x62U) /**< @brief PUBREL (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBCOMP     ((uint8_t)0x70U) /**< @brief PUBCOMP (bidirectional). */
+#define MQTT_PACKET_TYPE_SUBSCRIBE   ((uint8_t)0x82U) /**< @brief SUBSCRIBE (client-to-server). */
+#define MQTT_PACKET_TYPE_SUBACK      ((uint8_t)0x90U) /**< @brief SUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_UNSUBSCRIBE ((uint8_t)0xA2U) /**< @brief UNSUBSCRIBE (client-to-server). */
+#define MQTT_PACKET_TYPE_UNSUBACK    ((uint8_t)0xB0U) /**< @brief UNSUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PINGREQ     ((uint8_t)0xC0U) /**< @brief PINGREQ (client-to-server). */
+#define MQTT_PACKET_TYPE_PINGRESP    ((uint8_t)0xD0U) /**< @brief PINGRESP (server-to-client). */
+#define MQTT_PACKET_TYPE_DISCONNECT  ((uint8_t)0xE0U) /**< @brief DISCONNECT (client-to-server). */
 /** @} */
 
-// RyanMqttT内部imer接口
+// RyanMqtt内部 timer 接口
 typedef struct
 {
 	uint32_t time;
@@ -101,6 +101,7 @@ extern RyanMqttError_e platformThreadDestroy(void *userData, platformThread_t *p
 extern RyanMqttError_e platformThreadStart(void *userData, platformThread_t *platformThread);
 extern RyanMqttError_e platformThreadStop(void *userData, platformThread_t *platformThread);
 
+// 互斥锁需支持递归
 extern RyanMqttError_e platformMutexInit(void *userData, platformMutex_t *platformMutex);
 extern RyanMqttError_e platformMutexDestroy(void *userData, platformMutex_t *platformMutex);
 extern RyanMqttError_e platformMutexLock(void *userData, platformMutex_t *platformMutex);

+ 1 - 1
mqttclient/include/RyanMqttUtil.h

@@ -27,7 +27,7 @@ int32_t coreMqttTransportRecv(NetworkContext_t *pNetworkContext, void *pBuffer,
 
 extern void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e state);
 extern RyanMqttState_e RyanMqttGetClientState(RyanMqttClient_t *client);
-extern RyanMqttError_e RyanMqttStringCopy(char **dest, const char *rest, uint32_t strLen);
+extern RyanMqttError_e RyanMqttDupString(char **dest, const char *src, uint32_t strLen);
 extern void RyanMqttPurgeSession(RyanMqttClient_t *client);
 extern void RyanMqttPurgeConfig(RyanMqttClientConfig_t *clientConfig);
 

+ 19 - 19
test/RyanMqttPublicApiParamCheckTest.c

@@ -1,6 +1,6 @@
 #include "RyanMqttTest.h"
 
-static RyanMqttQos_e invaildQos()
+static RyanMqttQos_e invalidQos()
 {
 	static bool aa = true;
 
@@ -11,7 +11,7 @@ static RyanMqttQos_e invaildQos()
 		return (RyanMqttQos_e)(uintptr_t)10;
 	}
 
-	aa = false;
+	aa = true;
 	return RyanMqttSubFail;
 }
 
@@ -123,7 +123,7 @@ static RyanMqttError_e RyanMqttLwtApiParamCheckTest(void)
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
 	// 无效QoS
-	result = RyanMqttSetLwt(validClient, "test/lwt", "offline", 7, invaildQos(), RyanMqttTrue);
+	result = RyanMqttSetLwt(validClient, "test/lwt", "offline", 7, invalidQos(), RyanMqttTrue);
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
 	// 无效retain
@@ -272,7 +272,7 @@ static RyanMqttError_e RyanMqttSubApiParamCheckTest(void)
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
 	// 无效QoS级别
-	result = RyanMqttSubscribe(validClient, "test/topic", invaildQos());
+	result = RyanMqttSubscribe(validClient, "test/topic", invalidQos());
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
 	// 分配测试数据
@@ -317,7 +317,7 @@ static RyanMqttError_e RyanMqttSubApiParamCheckTest(void)
 		// subscribeData 内数据无效
 		subscribeData[i].topic = "test/topic2";
 		subscribeData[i].topicLen = strlen("test/topic2");
-		subscribeData[i].qos = invaildQos();
+		subscribeData[i].qos = invalidQos();
 		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
 		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
@@ -354,24 +354,24 @@ static RyanMqttError_e RyanMqttSubApiParamCheckTest(void)
 	result = RyanMqttUnSubscribeMany(validClient, 2, NULL);
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
-	for (uint32_t i = 0; i < getArraySize(subscribeData); i++)
+	for (uint32_t i = 0; i < getArraySize(unsubscribeData); i++)
 	{
-		// subscribeData 内数据无效
-		subscribeData[i].topic = NULL;
-		subscribeData[i].topicLen = strlen("test/topic2");
+		// unsubscribeData 内数据无效
+		unsubscribeData[i].topic = NULL;
+		unsubscribeData[i].topicLen = strlen("test/topic2");
 
-		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
+		result = RyanMqttUnSubscribeMany(validClient, 2, unsubscribeData);
 		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
-		// subscribeData 内数据无效
-		subscribeData[i].topic = "test/topic2";
-		subscribeData[i].topicLen = 0;
-		result = RyanMqttSubscribeMany(validClient, 2, subscribeData);
+		// unsubscribeData 内数据无效
+		unsubscribeData[i].topic = "test/topic2";
+		unsubscribeData[i].topicLen = 0;
+		result = RyanMqttUnSubscribeMany(validClient, 2, unsubscribeData);
 		RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
-		// 恢复 subscribeData 内数据
-		subscribeData[i].topic = "test/topic2";
-		subscribeData[i].topicLen = strlen("test/topic2");
+		// 恢复 unsubscribeData 内数据
+		unsubscribeData[i].topic = "test/topic2";
+		unsubscribeData[i].topicLen = strlen("test/topic2");
 	}
 
 	RyanMqttMsgHandler_t *msgHandles = NULL;
@@ -444,7 +444,7 @@ static RyanMqttError_e RyanMqttPubApiParamCheckTest(void)
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
 	// 无效QoS级别
-	result = RyanMqttPublish(validClient, "test/topic", "payload", 7, invaildQos(), RyanMqttFalse);
+	result = RyanMqttPublish(validClient, "test/topic", "payload", 7, invalidQos(), RyanMqttFalse);
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
 	// 超大负载长度
@@ -466,7 +466,7 @@ static RyanMqttError_e RyanMqttPubApiParamCheckTest(void)
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e, { goto __exit; });
 
 	// 无效QoS级别
-	result = RyanMqttPublishAndUserData(validClient, "test/topic", strlen("test/topic"), "payload", 7, invaildQos(),
+	result = RyanMqttPublishAndUserData(validClient, "test/topic", strlen("test/topic"), "payload", 7, invalidQos(),
 					    RyanMqttFalse, NULL);
 	RyanMqttCheckCodeNoReturn(RyanMqttParamInvalidError == result, result, RyanMqttLog_e,
 				  { goto __exit; }); // 清理资源

+ 17 - 8
test/RyanMqttTest.c

@@ -223,6 +223,7 @@ RyanMqttError_e RyanMqttTestInit(RyanMqttClient_t **client, RyanMqttBool_e syncF
 
 	if (RyanMqttConnectState != RyanMqttGetState(*client))
 	{
+		// 不处理错误,测试代码
 		RyanMqttLog_e("Connection timeout after %d ms", timeout_ms);
 		return RyanMqttFailedError;
 	}
@@ -253,19 +254,28 @@ RyanMqttError_e checkAckList(RyanMqttClient_t *client)
 	RyanMqttLog_w("等待检查ack链表,等待 recvTime: %d", client->config.recvTimeout);
 	delay(client->config.recvTimeout + 500);
 
-	if (!RyanMqttListIsEmpty(&client->ackHandlerList))
+	platformMutexLock(client->config.userData, &client->ackHandleLock);
+	int ackEmpty = RyanMqttListIsEmpty(&client->ackHandlerList);
+	platformMutexUnLock(client->config.userData, &client->ackHandleLock);
+	if (!ackEmpty)
 	{
 		RyanMqttLog_e("mqtt空间 ack链表不为空");
 		return RyanMqttFailedError;
 	}
 
-	if (!RyanMqttListIsEmpty(&client->userAckHandlerList))
+	platformMutexLock(client->config.userData, &client->userSessionLock);
+	int userAckEmpty = RyanMqttListIsEmpty(&client->userAckHandlerList);
+	platformMutexUnLock(client->config.userData, &client->userSessionLock);
+	if (!userAckEmpty)
 	{
 		RyanMqttLog_e("用户空间 ack链表不为空");
 		return RyanMqttFailedError;
 	}
 
-	if (!RyanMqttListIsEmpty(&client->msgHandlerList))
+	platformMutexLock(client->config.userData, &client->msgHandleLock);
+	int msgEmpty = RyanMqttListIsEmpty(&client->msgHandlerList);
+	platformMutexUnLock(client->config.userData, &client->msgHandleLock);
+	if (!msgEmpty)
 	{
 		RyanMqttLog_e("mqtt空间 msg链表不为空");
 		return RyanMqttFailedError;
@@ -299,6 +309,10 @@ void RyanMqttTestExitCritical(void)
 // !当测试程序出错时,并不会回收内存。交由父进程进行回收
 int main(void)
 {
+	RyanMqttError_e result = RyanMqttSuccessError;
+	vallocInit();
+
+	pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
 
 	// 多线程测试必须设置这个,否则会导致 heap-use-after-free, 原因如下
 	// 虽然也有办法解决,不过RyanMqtt目标为嵌入式场景,不想引入需要更多资源的逻辑,嵌入式场景目前想不到有这么频繁而且还是本机emqx的场景。
@@ -312,11 +326,6 @@ int main(void)
 	pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
 	sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
 
-	RyanMqttError_e result = RyanMqttSuccessError;
-	vallocInit();
-
-	pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
-
 	result = RyanMqttPublicApiParamCheckTest();
 	RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, RyanMqttLog_e, { goto __exit; });