| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
- #include "RyanMqttUtil.h"
- #include "RyanMqttLog.h"
- #include "RyanMqttThread.h"
- #ifdef RyanMqttLinuxTestEnable
- #include "RyanMqttTest.h"
- #endif
- /**
- * @brief 字符串拷贝,需要手动释放内存
- *
- * @param dest
- * @param src
- * @param strLen
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttDupString(char **dest, const char *src, uint32_t strLen)
- {
- RyanMqttAssert(NULL != dest);
- RyanMqttAssert(NULL != src);
- RyanMqttCheck(0 != strLen, RyanMqttFailedError, RyanMqttLog_d);
- *dest = NULL;
- char *s = (char *)platformMemoryMalloc(strLen + 1);
- if (NULL == s)
- {
- return RyanMqttNotEnoughMemError;
- }
- RyanMqttMemcpy(s, src, strLen);
- s[strLen] = '\0';
- *dest = s;
- return RyanMqttSuccessError;
- }
- /**
- * @brief mqtt读取报文,此函数仅Mqtt线程进行调用
- *
- * @param client
- * @param buf
- * @param length
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, uint8_t *recvBuf, uint32_t recvLen)
- {
- uint32_t offset = 0;
- int32_t recvResult = 0;
- uint32_t timeOut = client->config.recvTimeout;
- RyanMqttTimer_t timer;
- RyanMqttAssert(NULL != client);
- RyanMqttAssert(NULL != recvBuf);
- RyanMqttAssert(0 != recvLen);
- // 如果需要处理ack,就缩短读取超时时间,避免阻塞太久(保留用户配置的上限)
- if (RyanMqttTrue == client->pendingAckFlag)
- {
- if (client->config.recvTimeout > 100)
- {
- timeOut = 100;
- }
- else
- {
- timeOut = client->config.recvTimeout;
- }
- }
- RyanMqttTimerCutdown(&timer, timeOut);
- while ((offset < recvLen) && (timeOut > 0))
- {
- recvResult =
- platformNetworkRecvAsync(client->config.userData, &client->network, (char *)(recvBuf + offset),
- (size_t)(recvLen - offset), (int32_t)timeOut);
- if (recvResult < 0)
- {
- break;
- }
- offset += recvResult;
- timeOut = RyanMqttTimerRemain(&timer);
- }
- // RyanMqttLog_d("offset: %d, recvLen: %d, recvResult: %d", offset, recvLen, recvResult);
- // 错误
- if (recvResult < 0)
- {
- RyanMqttConnectStatus_e connectState = RyanMqttConnectNetWorkFail;
- RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
- RyanMqttLog_d("recv错误, result: %d", recvResult);
- return RyanSocketFailedError;
- }
- // 读取超时
- if (offset != recvLen)
- {
- return RyanMqttRecvPacketTimeOutError;
- }
- #ifdef RyanMqttLinuxTestEnable
- RyanMqttTestEnableCritical();
- if (RyanMqttTrue == isEnableRandomNetworkFault)
- {
- randomCount++;
- if (randomCount >= RyanRand(10, 100))
- {
- randomCount = 0;
- RyanMqttTestExitCritical();
- // printf("模拟接收超时\r\n");
- return RyanMqttRecvPacketTimeOutError;
- }
- }
- RyanMqttTestExitCritical();
- #endif
- return RyanMqttSuccessError;
- }
- /**
- * @brief mqtt发送报文
- *
- * @param client
- * @param buf
- * @param length
- * @return RyanMqttError_e
- */
- RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, uint8_t *sendBuf, uint32_t sendLen)
- {
- uint32_t offset = 0;
- int32_t sendResult = 0;
- uint32_t timeOut = client->config.sendTimeout;
- RyanMqttTimer_t timer;
- RyanMqttAssert(NULL != client);
- RyanMqttAssert(NULL != sendBuf);
- RyanMqttAssert(0 != sendLen);
- #ifdef RyanMqttLinuxTestEnable
- RyanMqttTestEnableCritical();
- if (RyanMqttTrue == isEnableRandomNetworkFault)
- {
- sendRandomCount++;
- if (sendRandomCount >= RyanRand(1, 10))
- {
- sendRandomCount = 0;
- RyanMqttTestExitCritical();
- // printf("模拟发送超时\r\n");
- return RyanMqttSendPacketTimeOutError;
- }
- }
- RyanMqttTestExitCritical();
- #endif
- RyanMqttTimerCutdown(&timer, timeOut);
- platformMutexLock(client->config.userData, &client->sendLock); // 获取互斥锁
- while ((offset < sendLen) && (timeOut > 0))
- {
- sendResult =
- platformNetworkSendAsync(client->config.userData, &client->network, (char *)(sendBuf + offset),
- (size_t)(sendLen - offset), (int32_t)timeOut);
- if (-1 == sendResult)
- {
- break;
- }
- offset += sendResult;
- timeOut = RyanMqttTimerRemain(&timer);
- }
- platformMutexUnLock(client->config.userData, &client->sendLock); // 释放互斥锁
- if (sendResult < 0)
- {
- RyanMqttConnectStatus_e connectState = RyanMqttConnectNetWorkFail;
- RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
- return RyanSocketFailedError;
- }
- // 发送超时
- if (offset != sendLen)
- {
- return RyanMqttSendPacketTimeOutError;
- }
- // 发送数据成功就刷新 keepalive 时间
- RyanMqttRefreshKeepaliveTime(client);
- return RyanMqttSuccessError;
- }
- /**
- * @brief 设置mqtt客户端状态
- *
- * @param client
- * @param state
- */
- void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e state)
- {
- RyanMqttAssert(NULL != client);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- client->clientState = state;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- }
- /**
- * @brief 获取mqtt客户端状态
- *
- * @param client
- * @return RyanMqttState_e
- */
- RyanMqttState_e RyanMqttGetClientState(RyanMqttClient_t *client)
- {
- RyanMqttAssert(NULL != client);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- RyanMqttState_e state = client->clientState;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return state;
- }
- /**
- * @brief 清理session
- *
- * @param client
- */
- void RyanMqttPurgeSession(RyanMqttClient_t *client)
- {
- RyanMqttList_t *curr, *next;
- RyanMqttAssert(NULL != client);
- // 释放所有msg_handler_list内存
- platformMutexLock(client->config.userData, &client->msgHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
- {
- RyanMqttMsgHandler_t *msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
- RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
- RyanMqttMsgHandlerDestroy(client, msgHandler);
- }
- RyanMqttListDelInit(&client->msgHandlerList);
- platformMutexUnLock(client->config.userData, &client->msgHandleLock);
- // 释放所有ackHandler_list内存
- platformMutexLock(client->config.userData, &client->ackHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
- {
- RyanMqttAckHandler_t *ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
- RyanMqttAckListRemoveToAckList(client, ackHandler);
- RyanMqttAckHandlerDestroy(client, ackHandler);
- }
- RyanMqttListDelInit(&client->ackHandlerList);
- client->ackHandlerCount = 0;
- platformMutexUnLock(client->config.userData, &client->ackHandleLock);
- // 释放所有userAckHandler_list内存
- platformMutexLock(client->config.userData, &client->userSessionLock);
- RyanMqttListForEachSafe(curr, next, &client->userAckHandlerList)
- {
- RyanMqttAckHandler_t *userAckHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
- RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
- RyanMqttAckHandlerDestroy(client, userAckHandler);
- }
- RyanMqttListDelInit(&client->userAckHandlerList);
- platformMutexUnLock(client->config.userData, &client->userSessionLock);
- }
- /**
- * @brief 销毁mqtt客户端Config信息
- *
- * @param clientConfig
- */
- void RyanMqttPurgeConfig(RyanMqttClientConfig_t *clientConfig)
- {
- RyanMqttAssert(NULL != clientConfig);
- if (clientConfig->clientId)
- {
- platformMemoryFree(clientConfig->clientId);
- }
- }
- /**
- * @brief 销毁mqtt客户端资源
- *
- * @param client
- */
- void RyanMqttPurgeClient(RyanMqttClient_t *client)
- {
- RyanMqttAssert(NULL != client);
- // 关闭销毁网络组件
- platformNetworkClose(client->config.userData, &client->network);
- platformNetworkDestroy(client->config.userData, &client->network);
- // 清除config信息
- RyanMqttPurgeConfig(&client->config);
- // 清除遗嘱相关配置
- if (NULL != client->lwtOptions)
- {
- if (NULL != client->lwtOptions->payload)
- {
- platformMemoryFree(client->lwtOptions->payload);
- }
- if (NULL != client->lwtOptions->topic)
- {
- platformMemoryFree(client->lwtOptions->topic);
- }
- platformMemoryFree(client->lwtOptions);
- }
- // 清除session ack链表和msg链表
- RyanMqttPurgeSession(client);
- // 清除互斥锁
- platformMutexDestroy(client->config.userData, &client->sendLock);
- platformMutexDestroy(client->config.userData, &client->msgHandleLock);
- platformMutexDestroy(client->config.userData, &client->ackHandleLock);
- platformMutexDestroy(client->config.userData, &client->userSessionLock);
- // 清除临界区
- platformCriticalDestroy(client->config.userData, &client->criticalLock);
- }
- /**
- * @brief 初始化计时器
- *
- * @param platformTimer
- */
- void RyanMqttTimerInit(RyanMqttTimer_t *platformTimer)
- {
- platformTimer->timeOut = 0;
- platformTimer->time = 0;
- }
- /**
- * @brief 添加计数时间
- *
- * @param platformTimer
- * @param timeout
- */
- void RyanMqttTimerCutdown(RyanMqttTimer_t *platformTimer, uint32_t timeout)
- {
- platformTimer->time = platformUptimeMs();
- platformTimer->timeOut = timeout;
- }
- /**
- * @brief 获取设置的超时时间
- *
- * @param platformTimer
- */
- uint32_t RyanMqttTimerGetConfigTimeout(RyanMqttTimer_t *platformTimer)
- {
- return platformTimer->timeOut;
- }
- /**
- * @brief 计算time还有多长时间超时,考虑了32位溢出判断
- *
- * @param platformTimer
- * @return uint32_t 返回剩余时间,超时返回0
- */
- uint32_t RyanMqttTimerRemain(RyanMqttTimer_t *platformTimer)
- {
- uint32_t elapsed = platformUptimeMs() - platformTimer->time; // 计算内部自动绕回
- // 如果已过超时时间,返回 0
- if (elapsed >= platformTimer->timeOut)
- {
- return 0;
- }
- // 否则返回剩余时间
- return platformTimer->timeOut - elapsed;
- }
- /**
- * @brief 获取报文标识符,报文标识符不可为0
- * 都在sendbuf锁内调用
- * @param client
- * @return uint16_t
- */
- uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
- {
- uint16_t packetId;
- RyanMqttAssert(NULL != client);
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- if (client->packetId >= RyanMqttMaxPacketId || client->packetId < 1)
- {
- client->packetId = 1;
- }
- else
- {
- client->packetId++;
- }
- packetId = client->packetId;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- return packetId;
- }
- const char *RyanMqttStrError(int32_t state)
- {
- const char *str;
- switch (state)
- {
- case RyanMqttRecvPacketTimeOutError: str = "读取数据超时"; break;
- case RyanMqttParamInvalidError: str = "无效参数"; break;
- case RyanSocketFailedError: str = "套接字失败"; break;
- case RyanMqttSendPacketError: str = "数据包发送失败"; break;
- case RyanMqttSerializePacketError: str = "序列化报文失败"; break;
- case RyanMqttDeserializePacketError: str = "反序列化报文失败"; break;
- case RyanMqttNoRescourceError: str = "没有资源"; break;
- case RyanMqttHaveRescourceError: str = "资源已存在"; break;
- case RyanMqttNotConnectError: str = "mqttClient没有连接"; break;
- case RyanMqttConnectError: str = "mqttClient已经连接"; break;
- case RyanMqttRecvBufToShortError: str = "接收缓冲区不足"; break;
- case RyanMqttSendBufToShortError: str = "发送缓冲区不足"; break;
- case RyanMqttSocketConnectFailError: str = "socket连接失败"; break;
- case RyanMqttNotEnoughMemError: str = "动态内存不足"; break;
- case RyanMqttFailedError: str = "mqtt失败, 详细信息请看函数内部"; break;
- case RyanMqttSuccessError: str = "mqtt成功, 详细信息请看函数内部"; break;
- case RyanMqttConnectRefusedProtocolVersion: str = "mqtt断开连接, 服务端不支持客户端请求的 MQTT 协议级别"; break;
- case RyanMqttConnectRefusedIdentifier: str = "mqtt断开连接, 不合格的客户端标识符"; break;
- case RyanMqttConnectRefusedServer: str = "mqtt断开连接, 服务端不可用"; break;
- case RyanMqttConnectRefusedUsernamePass: str = "mqtt断开连接, 无效的用户名或密码"; break;
- case RyanMqttConnectRefusedNotAuthorized: str = "mqtt断开连接, 连接已拒绝,未授权"; break;
- case RyanMqttConnectClientInvalid: str = "mqtt断开连接, 客户端处于无效状态"; break;
- case RyanMqttConnectNetWorkFail: str = "mqtt断开连接, 网络错误"; break;
- case RyanMqttConnectDisconnected: str = "mqtt断开连接, mqtt客户端断开连接"; break;
- case RyanMqttKeepaliveTimeout: str = "mqtt断开连接, 心跳超时断开连接"; break;
- case RyanMqttConnectUserDisconnected: str = "mqtt断开连接, 用户手动断开连接"; break;
- case RyanMqttConnectTimeout: str = "mqtt断开连接, connect超时断开"; break;
- default: str = "未知错误描述"; break;
- }
- return str;
- }
|