Kaynağa Gözat

fix(ack): ack链表创建优化,ack资源部分没有互斥

RyanCW 1 yıl önce
ebeveyn
işleme
beffb38b4b

+ 6 - 7
mqttclient/RyanMqttClient.c

@@ -208,16 +208,16 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
     result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
-    result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, msgHandler, &ackHandler);
+    result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
         platformMemoryFree(msgHandler);
         platformMutexUnLock(client->config.userData, &client->sendBufLock);
     });
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
 
     // 添加等待 ack
     result = RyanMqttAckListAdd(client, ackHandler);
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
-    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);RyanMqttAckHandlerDestroy(client, ackHandler); });
 
@@ -256,13 +256,13 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
     RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
                       { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
-    result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, msgHandler, &ackHandler);
+    result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
 
     result = RyanMqttAckListAdd(client, ackHandler);
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
-    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);
                         RyanMqttAckHandlerDestroy(client, ackHandler); });
@@ -325,16 +325,15 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
     result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
-    result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, msgHandler, &ackHandler);
+    result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
         platformMemoryFree(msgHandler);
         platformMutexUnLock(client->config.userData, &client->sendBufLock);
     });
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
 
     result = RyanMqttAckListAdd(client, ackHandler);
     result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
-    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
-
     RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
                       { RyanMqttAckListRemove(client, ackHandler);
                         RyanMqttAckHandlerDestroy(client, ackHandler); });

+ 7 - 6
mqttclient/RyanMqttThread.c

@@ -186,8 +186,6 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
 
     // 每次收到PUBREC都返回ack
     result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
-    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
-    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     // 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
     result = RyanMqttAckListNodeFind(client, PUBREC, packetId, &ackHandlerPubrec);
@@ -203,7 +201,7 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
             RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
             // 创建一个 ACK 处理程序节点
-            result = RyanMqttAckHandlerCreate(client, PUBCOMP, packetId, packetLen, msgHandler, &ackHandler);
+            result = RyanMqttAckHandlerCreate(client, PUBCOMP, packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
             RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(msgHandler); });
 
             result = RyanMqttAckListAdd(client, ackHandler);
@@ -217,6 +215,8 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
             RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
         }
     }
+    platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
+    RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
     return result;
 }
@@ -273,8 +273,6 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
                           { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
 
         result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
-        platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
-        RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
 
         // 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish, 不进行qos2 PUBREC消息处理
         result = RyanMqttAckListNodeFind(client, PUBREL, msgData.packetId, &ackHandler);
@@ -283,12 +281,15 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
             result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, msgData.qos, &msgHandler);
             RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {});
 
-            result = RyanMqttAckHandlerCreate(client, PUBREL, msgData.packetId, packetLen, msgHandler, &ackHandler);
+            result = RyanMqttAckHandlerCreate(client, PUBREL, msgData.packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
             RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(msgHandler); });
 
             result = RyanMqttAckListAdd(client, ackHandler);
             deliverMsgFlag = RyanMqttTrue;
         }
+
+        platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
+        RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
         break;
 
     default:

+ 2 - 2
mqttclient/RyanMqttUtile.c

@@ -447,7 +447,7 @@ RyanMqttError_e RyanMqttMsgHandlerRemove(RyanMqttClient_t *client, RyanMqttMsgHa
  * @param pAckHandler
  * @return RyanMqttError_e
  */
-RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId, uint16_t packetLen, RyanMqttMsgHandler_t *msgHandler, RyanMqttAckHandler_t **pAckHandler)
+RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId, uint16_t packetLen, char *packet, RyanMqttMsgHandler_t *msgHandler, RyanMqttAckHandler_t **pAckHandler)
 {
     RyanMqttAckHandler_t *ackHandler = NULL;
     RyanMqttAssert(NULL != client);
@@ -469,7 +469,7 @@ RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes
     ackHandler->packetType = packetType;
     ackHandler->msgHandler = msgHandler;
     ackHandler->packet = (char *)ackHandler + sizeof(RyanMqttAckHandler_t);
-    memcpy(ackHandler->packet, client->config.sendBuffer, packetLen); // 将packet数据保存到ack中
+    memcpy(ackHandler->packet, packet, packetLen); // 将packet数据保存到ack中
 
     *pAckHandler = ackHandler;
 

+ 1 - 1
mqttclient/RyanMqttUtile.h

@@ -26,7 +26,7 @@ extern "C"
     extern RyanMqttError_e RyanMqttMsgHandlerAdd(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
     extern RyanMqttError_e RyanMqttMsgHandlerRemove(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandler);
 
-    extern RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId, uint16_t packetLen, RyanMqttMsgHandler_t *msgHandler, RyanMqttAckHandler_t **pAckHandler);
+    extern RyanMqttError_e RyanMqttAckHandlerCreate(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId, uint16_t packetLen, char *packet, RyanMqttMsgHandler_t *msgHandler, RyanMqttAckHandler_t **pAckHandler);
     extern void RyanMqttAckHandlerDestroy(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
     extern RyanMqttError_e RyanMqttAckListAdd(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler);
     extern RyanMqttError_e RyanMqttAckListNodeFind(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId, RyanMqttAckHandler_t **pAckHandler);