| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142 |
- #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
- #include "RyanMqttClient.h"
- #include "RyanMqttThread.h"
- #include "RyanMqttUtil.h"
- #include "core_mqtt_serializer.h"
- /**
- * @brief mqtt初始化
- *
- * @param clientConfig
- * @param pClient mqtt客户端指针
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttClient_t *client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
- RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- RyanMqttMemset(client, 0, sizeof(RyanMqttClient_t));
- client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
- client->clientState = RyanMqttInitState;
- client->eventFlag = 0;
- client->ackHandlerCount = 0;
- RyanMqttBool_e criticalLockIsOk = RyanMqttFalse;
- RyanMqttBool_e sendLockIsOk = RyanMqttFalse;
- RyanMqttBool_e msgHandleLockIsOk = RyanMqttFalse;
- RyanMqttBool_e ackHandleLockIsOk = RyanMqttFalse;
- RyanMqttBool_e userSessionLockIsOk = RyanMqttFalse;
- RyanMqttBool_e networkIsOk = RyanMqttFalse;
- result = platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- criticalLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->sendLock); // 初始化发送缓冲区互斥锁
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- sendLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->msgHandleLock);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- msgHandleLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->ackHandleLock);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- ackHandleLockIsOk = RyanMqttTrue;
- result = platformMutexInit(client->config.userData, &client->userSessionLock);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- userSessionLockIsOk = RyanMqttTrue;
- result = platformNetworkInit(client->config.userData, &client->network); // 网络接口初始化
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- // networkIsOk = RyanMqttTrue;
- RyanMqttListInit(&client->msgHandlerList);
- RyanMqttListInit(&client->ackHandlerList);
- RyanMqttListInit(&client->userAckHandlerList);
- RyanMqttSetClientState(client, RyanMqttInitState);
- *pClient = client;
- return RyanMqttSuccessError;
- __exit:
- // 不能按空来判断,不是指针类型
- if (criticalLockIsOk)
- {
- platformCriticalDestroy(client->config.userData, &client->criticalLock);
- }
- if (sendLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->sendLock);
- }
- if (msgHandleLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->msgHandleLock);
- }
- if (ackHandleLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->ackHandleLock);
- }
- if (userSessionLockIsOk)
- {
- platformMutexDestroy(client->config.userData, &client->userSessionLock);
- }
- if (networkIsOk)
- {
- platformNetworkClose(client->config.userData, &client->network);
- platformNetworkDestroy(client->config.userData, &client->network);
- }
- platformMemoryFree(client);
- return result;
- }
- /**
- * @brief 销毁mqtt客户端
- * !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
- * !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
- * !mqtt删除自己前会调用 RyanMqttEventDestroyBefore 事件回调
- * !调用此函数后就不应该再对该客户端进行任何操作
- * ?这里用信号量通知mqtt线程是最好的,但是为了简化platform层,这里用标志位代替信号量
- * @param client
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- if (RyanMqttInitState == RyanMqttGetClientState(client))
- {
- RyanMqttPurgeClient(client);
- platformMemoryFree(client);
- }
- else
- {
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- client->destroyFlag = RyanMqttTrue;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- }
- return RyanMqttSuccessError;
- }
- /**
- * @brief 启动mqtt客户端
- * !不要重复调用
- *
- * @param client
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttInitState == RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- RyanMqttSetClientState(client, RyanMqttStartState);
- // 连接成功,需要初始化 MQTT 线程
- result = platformThreadInit(client->config.userData, &client->mqttThread, client->config.taskName,
- RyanMqttThread, client, client->config.taskStack, client->config.taskPrio);
- RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNoRescourceError, RyanMqttLog_d,
- { RyanMqttSetClientState(client, RyanMqttInitState); });
- return result;
- }
- /**
- * @brief 断开mqtt服务器连接
- *
- * @param client
- * @param sendDiscFlag
- * RyanMqttTrue表示发送断开连接报文,RyanMqttFalse表示不发送断开连接报文
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- if (RyanMqttTrue == sendDiscFlag)
- {
- MQTTStatus_t status;
- MQTTFixedBuffer_t fixedBuffer;
- // 获取断开连接的数据包大小
- status = MQTT_GetDisconnectPacketSize(&fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请断开连接数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- // 序列化断开连接数据包
- status = MQTT_SerializeDisconnect(&fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- // 发送断开连接数据包
- RyanMqttError_e result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- platformMemoryFree(fixedBuffer.pBuffer);
- }
- RyanMqttConnectStatus_e connectState = RyanMqttConnectUserDisconnected;
- RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
- return RyanMqttSuccessError;
- }
- // todo 这里考虑要不要使用信号量来实现吧,会增加platform厚度,目前不想加。最好用自动重连
- // ?现在取巧使用线程的暂停和恢复实现,如果mqtt线程还没有暂停,用户就调用这个函数就会没有效果。
- // ?用户不用自动重连的话,也可以通过一直判断 client 的状态是不是为 RyanMqttDisconnectState 是的话可以调用
- // ?RyanMqttReconnect。 最推荐的是用自动重连
- /**
- * @brief 手动重连mqtt客户端
- * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
- *
- * @param client
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttDisconnectState == RyanMqttGetClientState(client), RyanMqttConnectError, RyanMqttLog_d);
- if (RyanMqttTrue == client->config.autoReconnectFlag)
- {
- return RyanMqttNoRescourceError;
- }
- result = platformThreadStart(client->config.userData, &client->mqttThread);
- RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- return result;
- }
- /**
- * @brief 订阅主题
- *
- * @param client
- * @param topic
- * @param qos
- * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
- RyanMqttSubscribeData_t subscribeManyData[])
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttMsgHandler_t *msgToListHandler;
- RyanMqttAckHandler_t *userAckHandler;
- MQTTFixedBuffer_t fixedBuffer;
- // 校验参数合法性
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- // 检查每个主题消息是否合法
- for (int32_t i = 0; i < count; i++)
- {
- RyanMqttCheck(NULL != subscribeManyData[i].topic && subscribeManyData[i].topicLen > 0,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttQos0 <= subscribeManyData[i].qos && RyanMqttQos2 >= subscribeManyData[i].qos,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- }
- // 申请 coreMqtt 支持的topic格式空间
- MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
- RyanMqttCheck(NULL != subscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- for (int32_t i = 0; i < count; i++)
- {
- subscriptionList[i].qos = (MQTTQoS_t)subscribeManyData[i].qos;
- subscriptionList[i].pTopicFilter = subscribeManyData[i].topic;
- subscriptionList[i].topicFilterLength = subscribeManyData[i].topicLen;
- }
- // 序列化数据包
- {
- size_t remainingLength;
- // 获取数据包大小
- MQTTStatus_t status =
- MQTT_GetSubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
- { platformMemoryFree(subscriptionList); });
- // 序列化数据包
- packetId = RyanMqttGetNextPacketId(client);
- status = MQTT_SerializeSubscribe(subscriptionList, count, packetId, remainingLength, &fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
- platformMemoryFree(subscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- });
- }
- // 创建每个msg主题的ack节点
- // ?mqtt空间收到服务器的suback时,会查找所有同名的然后删掉,所以这里不进行同名对比操作
- for (int32_t i = 0; i < count; i++)
- {
- // 创建msg包
- result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
- subscriptionList[i].topicFilterLength, packetId,
- (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgHandler);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { goto __RyanMqttSubCreateAckErrorExit; });
- result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_SUBACK, packetId, 0, NULL, msgHandler,
- &userAckHandler, RyanMqttFalse);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- goto __RyanMqttSubCreateAckErrorExit;
- });
- // 此函数不会失败
- RyanMqttAckListAddToUserAckList(client, userAckHandler);
- continue;
- __RyanMqttSubCreateAckErrorExit:
- // 创建 sub ack 数组时失败,查找所有同 packetId 的ack进行清除
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
- platformMemoryFree(subscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return RyanMqttNotEnoughMemError;
- }
- // ?创建msg包,3.8.4响应,允许服务端在发送 SUBACK 报文之前就开始发送与订阅匹配的 PUBLISH 报文。
- for (int32_t i = 0; i < count; i++)
- {
- result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
- subscriptionList[i].topicFilterLength, packetId,
- (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgToListHandler);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- // 将msg信息添加到订阅链表上
- RyanMqttMsgHandlerAddToMsgList(client, msgToListHandler);
- }
- // 发送订阅主题包
- // 如果发送失败就清除ack链表,创建ack链表必须在发送前
- result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- __exit:
- // 失败清除session
- if (RyanMqttSuccessError != result)
- {
- // 清除ack链表
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
- // 清除msg链表
- RyanMqttMsgHandler_t msgMatchCriteria;
- for (int32_t i = 0; i < count; i++)
- {
- msgMatchCriteria.topic = (char *)subscriptionList[i].pTopicFilter;
- msgMatchCriteria.topicLen = subscriptionList[i].topicFilterLength;
- msgMatchCriteria.packetId = packetId;
- RyanMqttMsgHandlerFindAndDestroyByPacketId(client, &msgMatchCriteria, RyanMqttFalse);
- }
- }
- platformMemoryFree(subscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return result;
- }
- /**
- * @brief 订阅主题
- *
- * @param client
- * @param topic
- * @param qos
- * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
- {
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttSubscribeData_t subscribeManyData = {.qos = qos, .topic = topic, .topicLen = RyanMqttStrlen(topic)};
- return RyanMqttSubscribeMany(client, 1, &subscribeManyData);
- }
- /**
- * @brief 取消订阅指定主题
- *
- * @param client
- * @param topic
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
- RyanMqttUnSubscribeData_t unSubscribeManyData[])
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- RyanMqttMsgHandler_t *subMsgHandler;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttAckHandler_t *userAckHandler;
- MQTTFixedBuffer_t fixedBuffer;
- // 校验参数合法性
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != unSubscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- // 检查有效性
- for (int32_t i = 0; i < count; i++)
- {
- RyanMqttCheck(NULL != unSubscribeManyData[i].topic && unSubscribeManyData[i].topicLen > 0,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- }
- // 申请 coreMqtt 支持的topic格式空间
- MQTTSubscribeInfo_t *unSubscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
- RyanMqttCheck(NULL != unSubscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- for (int32_t i = 0; i < count; i++)
- {
- unSubscriptionList[i].qos = (MQTTQoS_t)RyanMqttSubFail; // 无效数据,仅用作占位符
- unSubscriptionList[i].pTopicFilter = unSubscribeManyData[i].topic;
- unSubscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
- }
- // 序列化数据包
- {
- size_t remainingLength;
- // 获取数据包大小
- MQTTStatus_t status =
- MQTT_GetUnsubscribePacketSize(unSubscriptionList, count, &remainingLength, &fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
- { platformMemoryFree(unSubscriptionList); });
- // 序列化数据包
- packetId = RyanMqttGetNextPacketId(client);
- status = MQTT_SerializeUnsubscribe(unSubscriptionList, count, packetId, remainingLength, &fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- });
- }
- // 创建ack
- for (int32_t i = 0; i < count; i++)
- {
- // ?不判断是否订阅,统一都发送取消
- RyanMqttMsgHandler_t msgMatchCriteria = {.topic = (char *)unSubscriptionList[i].pTopicFilter,
- .topicLen = unSubscriptionList[i].topicFilterLength};
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- result =
- RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttFalse, &subMsgHandler, RyanMqttFalse);
- if (RyanMqttSuccessError == result)
- {
- // !有线程安全问题,subMsgHandler是指针,但用户层只要不是特别的混乱重复取消订阅这里应该就问题,暂时不管成本太高
- // 同步msg qos等级,之后unsub回调使用
- unSubscriptionList[i].qos = (MQTTQoS_t)subMsgHandler->qos;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- result = RyanMqttMsgHandlerCreate(client, unSubscriptionList[i].pTopicFilter,
- unSubscriptionList[i].topicFilterLength, packetId,
- (RyanMqttQos_e)unSubscriptionList[i].qos, NULL, &msgHandler);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { goto __RyanMqttUnSubCreateAckErrorExit; });
- result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_UNSUBACK, packetId, 0, NULL, msgHandler,
- &userAckHandler, RyanMqttFalse);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- goto __RyanMqttUnSubCreateAckErrorExit;
- });
- // 此函数不会失败
- RyanMqttAckListAddToUserAckList(client, userAckHandler);
- continue;
- __RyanMqttUnSubCreateAckErrorExit:
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return RyanMqttNotEnoughMemError;
- }
- // 发送取消订阅主题包
- // 如果发送失败就清除ack链表,创建ack链表必须在发送前
- result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- });
- platformMemoryFree(unSubscriptionList);
- platformMemoryFree(fixedBuffer.pBuffer);
- return result;
- }
- /**
- * @brief 取消订阅指定主题
- *
- * @param client
- * @param topic
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
- {
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttUnSubscribeData_t subscribeManyData = {.topic = topic, .topicLen = RyanMqttStrlen(topic)};
- return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
- }
- RyanMqttError_e RyanMqttPublishWithUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen, char *payload,
- uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain,
- void *userData)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- uint16_t packetId;
- MQTTStatus_t status;
- MQTTFixedBuffer_t fixedBuffer;
- size_t remainingLength;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != topic && topicLen > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
- // 报文支持有效载荷长度为0
- if (payloadLen > 0 && NULL == payload)
- {
- return RyanMqttParamInvalidError;
- }
- // 序列化pub发送包
- MQTTPublishInfo_t publishInfo = {
- .qos = (MQTTQoS_t)qos,
- .pTopicName = topic,
- .topicNameLength = topicLen,
- .pPayload = payload,
- .payloadLength = payloadLen,
- .retain = retain,
- .dup = 0,
- };
- // 获取数据包大小
- status = MQTT_GetPublishPacketSize(&publishInfo, &remainingLength, &fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- // qos0不需要 packetId
- if (RyanMqttQos0 == qos)
- {
- packetId = 0;
- }
- else
- {
- packetId = RyanMqttGetNextPacketId(client);
- }
- // 序列化数据包
- status = MQTT_SerializePublish(&publishInfo, packetId, remainingLength, &fixedBuffer);
- RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- if (RyanMqttQos0 == qos)
- {
- // 发送报文
- result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- platformMemoryFree(fixedBuffer.pBuffer);
- }
- else
- {
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttAckHandler_t *userAckHandler;
- uint8_t packetType = (RyanMqttQos1 == qos) ? MQTT_PACKET_TYPE_PUBACK : MQTT_PACKET_TYPE_PUBREC;
- // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
- result = RyanMqttMsgHandlerCreate(client, publishInfo.pTopicName, publishInfo.topicNameLength,
- RyanMqttMsgInvalidPacketId, qos, userData, &msgHandler);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
- { platformMemoryFree(fixedBuffer.pBuffer); });
- result = RyanMqttAckHandlerCreate(client, packetType, packetId, fixedBuffer.size, fixedBuffer.pBuffer,
- msgHandler, &userAckHandler, RyanMqttTrue);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- platformMemoryFree(fixedBuffer.pBuffer);
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- });
- // 一定要先加再send,send一定在mqtt broker回复前执行完,要不可能线程调度mqtt返回消息会比添加ack更快执行
- RyanMqttAckListAddToUserAckList(client, userAckHandler);
- result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_e, {
- RyanMqttLog_e("RyanMqttSendPacket failed, clear user ack session");
- // userAck 必须通过这个执行,因为可能已经复制到mqtt内核空间了
- RyanMqttClearAckSession(client, packetType, packetId);
- });
- }
- return result;
- }
- /**
- * @brief 客户端向服务端发送消息
- *
- * @param client
- * @param topic
- * @param payload
- * @param payloadLen
- * @param QOS
- * @param retain
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
- RyanMqttQos_e qos, RyanMqttBool_e retain)
- {
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
- return RyanMqttPublishWithUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain,
- NULL);
- }
- /**
- * @brief 获取已订阅主题
- * !此函数是非线程安全的,已不推荐使用
- * !请使用 RyanMqttGetSubscribeSafe 代替
- * !如果另一个线程在这个调用返回后立即取消订阅,topic将指向非法内存
- *
- * @param client
- * @param msgHandles 存放已订阅主题的空间
- * @param msgHandleSize 存放已订阅主题的空间大小个数
- * @param subscribeNum 函数内部返回已订阅主题的个数
- * @return RyanMqttState_e
- */
- RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandles, int32_t msgHandleSize,
- int32_t *subscribeNum)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttList_t *curr, *next;
- RyanMqttMsgHandler_t *msgHandler;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(0 < msgHandleSize, RyanMqttParamInvalidError, RyanMqttLog_d);
- *subscribeNum = 0;
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- if (*subscribeNum >= msgHandleSize)
- {
- result = RyanMqttNoRescourceError;
- break;
- }
- msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
- msgHandles[*subscribeNum].topic = msgHandler->topic;
- msgHandles[*subscribeNum].qos = msgHandler->qos;
- (*subscribeNum)++;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- return result;
- }
- /**
- * @brief 安全的获取已订阅主题列表,仅可通过 RyanMqttSafeFreeSubscribeResources 进行安全释放。
- *
- * @param client
- * @param msgHandles
- * @param subscribeNum
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetSubscribeSafe(RyanMqttClient_t *client, RyanMqttMsgHandler_t **msgHandles,
- int32_t *subscribeNum)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttList_t *curr, *next;
- RyanMqttMsgHandler_t *msgHandler;
- int32_t subscribeTotal;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
- if (0 == subscribeTotal)
- {
- *msgHandles = NULL;
- *subscribeNum = 0;
- return RyanMqttSuccessError;
- }
- RyanMqttMsgHandler_t *msgHandlerArr = platformMemoryMalloc(sizeof(RyanMqttMsgHandler_t) * subscribeTotal);
- if (NULL == msgHandlerArr)
- {
- result = RyanMqttNotEnoughMemError;
- goto __exit;
- }
- int32_t subscribeCount = 0;
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- if (subscribeCount >= subscribeTotal)
- {
- break;
- }
- msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
- RyanMqttMemcpy(&msgHandlerArr[subscribeCount], msgHandler, sizeof(RyanMqttMsgHandler_t));
- result = RyanMqttDupString(&msgHandlerArr[subscribeCount].topic, msgHandler->topic,
- msgHandler->topicLen);
- if (RyanMqttSuccessError != result)
- {
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- RyanMqttSafeFreeSubscribeResources(msgHandlerArr, subscribeCount);
- result = RyanMqttNotEnoughMemError;
- goto __exit;
- }
- subscribeCount++;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- *msgHandles = msgHandlerArr;
- *subscribeNum = subscribeCount;
- __exit:
- return result;
- }
- /**
- * @brief 安全释放订阅主题列表(禁止直接调用free函数)
- *
- * @param msgHandles
- * @param subscribeNum
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSafeFreeSubscribeResources(RyanMqttMsgHandler_t *msgHandles, int32_t subscribeNum)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
- // RyanMqttGetSubscribeTotalCount 内部调用的时候可以会等于0
- RyanMqttCheck(subscribeNum >= 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- for (int32_t i = 0; i < subscribeNum; i++)
- {
- // 不加null判断,因为如果是空,一定是用户程序内存访问越界了
- platformMemoryFree(msgHandles[i].topic);
- }
- platformMemoryFree(msgHandles);
- return result;
- }
- /**
- * @brief 获取已订阅主题个数
- *
- * @param client
- * @param subscribeTotalCount
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client, int32_t *subscribeTotalCount)
- {
- RyanMqttList_t *curr, *next;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != subscribeTotalCount, RyanMqttParamInvalidError, RyanMqttLog_d);
- *subscribeTotalCount = 0;
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- (*subscribeTotalCount)++;
- }
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- return RyanMqttSuccessError;
- }
- /**
- * @brief 获取mqtt客户端状态
- *
- * @param client
- * @return RyanMqttState_e
- */
- RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
- {
- if (NULL == client)
- {
- return RyanMqttInvalidState;
- }
- return RyanMqttGetClientState(client);
- }
- /**
- * @brief 获取 keepalive 剩余时间
- *
- * @param client
- * @param keepAliveRemain
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetKeepAliveRemain(RyanMqttClient_t *client, uint32_t *keepAliveRemain)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != keepAliveRemain, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- *keepAliveRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
- static RyanMqttError_e RyanMqttClientConfigDeepCopy(RyanMqttClientConfig_t *destConfig,
- RyanMqttClientConfig_t *srcConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttAssert(NULL != destConfig && NULL != srcConfig);
- // 清除需要申请内存的字段
- uint16_t clientIdLen = RyanMqttStrlen(srcConfig->clientId) + 1;
- uint16_t userNameLen = 0;
- uint16_t passwordLen = 0;
- uint16_t hostLen = RyanMqttStrlen(srcConfig->host) + 1;
- uint16_t taskNameLen = RyanMqttStrlen(srcConfig->taskName) + 1;
- if (NULL != srcConfig->userName)
- {
- userNameLen += RyanMqttStrlen(srcConfig->userName) + 1;
- }
- if (NULL != srcConfig->password)
- {
- passwordLen += RyanMqttStrlen(srcConfig->password) + 1;
- }
- // 获取申请内存大小
- uint32_t mallocSize = clientIdLen + userNameLen + passwordLen + hostLen + taskNameLen;
- char *buf = (char *)platformMemoryMalloc(mallocSize);
- RyanMqttCheck(NULL != buf, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- RyanMqttMemset(buf, 0, mallocSize);
- // 拷贝数据
- RyanMqttMemcpy(destConfig, srcConfig, sizeof(RyanMqttClientConfig_t));
- uint32_t offset = 0;
- // 共同使用一块内存
- #define copyConfigFieldWithOffset(key, valueLen) \
- destConfig->key = buf + offset; \
- if ((valueLen) != 1) RyanMqttMemcpy(destConfig->key, srcConfig->key, valueLen); \
- offset += (valueLen);
- copyConfigFieldWithOffset(clientId, clientIdLen); // 必须第一个字段
- if (NULL != srcConfig->userName)
- {
- copyConfigFieldWithOffset(userName, userNameLen);
- }
- else
- {
- destConfig->userName = NULL;
- }
- if (NULL != srcConfig->password)
- {
- copyConfigFieldWithOffset(password, passwordLen);
- }
- else
- {
- destConfig->password = NULL;
- }
- copyConfigFieldWithOffset(host, hostLen);
- copyConfigFieldWithOffset(taskName, taskNameLen);
- return result;
- }
- /**
- * @brief 获取mqtt config
- * !非线程安全,多线程通过set和get数据可能会错乱甚至崩溃。
- * !使用完毕后,需要用户手动调用 RyanMqttFreeConfigFromGet 释放指针空间
- *
- * @param client
- * @param pclientConfig
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttClientConfig_t *clientConfig;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
- RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError, RyanMqttLog_d);
- result = RyanMqttClientConfigDeepCopy(clientConfig, &client->config);
- RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- if (RyanMqttSuccessError == result)
- {
- *pclientConfig = clientConfig;
- }
- else
- {
- *pclientConfig = NULL;
- platformMemoryFree(clientConfig);
- }
- return result;
- }
- /**
- * @brief 释放通过 RyanMqttGetConfig 获取的配置信息 (禁止直接调用free函数)
- *
- * @param clientConfig
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttFreeConfigFromGet(RyanMqttClientConfig_t *clientConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttPurgeConfig(clientConfig);
- platformMemoryFree(clientConfig);
- return result;
- }
- // todo 增加更多校验,比如判断心跳包和recv的关系
- /**
- * @brief 设置mqtt config 这是很危险的操作,需要考虑mqtt
- * thread线程和用户线程的资源互斥
- *
- * 推荐在 RyanMqttStart函数前 / 非用户手动触发的事件回调函数中 / mqtt
- * thread处于挂起状态时调用 mqtt thread处于阻塞状态时调用此函数也是很危险的行为
- * 要保证mqtt线程和用户线程的资源互斥
- * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
- *
- * !项目中用户不应频繁调用此函数
- *
- * 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
- *
- * @param client
- * @param clientConfig
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(clientConfig->recvTimeout <= (uint32_t)clientConfig->keepaliveTimeoutS * 1000 / 2,
- RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttClientConfig_t tempConfig;
- result = RyanMqttClientConfigDeepCopy(&tempConfig, clientConfig);
- RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- if (RyanMqttSuccessError == result)
- {
- // todo !因为这里是非线程安全的
- RyanMqttPurgeConfig(&client->config);
- client->config = tempConfig;
- }
- return result;
- }
- /**
- * @brief 设置遗嘱的配置信息
- * 此函数必须在发送connect报文前调用,因为遗嘱消息包含在connect报文中
- *
- * @param client
- * @param topicName
- * @param qos
- * @param retain
- * @param payload
- * @param payloadLen
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen,
- RyanMqttQos_e qos, RyanMqttBool_e retain)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != topicName && RyanMqttStrlen(topicName) > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
- // 报文支持有效载荷长度为0
- if (NULL == payload && payloadLen > 0)
- {
- return RyanMqttParamInvalidError;
- }
- platformMutexLock(client->config.userData, &client->userSessionLock);
- // 之前如果设置过遗嘱就进行资源释放,否则申请空间
- if (NULL == client->lwtOptions)
- {
- client->lwtOptions = (lwtOptions_t *)platformMemoryMalloc(sizeof(lwtOptions_t));
- RyanMqttCheckCodeNoReturn(NULL != client->lwtOptions, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
- result = RyanMqttNotEnoughMemError;
- goto __exit;
- });
- }
- else
- {
- if (NULL != client->lwtOptions->topic)
- {
- platformMemoryFree(client->lwtOptions->topic);
- }
- if (NULL != client->lwtOptions->payload)
- {
- platformMemoryFree(client->lwtOptions->payload);
- }
- }
- RyanMqttMemset(client->lwtOptions, 0, sizeof(lwtOptions_t));
- if (payloadLen > 0)
- {
- result = RyanMqttDupString(&client->lwtOptions->payload, payload, payloadLen);
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- }
- else
- {
- client->lwtOptions->payload = NULL;
- }
- result = RyanMqttDupString(&client->lwtOptions->topic, topicName, RyanMqttStrlen(topicName));
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
- client->lwtOptions->lwtFlag = RyanMqttTrue;
- client->lwtOptions->qos = qos;
- client->lwtOptions->retain = retain;
- client->lwtOptions->payloadLen = payloadLen;
- platformMutexUnLock(client->config.userData, &client->userSessionLock);
- return RyanMqttSuccessError;
- __exit:
- if (NULL != client->lwtOptions)
- {
- if (NULL != client->lwtOptions->topic)
- {
- platformMemoryFree(client->lwtOptions->topic);
- }
- if (NULL != client->lwtOptions->payload)
- {
- platformMemoryFree(client->lwtOptions->payload);
- }
- platformMemoryFree(client->lwtOptions);
- client->lwtOptions = NULL;
- }
- platformMutexUnLock(client->config.userData, &client->userSessionLock);
- return result;
- }
- /**
- * @brief 丢弃指定ack
- *
- * @param client
- * @param ackHandler
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttAckHandler_t *ackHandler;
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(0 < packetId && packetId <= RyanMqttMaxPacketId, RyanMqttParamInvalidError, RyanMqttLog_d);
- // 删除pubrel记录
- result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
- if (RyanMqttSuccessError == result)
- {
- RyanMqttEventMachine(client, RyanMqttEventAckHandlerDiscard, (void *)ackHandler); // 回调函数
- RyanMqttAckHandlerDestroy(client, ackHandler);
- }
- return result;
- }
- RyanMqttError_e RyanMqttGetEventId(RyanMqttClient_t *client, RyanMqttEventId_e *eventId)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- RyanMqttCheck(NULL != eventId, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- *eventId = client->eventFlag;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
- RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- client->eventFlag |= eventId;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
- RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
- {
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- client->eventFlag &= ~eventId;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return RyanMqttSuccessError;
- }
|