| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119 |
- #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
- #include "RyanMqttClient.h"
- #include "RyanMqttThread.h"
- #include "RyanMqttUtil.h"
- #include "core_mqtt_serializer.h"
- /**
- * @brief mqtt初始化
- *
- * @param clientConfig
- * @param pClient mqtt客户端指针
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttClient_t *client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
- RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- RyanMqttMemset(client, 0, sizeof(RyanMqttClient_t));
- client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
- client->clientState = RyanMqttInitState;
- client->eventFlag = 0;
- client->ackHandlerCount = 0;
- RyanMqttBool_e criticalLockIsOk = RyanMqttFalse;
- RyanMqttBool_e sendLockIsOk = RyanMqttFalse;
- RyanMqttBool_e msgHandleLockIsOk = RyanMqttFalse;
- RyanMqttBool_e ackHandleLockIsOk = RyanMqttFalse;
- RyanMqttBool_e userSessionLockIsOk = RyanMqttFalse;
- RyanMqttBool_e networkIsOk = RyanMqttFalse;
- result = platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- criticalLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->sendLock); // 初始化发送缓冲区互斥锁
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- sendLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->msgHandleLock);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- msgHandleLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->ackHandleLock);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- ackHandleLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->userSessionLock);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- userSessionLockIsOk = RyanMqttTrue;
- result = platformNetworkInit(client->config.userData, &client->network); // 网络接口初始化
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- // networkIsOk = RyanMqttTrue;
- RyanMqttListInit(&client->msgHandlerList);
- RyanMqttListInit(&client->ackHandlerList);
- RyanMqttListInit(&client->userAckHandlerList);
- RyanMqttSetClientState(client, RyanMqttInitState);
- *pClient = client;
- return RyanMqttSuccessError;
- __exit:
- // 不能按空来判断,不是指针类型
- if (criticalLockIsOk)
- {
- platformCriticalDestroy(client->config.userData, &client->criticalLock);
- }
- if (sendLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->sendLock);
- }
- if (msgHandleLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->msgHandleLock);
- }
- if (ackHandleLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->ackHandleLock);
- }
- if (userSessionLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->userSessionLock);
- }
- if (networkIsOk)
- {
- platformNetworkClose(client->config.userData, &client->network);
- platformNetworkDestroy(client->config.userData, &client->network);
- }
- platformMemoryFree(client);
- return result;
- }
- /**
- * @brief 销毁mqtt客户端
- * !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
- * !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
- * !mqtt删除自己前会调用 RyanMqttEventDestroyBefore 事件回调
- * !调用此函数后就不应该再对该客户端进行任何操作
- * @param client
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- client->destroyFlag = RyanMqttTrue;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
- /**
- * @brief 启动mqtt客户端
- * !不要重复调用
- *
- * @param client
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttInitState == RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- RyanMqttSetClientState(client, RyanMqttStartState);
- // 连接成功,需要初始化 MQTT 线程
- result = platformThreadInit(client->config.userData, &client->mqttThread, client->config.taskName,
- RyanMqttThread, client, client->config.taskStack, client->config.taskPrio);
- RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNoRescourceError, RyanMqttLog_d,
- { RyanMqttSetClientState(client, RyanMqttInitState); });
- return result;
- }
- /**
- * @brief 断开mqtt服务器连接
- *
- * @param client
- * @param sendDiscFlag
- * RyanMqttTrue表示发送断开连接报文,RyanMqttFalse表示不发送断开连接报文
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- if (RyanMqttTrue == sendDiscFlag)
- {
- MQTTStatus_t status;
- MQTTFixedBuffer_t fixedBuffer;
- // 获取断开连接的数据包大小
- status = MQTT_GetDisconnectPacketSize(&fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请断开连接数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- // 序列化断开连接数据包
- status = MQTT_SerializeDisconnect(&fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- // 发送断开连接数据包
- RyanMqttError_e result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- platformMemoryFree(fixedBuffer.pBuffer);
- }
- RyanMqttConnectStatus_e connectState = RyanMqttConnectUserDisconnected;
- RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
- return RyanMqttSuccessError;
- }
- // todo 这里考虑要不要使用信号量来实现吧,会增加platform厚度,目前不想加。最好用自动重连
- // ?现在取巧使用线程的暂停和恢复实现,如果mqtt线程还没有暂停,用户就调用这个函数就会没有效果。
- // ?用户不用自动重连的话,也可以通过一直判断 client 的状态是不是为 RyanMqttDisconnectState 是的话可以调用
- // ?RyanMqttReconnect。 最推荐的是用自动重连
- /**
- * @brief 手动重连mqtt客户端
- * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
- *
- * @param client
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttDisconnectState == RyanMqttGetClientState(client), RyanMqttConnectError, RyanMqttLog_d);
- if (RyanMqttTrue == client->config.autoReconnectFlag)
- {
- return RyanMqttNoRescourceError;
- }
- result = platformThreadStart(client->config.userData, &client->mqttThread);
- RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- return result;
- }
- /**
- * @brief 订阅主题
- *
- * @param client
- * @param topic
- * @param qos
- * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
- RyanMqttSubscribeData_t subscribeManyData[])
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttMsgHandler_t *msgToListHandler;
- RyanMqttAckHandler_t *userAckHandler;
- MQTTFixedBuffer_t fixedBuffer;
- // 校验参数合法性
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- // 检查每个主题消息是否合法
- for (int32_t i = 0; i < count; i++)
- {
- RyanMqttCheck(NULL != subscribeManyData[i].topic && subscribeManyData[i].topicLen > 0,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttQos0 <= subscribeManyData[i].qos && RyanMqttQos2 >= subscribeManyData[i].qos,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- }
- // 申请 coreMqtt 支持的topic格式空间
- MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
- RyanMqttCheck(NULL != subscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- for (int32_t i = 0; i < count; i++)
- {
- subscriptionList[i].qos = (MQTTQoS_t)subscribeManyData[i].qos;
- subscriptionList[i].pTopicFilter = subscribeManyData[i].topic;
- subscriptionList[i].topicFilterLength = subscribeManyData[i].topicLen;
- }
- // 序列化数据包
- {
- size_t remainingLength;
- // 获取数据包大小
- MQTTStatus_t status =
- MQTT_GetSubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
- { platformMemoryFree(subscriptionList); });
- // 序列化数据包
- packetId = RyanMqttGetNextPacketId(client);
- status = MQTT_SerializeSubscribe(subscriptionList, count, packetId, remainingLength, &fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
- platformMemoryFree(subscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- });
- }
- // 创建每个msg主题的ack节点
- // ?mqtt空间收到服务器的suback时,会查找所有同名的然后删掉,所以这里不进行同名对比操作
- for (int32_t i = 0; i < count; i++)
- {
- // 创建msg包
- result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
- subscriptionList[i].topicFilterLength, packetId,
- (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgHandler);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { goto __RyanMqttSubCreateAckErrorExit; });
- result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_SUBACK, packetId, 0, NULL, msgHandler,
- &userAckHandler, RyanMqttFalse);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- goto __RyanMqttSubCreateAckErrorExit;
- });
- // 此函数不会失败
- RyanMqttAckListAddToUserAckList(client, userAckHandler);
- continue;
- __RyanMqttSubCreateAckErrorExit:
- // 创建 sub ack 数组时失败,查找所有同 packetId 的ack进行清除
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
- platformMemoryFree(subscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return RyanMqttNotEnoughMemError;
- }
- // ?创建msg包,3.8.4响应,允许服务端在发送 SUBACK 报文之前就开始发送与订阅匹配的 PUBLISH 报文。
- for (int32_t i = 0; i < count; i++)
- {
- result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
- subscriptionList[i].topicFilterLength, packetId,
- (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgToListHandler);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- // 将msg信息添加到订阅链表上
- RyanMqttMsgHandlerAddToMsgList(client, msgToListHandler);
- }
- // 发送订阅主题包
- // 如果发送失败就清除ack链表,创建ack链表必须在发送前
- result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- __exit:
- // 失败清除session
- if (RyanMqttSuccessError != result)
- {
- // 清除ack链表
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
- // 清除msg链表
- RyanMqttMsgHandler_t msgMatchCriteria;
- for (int32_t i = 0; i < count; i++)
- {
- msgMatchCriteria.topic = (char *)subscriptionList[i].pTopicFilter;
- msgMatchCriteria.topicLen = subscriptionList[i].topicFilterLength;
- msgMatchCriteria.packetId = packetId;
- RyanMqttMsgHandlerFindAndDestroyByPacketId(client, &msgMatchCriteria, RyanMqttFalse);
- }
- }
- platformMemoryFree(subscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return result;
- }
- /**
- * @brief 订阅主题
- *
- * @param client
- * @param topic
- * @param qos
- * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
- {
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttSubscribeData_t subscribeManyData = {.qos = qos, .topic = topic, .topicLen = RyanMqttStrlen(topic)};
- return RyanMqttSubscribeMany(client, 1, &subscribeManyData);
- }
- /**
- * @brief 取消订阅指定主题
- *
- * @param client
- * @param topic
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
- RyanMqttUnSubscribeData_t unSubscribeManyData[])
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttMsgHandler_t *subMsgHandler;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttAckHandler_t *userAckHandler;
- MQTTFixedBuffer_t fixedBuffer;
- // 校验参数合法性
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != unSubscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- // 检查有效性
- for (int32_t i = 0; i < count; i++)
- {
- RyanMqttCheck(NULL != unSubscribeManyData[i].topic && unSubscribeManyData[i].topicLen > 0,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- }
- // 申请 coreMqtt 支持的topic格式空间
- MQTTSubscribeInfo_t *unSubscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
- RyanMqttCheck(NULL != unSubscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- for (int32_t i = 0; i < count; i++)
- {
- unSubscriptionList[i].qos = (MQTTQoS_t)RyanMqttSubFail; // 无效数据,仅用作占位符
- unSubscriptionList[i].pTopicFilter = unSubscribeManyData[i].topic;
- unSubscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
- }
- // 序列化数据包
- {
- size_t remainingLength;
- // 获取数据包大小
- MQTTStatus_t status =
- MQTT_GetUnsubscribePacketSize(unSubscriptionList, count, &remainingLength, &fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
- { platformMemoryFree(unSubscriptionList); });
- // 序列化数据包
- packetId = RyanMqttGetNextPacketId(client);
- status = MQTT_SerializeUnsubscribe(unSubscriptionList, count, packetId, remainingLength, &fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- });
- }
- // 创建ack
- for (int32_t i = 0; i < count; i++)
- {
- // ?不判断是否订阅,统一都发送取消
- RyanMqttMsgHandler_t msgMatchCriteria = {.topic = (char *)unSubscriptionList[i].pTopicFilter,
- .topicLen = unSubscriptionList[i].topicFilterLength};
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- result =
- RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttFalse, &subMsgHandler, RyanMqttFalse);
- if (RyanMqttSuccessError == result)
- {
- // !有线程安全问题,subMsgHandler是指针,但用户层只要不是特别的混乱重复取消订阅这里应该就问题,暂时不管成本太高
- // 同步msg qos等级,之后unsub回调使用
- unSubscriptionList[i].qos = (MQTTQoS_t)subMsgHandler->qos;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- result = RyanMqttMsgHandlerCreate(client, unSubscriptionList[i].pTopicFilter,
- unSubscriptionList[i].topicFilterLength, packetId,
- (RyanMqttQos_e)unSubscriptionList[i].qos, NULL, &msgHandler);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { goto __RyanMqttUnSubCreateAckErrorExit; });
- result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_UNSUBACK, packetId, 0, NULL, msgHandler,
- &userAckHandler, RyanMqttFalse);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- goto __RyanMqttUnSubCreateAckErrorExit;
- });
- // 此函数不会失败
- RyanMqttAckListAddToUserAckList(client, userAckHandler);
- continue;
- __RyanMqttUnSubCreateAckErrorExit:
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return RyanMqttNotEnoughMemError;
- }
- // 发送取消订阅主题包
- // 如果发送失败就清除ack链表,创建ack链表必须在发送前
- result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- });
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return result;
- }
- /**
- * @brief 取消订阅指定主题
- *
- * @param client
- * @param topic
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
- {
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttUnSubscribeData_t subscribeManyData = {.topic = topic, .topicLen = RyanMqttStrlen(topic)};
- return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
- }
- RyanMqttError_e RyanMqttPublishWithUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen, char *payload,
- uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain,
- void *userData)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- MQTTStatus_t status;
- MQTTFixedBuffer_t fixedBuffer;
- size_t remainingLength;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != topic && topicLen > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- // 报文支持有效载荷长度为0
- if (payloadLen > 0 && NULL == payload)
- {
- return RyanMqttParamInvalidError;
- }
- // 序列化pub发送包
- MQTTPublishInfo_t publishInfo = {
- .qos = (MQTTQoS_t)qos,
- .pTopicName = topic,
- .topicNameLength = topicLen,
- .pPayload = payload,
- .payloadLength = payloadLen,
- .retain = retain,
- .dup = 0,
- };
- // 获取数据包大小
- status = MQTT_GetPublishPacketSize(&publishInfo, &remainingLength, &fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- // qos0不需要 packetId
- if (RyanMqttQos0 == qos)
- {
- packetId = 0;
- }
- else
- {
- packetId = RyanMqttGetNextPacketId(client);
- }
- // 序列化数据包
- status = MQTT_SerializePublish(&publishInfo, packetId, remainingLength, &fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- if (RyanMqttQos0 == qos)
- {
- // 发送报文
- result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- platformMemoryFree(fixedBuffer.pBuffer);
- }
- else
- {
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttAckHandler_t *userAckHandler;
- uint8_t packetType = (RyanMqttQos1 == qos) ? MQTT_PACKET_TYPE_PUBACK : MQTT_PACKET_TYPE_PUBREC;
- // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
- result = RyanMqttMsgHandlerCreate(client, publishInfo.pTopicName, publishInfo.topicNameLength,
- RyanMqttMsgInvalidPacketId, qos, userData, &msgHandler);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- result = RyanMqttAckHandlerCreate(client, packetType, packetId, fixedBuffer.size, fixedBuffer.pBuffer,
- msgHandler, &userAckHandler, RyanMqttTrue);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- platformMemoryFree(fixedBuffer.pBuffer);
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- });
- // 一定要先加再send,send一定在mqtt broker回复前执行完,要不可能线程调度mqtt返回消息会比添加ack更快执行
- RyanMqttAckListAddToUserAckList(client, userAckHandler);
- result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_e, {
- RyanMqttLog_e("RyanMqttSendPacket failed, clear user ack session");
- // userAck 必须通过这个执行,因为可能已经复制到mqtt内核空间了
- RyanMqttClearAckSession(client, packetType, packetId);
- });
- }
- return result;
- }
- /**
- * @brief 客户端向服务端发送消息
- *
- * @param client
- * @param topic
- * @param payload
- * @param payloadLen
- * @param QOS
- * @param retain
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
- RyanMqttQos_e qos, RyanMqttBool_e retain)
- {
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
- return RyanMqttPublishWithUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain,
- NULL);
- }
- /**
- * @brief 获取已订阅主题
- * !此函数是非线程安全的,已不推荐使用
- * !请使用 RyanMqttGetSubscribeSafe 代替
- * !如果另一个线程在这个调用返回后立即取消订阅,topic将指向非法内存
- *
- * @param client
- * @param msgHandles 存放已订阅主题的空间
- * @param msgHandleSize 存放已订阅主题的空间大小个数
- * @param subscribeNum 函数内部返回已订阅主题的个数
- * @return RyanMqttState_e
- */
- RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandles, int32_t msgHandleSize,
- int32_t *subscribeNum)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttList_t *curr, *next;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(0 < msgHandleSize, RyanMqttParamInvalidError, RyanMqttLog_d);
- *subscribeNum = 0;
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- if (*subscribeNum >= msgHandleSize)
- {
- result = RyanMqttNoRescourceError;
- break;
- }
- msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
- msgHandles[*subscribeNum].topic = msgHandler->topic;
- msgHandles[*subscribeNum].qos = msgHandler->qos;
- (*subscribeNum)++;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- return result;
- }
- /**
- * @brief 安全的获取已订阅主题列表,仅可通过 RyanMqttSafeFreeSubscribeResources 进行安全释放。
- *
- * @param client
- * @param msgHandles
- * @param subscribeNum
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetSubscribeSafe(RyanMqttClient_t *client, RyanMqttMsgHandler_t **msgHandles,
- int32_t *subscribeNum)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttList_t *curr, *next;
- RyanMqttMsgHandler_t *msgHandler;
- int32_t subscribeTotal;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
- if (0 == subscribeTotal)
- {
- *msgHandles = NULL;
- *subscribeNum = 0;
- return RyanMqttSuccessError;
- }
- RyanMqttMsgHandler_t *msgHandlerArr = platformMemoryMalloc(sizeof(RyanMqttMsgHandler_t) * subscribeTotal);
- if (NULL == msgHandlerArr)
- {
- result = RyanMqttNotEnoughMemError;
- goto __exit;
- }
- int32_t subscribeCount = 0;
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- if (subscribeCount >= subscribeTotal)
- {
- break;
- }
- msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
- RyanMqttMemcpy(&msgHandlerArr[subscribeCount], msgHandler, sizeof(RyanMqttMsgHandler_t));
- result = RyanMqttDupString(&msgHandlerArr[subscribeCount].topic, msgHandler->topic,
- msgHandler->topicLen);
- if (RyanMqttSuccessError != result)
- {
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- RyanMqttSafeFreeSubscribeResources(msgHandlerArr, subscribeCount);
- result = RyanMqttNotEnoughMemError;
- goto __exit;
- }
- subscribeCount++;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- *msgHandles = msgHandlerArr;
- *subscribeNum = subscribeCount;
- __exit:
- return result;
- }
- /**
- * @brief 安全释放订阅主题列表(禁止直接调用free函数)
- *
- * @param msgHandles
- * @param subscribeNum
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSafeFreeSubscribeResources(RyanMqttMsgHandler_t *msgHandles, int32_t subscribeNum)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
- // RyanMqttGetSubscribeTotalCount 内部调用的时候可以会等于0
- RyanMqttCheck(subscribeNum >= 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- for (int32_t i = 0; i < subscribeNum; i++)
- {
- // 不加null判断,因为如果是空,一定是用户程序内存访问越界了
- platformMemoryFree(msgHandles[i].topic);
- }
- platformMemoryFree(msgHandles);
- return result;
- }
- /**
- * @brief 获取已订阅主题个数
- *
- * @param client
- * @param subscribeTotalCount
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client, int32_t *subscribeTotalCount)
- {
- RyanMqttList_t *curr, *next;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeTotalCount, RyanMqttParamInvalidError, RyanMqttLog_d);
- *subscribeTotalCount = 0;
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- (*subscribeTotalCount)++;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- return RyanMqttSuccessError;
- }
- /**
- * @brief 获取mqtt客户端状态
- *
- * @param client
- * @return RyanMqttState_e
- */
- RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
- {
- if (NULL == client)
- {
- return RyanMqttInvalidState;
- }
- return RyanMqttGetClientState(client);
- }
- /**
- * @brief 获取 keepalive 剩余时间
- *
- * @param client
- * @param keepAliveRemain
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetKeepAliveRemain(RyanMqttClient_t *client, uint32_t *keepAliveRemain)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != keepAliveRemain, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- *keepAliveRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
- static RyanMqttError_e RyanMqttClientConfigDeepCopy(RyanMqttClientConfig_t *destConfig,
- RyanMqttClientConfig_t *srcConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttAssert(NULL != destConfig && NULL != srcConfig);
- RyanMqttMemcpy(destConfig, srcConfig, sizeof(RyanMqttClientConfig_t));
- destConfig->clientId = NULL;
- destConfig->userName = NULL;
- destConfig->password = NULL;
- destConfig->host = NULL;
- destConfig->taskName = NULL;
- result = RyanMqttDupString(&destConfig->clientId, srcConfig->clientId, RyanMqttStrlen(srcConfig->clientId));
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- if (NULL != srcConfig->userName)
- {
- result = RyanMqttDupString(&destConfig->userName, srcConfig->userName,
- RyanMqttStrlen(srcConfig->userName));
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- }
- if (NULL != srcConfig->password)
- {
- result = RyanMqttDupString(&destConfig->password, srcConfig->password,
- RyanMqttStrlen(srcConfig->password));
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- }
- result = RyanMqttDupString(&destConfig->host, srcConfig->host, RyanMqttStrlen(srcConfig->host));
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- result = RyanMqttDupString(&destConfig->taskName, srcConfig->taskName, RyanMqttStrlen(srcConfig->taskName));
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- __exit:
- if (RyanMqttSuccessError != result)
- {
- RyanMqttPurgeConfig(destConfig);
- }
- return result;
- }
- /**
- * @brief 获取mqtt config
- * !非线程安全,多线程通过set和get数据可能会错乱甚至崩溃。
- * !使用完毕后,需要用户手动调用 RyanMqttFreeConfigFromGet 释放指针空间
- *
- * @param client
- * @param pclientConfig
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttClientConfig_t *clientConfig;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
- RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- result = RyanMqttClientConfigDeepCopy(clientConfig, &client->config);
- RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- if (RyanMqttSuccessError == result)
- {
- *pclientConfig = clientConfig;
- }
- else
- {
- *pclientConfig = NULL;
- platformMemoryFree(clientConfig);
- }
- return result;
- }
- /**
- * @brief 释放通过 RyanMqttGetConfig 获取的配置信息 (禁止直接调用free函数)
- *
- * @param clientConfig
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttFreeConfigFromGet(RyanMqttClientConfig_t *clientConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttPurgeConfig(clientConfig);
- platformMemoryFree(clientConfig);
- return result;
- }
- // todo 增加更多校验,比如判断心跳包和recv的关系
- /**
- * @brief 设置mqtt config 这是很危险的操作,需要考虑mqtt
- * thread线程和用户线程的资源互斥
- *
- * 推荐在 RyanMqttStart函数前 / 非用户手动触发的事件回调函数中 / mqtt
- * thread处于挂起状态时调用 mqtt thread处于阻塞状态时调用此函数也是很危险的行为
- * 要保证mqtt线程和用户线程的资源互斥
- * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
- *
- * !项目中用户不应频繁调用此函数
- *
- * 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
- *
- * @param client
- * @param clientConfig
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(clientConfig->recvTimeout <= (uint32_t)clientConfig->keepaliveTimeoutS * 1000 / 2,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttClientConfig_t tempConfig;
- result = RyanMqttClientConfigDeepCopy(&tempConfig, clientConfig);
- RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- if (RyanMqttSuccessError == result)
- {
- // todo !因为这里是非线程安全的
- RyanMqttPurgeConfig(&client->config);
- client->config = tempConfig;
- }
- return result;
- }
- /**
- * @brief 设置遗嘱的配置信息
- * 此函数必须在发送connect报文前调用,因为遗嘱消息包含在connect报文中
- *
- * @param client
- * @param topicName
- * @param qos
- * @param retain
- * @param payload
- * @param payloadLen
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen,
- RyanMqttQos_e qos, RyanMqttBool_e retain)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != topicName, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
- // 报文支持有效载荷长度为0
- if (NULL == payload && payloadLen > 0)
- {
- return RyanMqttParamInvalidError;
- }
- platformMutexLock(client->config.userData, &client->userSessionLock);
- // 之前如果设置过遗嘱就进行资源释放,否则申请空间
- if (NULL == client->lwtOptions)
- {
- client->lwtOptions = (lwtOptions_t *)platformMemoryMalloc(sizeof(lwtOptions_t));
- RyanMqttCheckCodeNoReturn(NULL != client->lwtOptions, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
- result = RyanMqttNotEnoughMemError;
- goto __exit;
- });
- }
- else
- {
- if (NULL != client->lwtOptions->topic)
- {
- platformMemoryFree(client->lwtOptions->topic);
- }
- if (NULL != client->lwtOptions->payload)
- {
- platformMemoryFree(client->lwtOptions->payload);
- }
- }
- RyanMqttMemset(client->lwtOptions, 0, sizeof(lwtOptions_t));
- if (payloadLen > 0)
- {
- result = RyanMqttDupString(&client->lwtOptions->payload, payload, payloadLen);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- }
- else
- {
- client->lwtOptions->payload = NULL;
- }
- result = RyanMqttDupString(&client->lwtOptions->topic, topicName, RyanMqttStrlen(topicName));
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- client->lwtOptions->lwtFlag = RyanMqttTrue;
- client->lwtOptions->qos = qos;
- client->lwtOptions->retain = retain;
- client->lwtOptions->payloadLen = payloadLen;
- platformMutexUnLock(client->config.userData, &client->userSessionLock);
- return RyanMqttSuccessError;
- __exit:
- if (NULL != client->lwtOptions)
- {
- if (NULL != client->lwtOptions->topic)
- {
- platformMemoryFree(client->lwtOptions->topic);
- }
- if (NULL != client->lwtOptions->payload)
- {
- platformMemoryFree(client->lwtOptions->payload);
- }
- platformMemoryFree(client->lwtOptions);
- client->lwtOptions = NULL;
- }
- platformMutexUnLock(client->config.userData, &client->userSessionLock);
- return result;
- }
- /**
- * @brief 丢弃指定ack
- *
- * @param client
- * @param ackHandler
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttAckHandler_t *ackHandler;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(0 < packetId && packetId <= RyanMqttMaxPacketId, RyanMqttParamInvalidError, RyanMqttLog_d);
- // 删除pubrel记录
- result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
- if (RyanMqttSuccessError == result)
- {
- RyanMqttEventMachine(client, RyanMqttEventAckHandlerDiscard, (void *)ackHandler); // 回调函数
- RyanMqttAckHandlerDestroy(client, ackHandler);
- }
- return result;
- }
- RyanMqttError_e RyanMqttGetEventId(RyanMqttClient_t *client, RyanMqttEventId_e *eventId)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != eventId, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- *eventId = client->eventFlag;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
- RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- client->eventFlag |= eventId;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
- RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- client->eventFlag &= ~eventId;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
|