| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669 |
- #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
- #include "RyanMqttThread.h"
- #include "RyanMqttLog.h"
- #include "RyanMqttUtil.h"
- /**
- * @brief qos1或者qos2接收消息成功确认处理
- *
- * @param client
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *client,
- MQTTPacketInfo_t *pIncomingPacket)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttAckHandler_t *ackHandler;
- RyanMqttAssert(NULL != client);
- // 反序列化ack包
- MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
- // 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
- result = RyanMqttAckListNodeFind(client, pIncomingPacket->type & 0xF0U, packetId, &ackHandler, RyanMqttTrue);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttLog_i("packetType: %02x, packetId: %d", pIncomingPacket->type & 0xF0U, packetId);
- });
- RyanMqttEventMachine(client, RyanMqttEventPublished, (void *)ackHandler); // 回调函数
- RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
- return result;
- }
- /**
- * @brief 发布释放处理函数
- *
- * @param client
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttAckHandler_t *ackHandler;
- RyanMqttAssert(NULL != client);
- // 反序列化ack包
- MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
- // 删除pubrel记录
- result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, packetId, &ackHandler, RyanMqttTrue);
- if (RyanMqttSuccessError == result)
- {
- RyanMqttAckHandlerDestroy(client, ackHandler);
- }
- // 制作确认数据包并发送
- uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
- MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
- // 序列化ack数据包
- status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBCOMP, packetId);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
- // ?这里没法判断packetid是否非法,只能每次都回复咯
- // 每次收到PUBREL都返回消息
- // 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
- RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
- return RyanMqttSuccessError;
- }
- /**
- * @brief 发布收到处理函数
- *
- * @param client
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttAssert(NULL != client);
- // 反序列化 pubrec 包
- MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_e);
- // 序列化 pubrel 包
- uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
- MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
- status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREL, packetId);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_e);
- // 若已存在 PUBCOMP ack,说明此前已处理过 PUBREC;仅重发 PUBREL 即可
- RyanMqttAckHandler_t *ackHandlerPubcomp;
- result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBCOMP, packetId, &ackHandlerPubcomp, RyanMqttFalse);
- if (RyanMqttSuccessError == result)
- {
- goto __next;
- }
- RyanMqttAckHandler_t *ackHandlerPubrec;
- result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREC, packetId, &ackHandlerPubrec, RyanMqttTrue);
- if (RyanMqttSuccessError == result)
- {
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttAckHandler_t *ackHandlerNewPubcomp;
- // 首次收到消息,创建 pubcomp ack
- result = RyanMqttMsgHandlerCreate(client, ackHandlerPubrec->msgHandler->topic,
- ackHandlerPubrec->msgHandler->topicLen, RyanMqttMsgInvalidPacketId,
- ackHandlerPubrec->msgHandler->qos,
- ackHandlerPubrec->msgHandler->userData, &msgHandler);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { RyanMqttAckListAddToAckList(client, ackHandlerPubrec); });
- // 期望收到pubcomp否则会重复发送pubrel
- result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBCOMP, packetId,
- MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
- &ackHandlerNewPubcomp, RyanMqttFalse);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- RyanMqttAckListAddToAckList(client, ackHandlerPubrec);
- });
- RyanMqttAckListAddToAckList(client, ackHandlerNewPubcomp);
- RyanMqttAckHandlerDestroy(client, ackHandlerPubrec);
- }
- __next:
- // 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
- RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
- return result;
- }
- /**
- * @brief 收到服务器发布消息处理函数
- *
- * @param client
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttMsgData_t msgData;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttAssert(NULL != client);
- {
- // 反系列化 publish 消息
- MQTTPublishInfo_t publishInfo;
- MQTTStatus_t status = MQTT_DeserializePublish(pIncomingPacket, &packetId, &publishInfo);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttDeserializePacketError, RyanMqttLog_d);
- msgData.topic = (char *)publishInfo.pTopicName;
- msgData.topicLen = publishInfo.topicNameLength;
- msgData.packetId = packetId;
- msgData.payload = (char *)publishInfo.pPayload;
- msgData.payloadLen = publishInfo.payloadLength;
- msgData.qos = (RyanMqttQos_e)publishInfo.qos;
- msgData.retained = publishInfo.retain;
- msgData.dup = publishInfo.dup;
- // 查看订阅列表是否包含此消息主题,进行通配符匹配
- RyanMqttMsgHandler_t msgMatchCriteria = {.topic = msgData.topic, .topicLen = msgData.topicLen};
- result = RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttTrue, &msgHandler, RyanMqttFalse);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttLog_w("主题不匹配: %.*s", msgData.topicLen, msgData.topic);
- RyanMqttEventMachine(client, RyanMqttEventUnsubscribedData, (void *)&msgData);
- });
- }
- switch (msgData.qos)
- {
- case RyanMqttQos0: RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData); break;
- case RyanMqttQos1: {
- // 先分发消息,再回答ack
- RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
- uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
- MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
- // 序列化ack数据包
- MQTTStatus_t status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBACK, packetId);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
- // 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
- RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
- }
- break;
- case RyanMqttQos2: // qos2采用方法B
- {
- RyanMqttAckHandler_t *ackHandler;
- uint8_t buffer[MQTT_PUBLISH_ACK_PACKET_SIZE];
- MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
- // !序列化ack数据包,必须先执行,因为创建ack需要用到这个报文
- MQTTStatus_t status = MQTT_SerializeAck(&fixedBuffer, MQTT_PACKET_TYPE_PUBREC, packetId);
- // 上面代码不太可能出错,如果出错了就让服务器重新发送吧
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
- // 收到 publish 就期望收到 PUBREL .
- // 如果 PUBREL 报文已经存在说明不是首次收到 publish,不进行qos2 PUBREC消息处理
- result = RyanMqttAckListNodeFind(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId, &ackHandler,
- RyanMqttFalse);
- if (RyanMqttSuccessError != result)
- {
- // 第一次收到 PUBREL 报文
- RyanMqttEventMachine(client, RyanMqttEventData, (void *)&msgData);
- // 期望下一次收到 PUBREL 报文
- result = RyanMqttMsgHandlerCreate(client, msgData.topic, msgData.topicLen,
- RyanMqttMsgInvalidPacketId, msgData.qos, NULL, &msgHandler);
- RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_PUBREL, msgData.packetId,
- MQTT_PUBLISH_ACK_PACKET_SIZE, fixedBuffer.pBuffer, msgHandler,
- &ackHandler, RyanMqttFalse);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { RyanMqttMsgHandlerDestroy(client, msgHandler); });
- RyanMqttAckListAddToAckList(client, ackHandler);
- }
- else
- {
- // 非首次收到同一 packetId 的 PUBLISH(正常重传场景),不再分发
- RyanMqttLog_d("Duplicate QoS2 PUBLISH, packetId: %d", msgData.packetId);
- }
- // 无论是不是第一次收到,都回复 pub ack报文
- // 不管结果,因为失败了也没有好的处理形式,等待broker重发吧
- RyanMqttSendPacket(client, fixedBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE);
- }
- break;
- default: RyanMqttLog_w("Unhandled QoS level: %d", msgData.qos); break;
- }
- return result;
- }
- /**
- * @brief 订阅确认处理函数
- *
- * @param client
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
- {
- uint16_t packetId;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttAckHandler_t *ackHandler;
- RyanMqttList_t *curr, *next;
- RyanMqttAssert(NULL != client);
- // 反序列化ack包,MQTTSuccess 和 MQTTServerRefused 都是成功的
- // coreMqtt 检测到qos等级为 0x80 就会返回 MQTTServerRefused
- MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
- RyanMqttCheck(MQTTSuccess == status || MQTTServerRefused == status, RyanMqttDeserializePacketError,
- RyanMqttLog_d);
- // 检查ack的msgCount和返回消息的msgCount是否一致
- {
- // MQTT_DeserializeAck 会保证 pIncomingPacket->remainingLength >= 3
- uint32_t statusCount = pIncomingPacket->remainingLength - sizeof(uint16_t);
- uint32_t ackMsgCount = 0;
- // ?使用ack或msg遍历都行,使用msg更容易测试出问题,遍历性能也会更好一些
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
- if (packetId == msgHandler->packetId)
- {
- ackMsgCount++;
- }
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- // 服务器回复的ack数和记录的ack数不一致就清除所有ack
- RyanMqttCheckCode(ackMsgCount == statusCount, RyanMqttNoRescourceError, RyanMqttLog_d, {
- // 清除所有ack
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
- // 清除所有msg
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
- if (packetId == msgHandler->packetId)
- {
- RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- }
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- });
- }
- // 到这里说明ackCount和msgCount是一致的
- RyanMqttQos_e subscriptionQos;
- uint32_t ackMsgIndex = 0;
- const uint8_t *pStatusStart = &pIncomingPacket->pRemainingData[sizeof(uint16_t)];
- // todo 这里效率非常低,订阅属于用的少的功能,暂时可以接受
- // 查找ack句柄
- platformMutexLock(client->config.userData, &client->ackHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
- {
- ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
- if (packetId != ackHandler->packetId)
- {
- continue;
- }
- // 处理非订阅ack
- if (MQTT_PACKET_TYPE_SUBACK != ackHandler->packetType)
- {
- RyanMqttLog_e("packetType error: %02x", ackHandler->packetType);
- goto __next;
- }
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- // 查找同名订阅但是packetid不一样的进行删除,保证订阅主题列表只有一个最新的
- RyanMqttMsgHandlerFindAndDestroyByPacketId(client, ackHandler->msgHandler, RyanMqttTrue);
- // 到这里就可以保证没有同名订阅了
- // 查找之前记录的topic句柄,根据服务器授权Qos进行更新
- // 几乎不可能查找不到,可以查找到 ackHandler 就一定有 msgHandler
- RyanMqttError_e result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler, RyanMqttFalse,
- &msgHandler, RyanMqttFalse);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- goto __next;
- });
- // 解析服务端授权 QoS(0,1,2)或失败(0x80)
- subscriptionQos = pStatusStart[ackMsgIndex++];
- switch (subscriptionQos)
- {
- case RyanMqttQos0:
- case RyanMqttQos1:
- case RyanMqttQos2:
- // 到这里说明订阅成功,更新 QoS 并清除临时 packetId
- msgHandler->qos = subscriptionQos;
- msgHandler->packetId = RyanMqttMsgInvalidPacketId;
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- // mqtt回调函数
- RyanMqttEventMachine(client, RyanMqttEventSubscribed, (void *)ackHandler->msgHandler);
- break;
- case RyanMqttSubFail:
- default:
- // 订阅失败,服务器拒绝;删除并通知失败
- RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- // mqtt事件回调
- RyanMqttEventMachine(client, RyanMqttEventSubscribedFailed, (void *)ackHandler->msgHandler);
- break;
- }
- __next:
- RyanMqttAckListRemoveToAckList(client, ackHandler);
- RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
- }
- platformMutexUnLock(client->config.userData, &client->ackHandleLock);
- return RyanMqttSuccessError;
- }
- /**
- * @brief 取消订阅确认处理函数
- *
- * @param client
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttMsgHandler_t *subMsgHandler;
- RyanMqttAckHandler_t *ackHandler;
- RyanMqttList_t *curr, *next;
- uint16_t packetId;
- RyanMqttAssert(NULL != client);
- // 反序列化ack包
- MQTTStatus_t status = MQTT_DeserializeAck(pIncomingPacket, &packetId, NULL);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
- // todo 这里效率低,取消订阅属于用的少的功能,暂时可以接受
- platformMutexLock(client->config.userData, &client->ackHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
- {
- ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
- if (packetId != ackHandler->packetId)
- {
- continue;
- }
- // 必须先判断packetId是否相等,再判断类型
- if (MQTT_PACKET_TYPE_UNSUBACK != ackHandler->packetType)
- {
- goto __next;
- }
- // 查找当前主题是否已经订阅,进行取消订阅
- result = RyanMqttMsgHandlerFind(client, ackHandler->msgHandler, RyanMqttFalse, &subMsgHandler,
- RyanMqttTrue);
- if (RyanMqttSuccessError == result)
- {
- ackHandler->msgHandler->qos = subMsgHandler->qos;
- RyanMqttMsgHandlerDestroy(client, subMsgHandler);
- }
- // mqtt事件回调
- RyanMqttEventMachine(client, RyanMqttEventUnSubscribed, (void *)ackHandler->msgHandler);
- __next:
- RyanMqttAckListRemoveToAckList(client, ackHandler);
- RyanMqttAckHandlerDestroy(client, ackHandler); // 销毁ackHandler
- }
- platformMutexUnLock(client->config.userData, &client->ackHandleLock);
- return RyanMqttSuccessError;
- }
- /**
- * @brief 将用户空间的ack链表搬到mqtt线程空间
- *
- * @param client
- */
- static void RyanMqttSyncUserAckHandle(RyanMqttClient_t *client)
- {
- RyanMqttAckHandler_t *userAckHandler;
- RyanMqttList_t *curr, *next;
- platformMutexLock(client->config.userData, &client->userSessionLock);
- RyanMqttListForEachSafe(curr, next, &client->userAckHandlerList)
- {
- // 获取此节点的结构体
- userAckHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
- RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
- RyanMqttAckListAddToAckList(client, userAckHandler);
- }
- platformMutexUnLock(client->config.userData, &client->userSessionLock);
- }
- RyanMqttError_e RyanMqttGetPacketInfo(RyanMqttClient_t *client, MQTTPacketInfo_t *pIncomingPacket)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttAssert(NULL != client);
- uint8_t pBuffer[5]; // MQTT 固定报头最大 5 字节
- uint8_t needReadSize = 2;
- size_t readIndex = 0;
- MQTTStatus_t status;
- // // todo 可以考虑增加包大小限制,目前不准备加,错误需要更复杂的实现
- // MQTTStatus_t status =
- // MQTT_GetIncomingPacketTypeAndLength(coreMqttTransportRecv, &pNetworkContext, pIncomingPacket);
- do
- {
- // 第一次直接读取 2 个字节
- result = RyanMqttRecvPacket(client, pBuffer + readIndex, needReadSize);
- if (RyanMqttRecvPacketTimeOutError == result)
- {
- goto __next; // 超时直接退出
- }
- else if (RyanMqttSuccessError != result)
- {
- RyanMqttLog_e("读取固定报头失败");
- goto __next;
- }
- readIndex += needReadSize; // 更新读取位置
- // 尝试解析
- status = MQTT_ProcessIncomingPacketTypeAndLength(pBuffer, &readIndex, pIncomingPacket);
- if (MQTTNeedMoreBytes == status)
- {
- needReadSize = 3; // 最多还需要 3 个字节
- // ?冗余,header最多为5字节,理论上不可能发生的,所以还是使用上面的形式吧
- // if (readIndex >= sizeof(pBuffer))
- // {
- // result = RyanMqttFailedError;
- // goto __next;
- // }
- // needReadSize = sizeof(pBuffer) - readIndex;
- continue;
- }
- if (MQTTSuccess != status)
- {
- RyanMqttLog_e("解析固定报头失败 %d", status);
- result = RyanMqttDeserializePacketError;
- goto __next;
- }
- if (pIncomingPacket->remainingLength <= 0)
- {
- break; // 不包含可变长度报文
- }
- // 申请 payload 的空间
- pIncomingPacket->pRemainingData = platformMemoryMalloc(pIncomingPacket->remainingLength);
- RyanMqttCheckCode(NULL != pIncomingPacket->pRemainingData, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
- result = RyanMqttNotEnoughMemError;
- goto __next;
- });
- // 如果固定报头解析时已经多读了 payload 的一部分
- uint8_t alreadyRead = readIndex - pIncomingPacket->headerLength;
- for (uint8_t i = 0; i < alreadyRead; i++)
- {
- pIncomingPacket->pRemainingData[i] = *(pBuffer + pIncomingPacket->headerLength + i);
- }
- // // ? memcpy可能性能更高
- // if (alreadyRead > 0)
- // {
- // RyanMqttMemcpy(pIncomingPacket->pRemainingData, (char *)pBuffer + pIncomingPacket->headerLength,
- // alreadyRead);
- // }
- // 读取剩余 payload
- if (alreadyRead < pIncomingPacket->remainingLength)
- {
- result = RyanMqttRecvPacket(client, pIncomingPacket->pRemainingData + alreadyRead,
- pIncomingPacket->remainingLength - alreadyRead);
- // 返回 result 没错
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- platformMemoryFree(pIncomingPacket->pRemainingData);
- pIncomingPacket->pRemainingData = NULL;
- goto __next;
- });
- }
- break;
- } while (1);
- __next:
- // 先同步用户接口的ack链表
- RyanMqttSyncUserAckHandle(client);
- return result;
- }
- /**
- * @brief mqtt数据包处理函数
- *
- * @param client
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttProcessPacketHandler(RyanMqttClient_t *client)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- MQTTPacketInfo_t pIncomingPacket = {0}; // 下面有非空判断
- RyanMqttAssert(NULL != client);
- result = RyanMqttGetPacketInfo(client, &pIncomingPacket);
- if (RyanMqttRecvPacketTimeOutError == result)
- {
- RyanMqttLog_d("没有待处理的数据包");
- goto __exit;
- }
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttSerializePacketError, RyanMqttLog_d,
- { goto __exit; });
- RyanMqttLog_d("pIncomingPacket.type: %x ", pIncomingPacket.type & 0xF0U);
- // 控制报文类型
- // QoS2 使用官方推荐的方法B
- // 发送者QoS2动作 发布PUBLISH报文 -> 等待PUBREC报文 -> 发送PUBREL报文 -> 等待PUBCOMP报文
- // 接收者QoS2动作 等待PUBLISH报文 -> 发送PUBREC报文 -> 等待PUBREL报文 -> 发送PUBCOMP报文
- switch (pIncomingPacket.type & 0xF0U)
- {
- case MQTT_PACKET_TYPE_PUBLISH: // 接收到订阅消息
- result = RyanMqttPublishPacketHandler(client, &pIncomingPacket);
- break;
- case MQTT_PACKET_TYPE_PINGRESP: // 心跳响应
- RyanMqttRefreshKeepaliveTime(client);
- result = RyanMqttSuccessError;
- break;
- case MQTT_PACKET_TYPE_PUBACK: // 客户端发送QoS 1消息,服务端发布收到确认
- case MQTT_PACKET_TYPE_PUBCOMP: // 发送QOS2 发布完成
- result = RyanMqttPubackAndPubcompPacketHandler(client, &pIncomingPacket);
- break;
- case MQTT_PACKET_TYPE_PUBREC: // 客户端发送QOS2,服务端发布PUBREC,需要客户端继续发送PUBREL
- result = RyanMqttPubrecPacketHandler(client, &pIncomingPacket);
- break;
- case (MQTT_PACKET_TYPE_PUBREL & 0xF0U): // 客户端接收QOS2 已经发布PUBREC,等待服务器发布释放 pubrel
- result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
- // !RyanMqttGetPacketInfo 检查报文type错误时不会进行返回,所以下面逻辑暂时没用
- // // PUBREL 控制报文固定报头的第 3,2,1,0
- // // 位必须被设置为0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接
- // if (pIncomingPacket.type & 0x02U)
- // {
- // result = RyanMqttPubrelPacketHandler(client, &pIncomingPacket);
- // }
- // else
- // {
- // RyanMqttLog_e("PUBREL 控制报文固定报头的第 3,2,1,0 "
- // "位必须被设置为0,0,1,0。必须将其它的任何值都当做是不合法的并关闭网络连接");
- // RyanMqttConnectStatus_e connectState = RyanMqttConnectInvalidPacketError;
- // RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
- // result = RyanMqttInvalidPacketError;
- // }
- break;
- case MQTT_PACKET_TYPE_SUBACK: // 订阅确认
- result = RyanMqttSubackHandler(client, &pIncomingPacket);
- break;
- case MQTT_PACKET_TYPE_UNSUBACK: // 取消订阅确认
- result = RyanMqttUnSubackHandler(client, &pIncomingPacket);
- break;
- case MQTT_PACKET_TYPE_CONNACK: // 连接报文确认
- {
- // ?没必要这么严格,考虑兼容性多一些吧
- // // ?客户端已处于连接状态时又收到CONNACK报文,应该视为严重错误,断开连接
- // RyanMqttLog_e("收到 CONNACK 时已连接,正在断开连接");
- // RyanMqttConnectStatus_e connectState = RyanMqttConnectProtocolError;
- // RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
- // result = RyanMqttHaveRescourceError;
- }
- break;
- default:
- RyanMqttLog_w("Unhandled packet type: 0x%02X", pIncomingPacket.type & 0xF0U);
- result = RyanMqttDeserializePacketError;
- break;
- }
- __exit:
- if (NULL != pIncomingPacket.pRemainingData)
- {
- platformMemoryFree(pIncomingPacket.pRemainingData);
- }
- return result;
- }
|