|
|
@@ -104,6 +104,7 @@ static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *c
|
|
|
|
|
|
RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
|
|
|
|
|
|
+ RyanMqttAckListRemove(client, ackHandler);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
|
|
|
return result;
|
|
|
}
|
|
|
@@ -140,7 +141,10 @@ static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client)
|
|
|
// 删除pubrel记录
|
|
|
result = RyanMqttAckListNodeFind(client, PUBREL, packetId, &ackHandler);
|
|
|
if (RyanMqttSuccessError == result)
|
|
|
+ {
|
|
|
+ RyanMqttAckListRemove(client, ackHandler);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandler);
|
|
|
+ }
|
|
|
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|
|
|
@@ -174,11 +178,15 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
|
|
|
// 查找ack链表是否存在pubcomp报文,不存在表示首次接收到
|
|
|
result = RyanMqttAckListNodeFind(client, PUBCOMP, packetId, &ackHandler);
|
|
|
if (RyanMqttSuccessError != result)
|
|
|
+ {
|
|
|
fastFlag = RyanMqttTrue;
|
|
|
-
|
|
|
+ }
|
|
|
// 出现pubrec和pubcomp同时存在的情况,清除pubrec理论上不会出现
|
|
|
else
|
|
|
+ {
|
|
|
+ RyanMqttAckListRemove(client, ackHandlerPubrec);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// 制作确认数据包并发送
|
|
|
@@ -213,6 +221,7 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
|
|
|
{
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
// 保证等待PUBCOMP记录成功后再清除PUBREC记录
|
|
|
+ RyanMqttAckListRemove(client, ackHandlerPubrec);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
|
|
|
}
|
|
|
|
|
|
@@ -350,6 +359,7 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
|
|
|
{
|
|
|
// mqtt事件回调
|
|
|
RyanMqttEventMachine(client, RyanMqttEventSubscribedFaile, (void *)ackHandler->msgHandler);
|
|
|
+ RyanMqttAckListRemove(client, ackHandler);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|
|
|
@@ -358,7 +368,10 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
|
|
|
// 查找是否有同名订阅,如果有就销毁之前的
|
|
|
result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler->topic, strlen(ackHandler->msgHandler->topic), RyanMqttFalse, &msgHandler);
|
|
|
if (RyanMqttSuccessError == result)
|
|
|
+ {
|
|
|
+ RyanMqttMsgHandlerRemove(msgHandler);
|
|
|
RyanMqttMsgHandlerDestory(msgHandler);
|
|
|
+ }
|
|
|
|
|
|
// 服务端可以授予比订阅者要求的低一些的 QoS 等级。
|
|
|
result = RyanMqttMsgHandlerCreate(ackHandler->msgHandler->topic,
|
|
|
@@ -368,7 +381,8 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
|
|
|
|
|
|
RyanMqttMsgHandlerAdd(client, msgHandler); // 将msg信息添加到订阅链表上
|
|
|
RyanMqttEventMachine(client, RyanMqttEventSubscribed, (void *)msgHandler); // mqtt回调函数
|
|
|
- RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
|
|
|
+ RyanMqttAckListRemove(client, ackHandler);
|
|
|
+ RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
|
|
|
|
|
|
return result;
|
|
|
}
|
|
|
@@ -396,6 +410,7 @@ static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client)
|
|
|
// mqtt事件回调
|
|
|
RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
|
|
|
|
|
|
+ RyanMqttAckListRemove(client, ackHandler);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
|
|
|
|
|
|
return result;
|
|
|
@@ -553,6 +568,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
|
|
|
RyanMqttEventMachine(client, (SUBACK == ackHandler->packetType) ? RyanMqttEventSubscribedFaile : RyanMqttEventUnSubscribedFaile,
|
|
|
(void *)ackHandler->msgHandler);
|
|
|
// 清除句柄
|
|
|
+ RyanMqttAckListRemove(client, ackHandler);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandler);
|
|
|
break;
|
|
|
}
|
|
|
@@ -757,6 +773,9 @@ void RyanMqttThread(void *argument)
|
|
|
// 清除session ack链表和msg链表
|
|
|
RyanMqttCleanSession(client);
|
|
|
|
|
|
+ platformThread_t mqttThread = *client->mqttThread;
|
|
|
+ void *userData = client->config->userData;
|
|
|
+
|
|
|
// 清除掉线程动态资源
|
|
|
platformMemoryFree(client->mqttThread);
|
|
|
client->mqttThread = NULL;
|
|
|
@@ -765,7 +784,7 @@ void RyanMqttThread(void *argument)
|
|
|
client = NULL;
|
|
|
|
|
|
// 销毁自身线程
|
|
|
- platformThreadDestroy(client->config->userData, client->mqttThread);
|
|
|
+ platformThreadDestroy(userData, &mqttThread);
|
|
|
}
|
|
|
|
|
|
// 客户端状态变更状态机
|