|
|
@@ -261,9 +261,11 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
MQTTFixedBuffer_t fixedBuffer = {0};
|
|
|
size_t remainingLength = 0;
|
|
|
|
|
|
+ // 校验参数合法性
|
|
|
RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != subscribeManyData, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(count > 0, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
-
|
|
|
for (int32_t i = 0; i < count; i++)
|
|
|
{
|
|
|
RyanMqttCheck(NULL != subscribeManyData[i].topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
@@ -271,6 +273,7 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
RyanMqttParamInvalidError, rlog_d);
|
|
|
}
|
|
|
|
|
|
+ // 申请 coreMqtt 支持的topic格式空间
|
|
|
MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
|
|
|
RyanMqttCheck(NULL != subscriptionList, RyanMqttParamInvalidError, rlog_d);
|
|
|
memset(subscriptionList, 0, sizeof(MQTTSubscribeInfo_t) * count);
|
|
|
@@ -281,7 +284,9 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
subscriptionList[i].topicFilterLength = subscribeManyData[i].topicLen;
|
|
|
}
|
|
|
|
|
|
- { // 获取数据包大小
|
|
|
+ // 序列化数据包
|
|
|
+ {
|
|
|
+ // 获取数据包大小
|
|
|
status = MQTT_GetSubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
|
|
|
RyanMqttAssert(MQTTSuccess == status);
|
|
|
|
|
|
@@ -299,7 +304,8 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- // ?mqtt空间接到suback时,会查找所有同名的然后删掉,这里不进行同名对比操作
|
|
|
+ // 创建每个msg主题的ack节点
|
|
|
+ // ?mqtt空间收到服务器的suback时,会查找所有同名的然后删掉,所以这里不进行同名对比操作
|
|
|
for (int32_t i = 0; i < count; i++)
|
|
|
{
|
|
|
// 创建msg包
|
|
|
@@ -326,8 +332,9 @@ RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
RyanMqttAckHandlerDestroy(client, userAckHandler);
|
|
|
goto __exit;
|
|
|
});
|
|
|
- RyanMqttMsgHandlerAddToMsgList(client,
|
|
|
- msgToListHandler); // 将msg信息添加到订阅链表上
|
|
|
+
|
|
|
+ // 将msg信息添加到订阅链表上
|
|
|
+ RyanMqttMsgHandlerAddToMsgList(client, msgToListHandler);
|
|
|
continue;
|
|
|
|
|
|
__exit:
|
|
|
@@ -338,6 +345,7 @@ __exit:
|
|
|
return RyanMqttNotEnoughMemError;
|
|
|
}
|
|
|
|
|
|
+ // 发送订阅主题包
|
|
|
// 如果发送失败就清除ack链表,创建ack链表必须在发送前
|
|
|
result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
|
|
|
RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
|
|
|
@@ -418,14 +426,17 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
MQTTFixedBuffer_t fixedBuffer = {0};
|
|
|
size_t remainingLength = 0;
|
|
|
|
|
|
+ // 校验参数合法性
|
|
|
RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != unSubscribeManyData, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(count > 0, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
-
|
|
|
for (int32_t i = 0; i < count; i++)
|
|
|
{
|
|
|
RyanMqttCheck(NULL != unSubscribeManyData[i].topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
}
|
|
|
|
|
|
+ // 申请 coreMqtt 支持的topic格式空间
|
|
|
MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
|
|
|
RyanMqttCheck(NULL != subscriptionList, RyanMqttParamInvalidError, rlog_d);
|
|
|
memset(subscriptionList, 0, sizeof(MQTTSubscribeInfo_t) * count);
|
|
|
@@ -436,27 +447,30 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
subscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
|
|
|
}
|
|
|
|
|
|
- // 获取数据包大小
|
|
|
- status = MQTT_GetUnsubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
|
|
|
- RyanMqttAssert(MQTTSuccess == status);
|
|
|
+ // 序列化数据包
|
|
|
+ {
|
|
|
+ // 获取数据包大小
|
|
|
+ status = MQTT_GetUnsubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
|
|
|
+ RyanMqttAssert(MQTTSuccess == status);
|
|
|
|
|
|
- // 申请数据包的空间
|
|
|
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
|
|
|
- RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, rlog_d,
|
|
|
- { platformMemoryFree(subscriptionList); });
|
|
|
+ // 申请数据包的空间
|
|
|
+ fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
|
|
|
+ RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, rlog_d,
|
|
|
+ { platformMemoryFree(subscriptionList); });
|
|
|
|
|
|
- // 序列化数据包
|
|
|
- packetId = RyanMqttGetNextPacketId(client);
|
|
|
- status = MQTT_SerializeUnsubscribe(subscriptionList, count, packetId, remainingLength, &fixedBuffer);
|
|
|
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d, {
|
|
|
- platformMemoryFree(subscriptionList);
|
|
|
- platformMemoryFree(fixedBuffer.pBuffer);
|
|
|
- });
|
|
|
+ // 序列化数据包
|
|
|
+ packetId = RyanMqttGetNextPacketId(client);
|
|
|
+ status = MQTT_SerializeUnsubscribe(subscriptionList, count, packetId, remainingLength, &fixedBuffer);
|
|
|
+ RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d, {
|
|
|
+ platformMemoryFree(subscriptionList);
|
|
|
+ platformMemoryFree(fixedBuffer.pBuffer);
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
// 查找当前主题是否已经订阅,没有订阅就取消发送
|
|
|
for (int32_t i = 0; i < count; i++)
|
|
|
{
|
|
|
- // !不判断是否订阅,统一都发送取消
|
|
|
+ // ?不判断是否订阅,统一都发送取消
|
|
|
result = RyanMqttMsgHandlerFind(client, subscriptionList[i].pTopicFilter,
|
|
|
subscriptionList[i].topicFilterLength, RyanMqttFalse, &subMsgHandler);
|
|
|
if (RyanMqttSuccessError == result)
|
|
|
@@ -479,14 +493,15 @@ RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
|
|
|
RyanMqttAckListAddToUserAckList(client, userAckHandler);
|
|
|
continue;
|
|
|
|
|
|
-__exit: {
|
|
|
- RyanMqttClearUnSubSession(client, packetId, i, subscriptionList);
|
|
|
- platformMemoryFree(subscriptionList);
|
|
|
- platformMemoryFree(fixedBuffer.pBuffer);
|
|
|
- return RyanMqttNotEnoughMemError;
|
|
|
-}
|
|
|
+__exit:
|
|
|
+ RyanMqttClearUnSubSession(client, packetId, i, subscriptionList);
|
|
|
+ platformMemoryFree(subscriptionList);
|
|
|
+ platformMemoryFree(fixedBuffer.pBuffer);
|
|
|
+ return RyanMqttNotEnoughMemError;
|
|
|
}
|
|
|
|
|
|
+ // 发送取消订阅主题包
|
|
|
+ // 如果发送失败就清除ack链表,创建ack链表必须在发送前
|
|
|
result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
|
|
|
RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
|
|
|
RyanMqttClearUnSubSession(client, packetId, count, subscriptionList);
|