Просмотр исходного кода

refactor: 优化 PubRec 处理逻辑

RyanCW 3 месяцев назад
Родитель
Сommit
82f18604d9
1 измененных файлов с 38 добавлено и 46 удалено
  1. 38 46
      mqttclient/RyanMqttThreadProcessPacket.c

+ 38 - 46
mqttclient/RyanMqttThreadProcessPacket.c

@@ -70,8 +70,8 @@ static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client, MQT
 
 	// ?这里没法判断packetid是否非法,只能每次都回复咯
 	// 每次收到PUBREL都返回消息
-	result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
-	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
+	// 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
+	RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 
 	return RyanMqttSuccessError;
 }
@@ -86,64 +86,58 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQT
 {
 	RyanMqttError_e result = RyanMqttSuccessError;
 	uint16_t packetId;
-	RyanMqttAckHandler_t *ackHandlerPubrec;
 
 	RyanMqttAssert(NULL != client);
 
-	// 反序列化ack
+	// 反序列化 pubrec 
 	MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_e);
 
-	// 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
-	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec, RyanMqttFalse);
-	if (RyanMqttSuccessError != result)
-	{
-		// 没有pubrec ,并且没有pubcomp,说明这个报文是非法报文,不进行mqtt服务器回复
-		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubrec,
-						 RyanMqttFalse);
-		RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttInvalidPacketError, RyanMqttLog_d);
-	}
-
-	// 可以安全的发送重传pubrel
+	// 序列化 pubrel 包
 	uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
 	MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
-
-	// 序列化ack数据包
 	status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREL, packetId);
 	RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_e);
 
-	// 需要小心 result 被覆盖
+	// 若已存在 PUBCOMP ack,说明此前已处理过 PUBREC;仅重发 PUBREL 即可
+	RyanMqttAckHandler_t *ackHandlerPubcomp;
+	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubcomp, RyanMqttFalse);
+	if (RyanMqttSuccessError == result)
+	{
+		goto __next;
+	}
+
+	RyanMqttAckHandler_t *ackHandlerPubrec;
+	result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec, RyanMqttTrue);
 	if (RyanMqttSuccessError == result)
 	{
 		RyanMqttMsgHandler_t *msgHandler;
-		RyanMqttAckHandler_t *ackHandler;
+		RyanMqttAckHandler_t *ackHandlerNewPubcomp;
 
 		// 首次收到消息,创建 pubcomp ack
 		result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
 						  ackHandlerPubrec->msgHandler->topicLen, RyanMqttMsgInvalidPacketId,
 						  ackHandlerPubrec->msgHandler->qos,
 						  ackHandlerPubrec->msgHandler->userData, &msgHandler);
-		RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
+		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
+				  { RyanMqttAckListAddToAckList(client, ackHandlerPubrec); });
 
 		// 期望收到pubcomp否则会重复发送pubrel
 		result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBCOMP, packetId,
 						  MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
-						  &ackHandler, RyanMqttFalse);
-		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
-				  { RyanMqttMsgHandlerDestroy(client, msgHandler); });
-		RyanMqttAckListAddToAckList(client, ackHandler);
+						  &ackHandlerNewPubcomp, RyanMqttFalse);
+		RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
+			RyanMqttMsgHandlerDestroy(client, msgHandler);
+			RyanMqttAckListAddToAckList(client, ackHandlerPubrec);
+		});
 
-		// 清除pubrec记录,必须使用find函数进行重新查找才能确保线程安全
-		result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec,
-						 RyanMqttTrue);
-		if (RyanMqttSuccessError == result)
-		{
-			RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
-		}
+		RyanMqttAckListAddToAckList(client, ackHandlerNewPubcomp);
+		RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
 	}
 
-	result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
-	RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
+__next:
+	// 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
+	RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 
 	return result;
 }
@@ -202,8 +196,8 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 		MQTTStatus_t status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBACK, packetId);
 		RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
 
-		result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
-		RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
+		// 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
+		RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 	}
 
 	break;
@@ -242,13 +236,13 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQ
 		}
 		else
 		{
-			// 不是第一次收到 publish 报文,不进行消息分发
-			RyanMqttLog_e("Not the first time to receive publish packet, packetId: %d", msgData.packetId);
+			// 非首次收到同一 packetId 的 PUBLISH(正常重传场景),不再分发
+			RyanMqttLog_d("Duplicate QoS2 PUBLISH, packetId: %d", msgData.packetId);
 		}
 
 		// 无论是不是第一次收到,都回复 pub ack报文
-		result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
-		RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
+		// 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
+		RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
 	}
 
 	break;
@@ -507,23 +501,21 @@ RyanMqttError_e RyanMqttGetPacketInfo(RyanMqttClient_t *client, MQTTPacketInfo_t
 		{
 			needReadSize = 3; // 最多还需要 3 个字节
 
-			// // 冗余,理论上不可能发生的
-			// if (sizeof(pBuffer) - readIndex > 0)
-			// {
-			// 	needReadSize = sizeof(pBuffer) - readIndex;F
-			// }
-			// else
+			// ?冗余,header最多为5字节,理论上不可能发生的,所以还是使用上面的形式吧
+			// if (readIndex >= sizeof(pBuffer))
 			// {
 			// 	result = RyanMqttFailedError;
 			// 	goto __next;
 			// }
+
+			// needReadSize = sizeof(pBuffer) - readIndex;
 			continue;
 		}
 
 		if (MQTTSuccess != status)
 		{
 			RyanMqttLog_e("解析固定报头失败 %d", status);
-			result = RyanMqttFailedError;
+			result = RyanMqttDeserializePacketError;
 			goto __next;
 		}