| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
- // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
- #include "RyanMqttThread.h"
- #include "RyanMqttLog.h"
- #include "RyanMqttUtil.h"
- // mqtt标准是1.5倍,大部分mqtt服务器也是这个配置
- #define RyanMqttKeepAliveMultiplier (1.5)
- void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client)
- {
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- uint32_t timeout = (uint32_t)(client->config.keepaliveTimeoutS * 1000 * RyanMqttKeepAliveMultiplier);
- RyanMqttTimerCutdown(&client->keepaliveTimer, timeout); // 启动心跳定时器
- platformCriticalExit(client->config.userData, &client->criticalLock);
- }
- /**
- * @brief mqtt心跳保活
- *
- * @param client
- * @return int32_t
- */
- static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
- {
- RyanMqttAssert(NULL != client);
- // mqtt没有连接就退出
- if (RyanMqttConnectState != RyanMqttGetClientState(client))
- {
- return RyanMqttNotConnectError;
- }
- uint32_t timeRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
- // 当剩余时间大于 recvtimeout 并且小于 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
- if (timeRemain > (uint32_t)(client->config.recvTimeout + 100))
- {
- // 当没有到达 keepaliveTimeoutS 的 0.9 倍时间时不进行发送心跳包
- // timeRemain 是剩余时间,所以 timeRemain > RyanMqttKeepAliveMultiplier - 0.9 就是还没有到达0.9倍时间
- if (timeRemain > client->config.keepaliveTimeoutS * 1000 * (RyanMqttKeepAliveMultiplier - 0.9))
- {
- return RyanMqttSuccessError;
- }
- // 节流时间内不发送心跳报文
- if (RyanMqttTimerRemain(&client->keepaliveThrottleTimer))
- {
- return RyanMqttSuccessError;
- }
- }
- // 超过设置的 1.5 倍心跳周期,主动通知用户断开连接
- if (0 == timeRemain)
- {
- RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
- RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
- RyanMqttLog_d("ErrorCode: %d, strError: %s", RyanMqttKeepaliveTimeout,
- RyanMqttStrError(RyanMqttKeepaliveTimeout));
- return RyanMqttFailedError;
- }
- // 发送mqtt心跳包
- {
- // MQTT_PACKET_PINGREQ_SIZE
- uint8_t buffer[2];
- MQTTFixedBuffer_t fixedBuffer = {.pBuffer = buffer, .size = sizeof(buffer)};
- // 序列化数据包
- MQTTStatus_t status = MQTT_SerializePingreq(&fixedBuffer);
- RyanMqttCheck(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d);
- RyanMqttError_e result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
- RyanMqttTimerCutdown(&client->keepaliveThrottleTimer,
- client->config.recvTimeout + 1500); // 启动心跳检查节流定时器
- }
- return RyanMqttSuccessError;
- }
- // todo 也可以考虑将发送操作独立出去,异步发送,目前没有遇到性能瓶颈,需要超高性能的时候再考虑吧
- /**
- * @brief 遍历ack链表,进行相应的处理
- *
- * @param client
- * @param waitFlag
- * waitFlag : RyanMqttFalse 表示不需要等待超时立即处理这些数据包。通常在重新连接后立即进行处理
- * waitFlag : RyanMqttTrue 表示需要等待超时再处理这些消息,一般是稳定连接下的超时处理
- */
- static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e waitFlag)
- {
- RyanMqttList_t *curr, *next;
- RyanMqttAckHandler_t *ackHandler;
- RyanMqttTimer_t ackScanRemainTimer;
- uint32_t ackScanThrottleTime = 1000; // ack扫描节流最长一秒
- RyanMqttAssert(NULL != client);
- // mqtt没有连接就退出
- if (RyanMqttConnectState != RyanMqttGetClientState(client))
- {
- return;
- }
- // 节流时间内不检查ack链表
- if (RyanMqttTimerRemain(&client->ackScanThrottleTimer))
- {
- return;
- }
- // 设置scan最大处理时间定时器
- uint32_t ackScanWindowMs;
- if (client->config.recvTimeout > 100)
- {
- ackScanWindowMs = client->config.recvTimeout - 100;
- }
- else
- {
- ackScanWindowMs = client->config.recvTimeout;
- }
- RyanMqttTimerCutdown(&ackScanRemainTimer, ackScanWindowMs);
- platformMutexLock(client->config.userData, &client->ackHandleLock);
- RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
- {
- // 需要再判断一次
- if (RyanMqttConnectState != RyanMqttGetClientState(client))
- {
- continue;
- }
- // 超过最大处理时间,直接跳出处理函数,等待下次再处理
- if (0 == RyanMqttTimerRemain(&ackScanRemainTimer))
- {
- break;
- }
- // 获取此节点的结构体
- ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
- // ack响应没有超时就不进行处理
- uint32_t ackRemainTime = RyanMqttTimerRemain(&ackHandler->timer);
- if (0 != ackRemainTime)
- {
- // 如果ack剩余时间小于节流时间,就把ack剩余时间更新到节流上
- if (ackRemainTime < ackScanThrottleTime)
- {
- ackScanThrottleTime = ackRemainTime;
- }
- if (RyanMqttTrue == waitFlag)
- {
- continue;
- }
- }
- switch (ackHandler->packetType)
- {
- // 发送qos1 / qos2消息, 服务器ack响应超时。需要重新发送它们。
- case MQTT_PACKET_TYPE_PUBACK: // qos1 publish后没有收到puback
- case MQTT_PACKET_TYPE_PUBREC: // qos2 publish后没有收到pubrec
- case MQTT_PACKET_TYPE_PUBREL: // qos2 收到pubrec,发送pubrel后没有收到pubcomp
- case MQTT_PACKET_TYPE_PUBCOMP: // 理论不会出现,冗余措施
- {
- // 设置重发标志位
- if (0 == ackHandler->repeatCount && ackHandler->packet)
- {
- MQTT_UpdateDuplicatePublishFlag(ackHandler->packet, true);
- }
- // 重发数据事件回调
- RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
- //? 发送失败也是重试,所以这里不进行错误判断
- RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
- // 重置ack超时时间
- RyanMqttTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
- ackHandler->repeatCount++;
- // 重发次数超过警告值回调
- if (ackHandler->repeatCount >= client->config.ackHandlerRepeatCountWarning)
- {
- RyanMqttEventMachine(client, RyanMqttEventAckRepeatCountWarning, (void *)ackHandler);
- }
- break;
- }
- // 订阅 / 取消订阅超时就认为失败
- case MQTT_PACKET_TYPE_SUBACK: {
- RyanMqttMsgHandler_t *msgMatchCriteria = ackHandler->msgHandler;
- RyanMqttMsgHandlerFindAndDestroyByPacketId(client, msgMatchCriteria, RyanMqttFalse);
- RyanMqttEventMachine(client, RyanMqttEventSubscribedFailed, (void *)ackHandler->msgHandler);
- RyanMqttAckListRemoveToAckList(client, ackHandler);
- RyanMqttAckHandlerDestroy(client, ackHandler); // 清除句柄
- break;
- }
- case MQTT_PACKET_TYPE_UNSUBACK: {
- RyanMqttEventMachine(client, RyanMqttEventUnSubscribedFailed, (void *)ackHandler->msgHandler);
- RyanMqttAckListRemoveToAckList(client, ackHandler);
- RyanMqttAckHandlerDestroy(client, ackHandler); // 清除句柄
- break;
- }
- default: {
- RyanMqttLog_e("不应该出现的值: %d", ackHandler->packetType);
- RyanMqttAssert(NULL); // 不应该为别的值
- break;
- }
- }
- }
- platformMutexUnLock(client->config.userData, &client->ackHandleLock);
- // 扫描链表没有超时时,才设置scan节流定时器
- if (RyanMqttTimerRemain(&ackScanRemainTimer))
- {
- // 启动ack scan节流定时器
- RyanMqttTimerCutdown(&client->ackScanThrottleTimer, ackScanThrottleTime);
- client->pendingAckFlag = RyanMqttFalse;
- }
- else
- {
- client->pendingAckFlag = RyanMqttTrue;
- }
- }
- /**
- * @brief mqtt连接函数
- *
- * @param client
- * @return RyanMqttError_e
- */
- static RyanMqttError_e RyanMqttConnectBroker(RyanMqttClient_t *client, RyanMqttConnectStatus_e *connectState)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- MQTTStatus_t status;
- MQTTConnectInfo_t connectInfo;
- MQTTPublishInfo_t willInfo;
- MQTTFixedBuffer_t fixedBuffer = {0};
- size_t remainingLength;
- RyanMqttBool_e lwtFlag;
- RyanMqttAssert(NULL != client);
- RyanMqttAssert(NULL != connectState);
- RyanMqttCheckCodeNoReturn(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectError,
- RyanMqttLog_d, {
- result = RyanMqttNoRescourceError;
- *connectState = RyanMqttConnectClientInvalid;
- goto __exit;
- });
- // 填充 connect 信息
- {
- // 无需判断config有效性,如果无效一定是用户内存访问越界了
- // RyanMqtt不允许 pClientIdentifier 为NULL
- connectInfo.pClientIdentifier = client->config.clientId;
- connectInfo.clientIdentifierLength = RyanMqttStrlen(client->config.clientId);
- connectInfo.pUserName = client->config.userName;
- if (connectInfo.pUserName)
- {
- connectInfo.userNameLength = RyanMqttStrlen(client->config.userName);
- }
- else
- {
- connectInfo.userNameLength = 0;
- }
- connectInfo.pPassword = client->config.password;
- if (connectInfo.pPassword)
- {
- connectInfo.passwordLength = RyanMqttStrlen(client->config.password);
- }
- else
- {
- connectInfo.passwordLength = 0;
- }
- connectInfo.keepAliveSeconds = client->config.keepaliveTimeoutS;
- connectInfo.cleanSession = client->config.cleanSessionFlag;
- // 验证lwt信息
- platformMutexLock(client->config.userData, &client->userSessionLock);
- if (NULL != client->lwtOptions)
- {
- lwtFlag = client->lwtOptions->lwtFlag;
- if (lwtFlag)
- {
- willInfo.qos = (MQTTQoS_t)client->lwtOptions->qos;
- willInfo.retain = client->lwtOptions->retain;
- willInfo.pPayload = client->lwtOptions->payload;
- willInfo.payloadLength = client->lwtOptions->payloadLen;
- willInfo.pTopicName = client->lwtOptions->topic;
- willInfo.topicNameLength = RyanMqttStrlen(client->lwtOptions->topic);
- willInfo.dup = RyanMqttFalse;
- }
- }
- else
- {
- lwtFlag = RyanMqttFalse;
- }
- platformMutexUnLock(client->config.userData, &client->userSessionLock);
- }
- // 获取数据包大小
- status = MQTT_GetConnectPacketSize(&connectInfo, RyanMqttTrue == lwtFlag ? &willInfo : NULL, &remainingLength,
- &fixedBuffer.size);
- RyanMqttAssert(MQTTSuccess == status);
- // 申请数据包的空间
- fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
- RyanMqttCheckCodeNoReturn(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
- result = RyanMqttNotEnoughMemError;
- *connectState = RyanMqttConnectFailedError;
- goto __exit;
- });
- // 序列化数据包
- status = MQTT_SerializeConnect(&connectInfo, RyanMqttTrue == lwtFlag ? &willInfo : NULL, remainingLength,
- &fixedBuffer);
- RyanMqttCheckCodeNoReturn(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
- result = RyanMqttSerializePacketError;
- *connectState = RyanMqttConnectFailedError;
- goto __exit;
- });
- // 调用底层的连接函数连接上服务器
- result = platformNetworkConnect(client->config.userData, &client->network, client->config.host,
- client->config.port);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanSocketFailedError, RyanMqttLog_d, {
- *connectState = RyanMqttConnectNetWorkFail;
- goto __exit;
- });
- // 发送序列化mqtt的CONNECT报文
- result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
- *connectState = RyanMqttConnectNetWorkFail;
- goto __exit;
- });
- // 等待报文
- // mqtt规范 服务端接收到connect报文后,服务端发送给客户端的第一个报文必须是 CONNACK
- MQTTPacketInfo_t pIncomingPacket = {0};
- result = RyanMqttGetPacketInfo(client, &pIncomingPacket);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttSerializePacketError, RyanMqttLog_d, {
- *connectState = RyanMqttConnectInvalidPacketError;
- goto __exit;
- });
- if (MQTT_PACKET_TYPE_CONNACK == (pIncomingPacket.type & 0xF0U))
- {
- uint16_t packetId;
- bool sessionPresent; // 会话位
- // 反序列化ack包,MQTTSuccess 和 MQTTServerRefused 都会返回正确的connectState
- // MQTTServerRefused 表示连接被拒绝,但是可以获取原因思什么
- status = MQTT_DeserializeAck(&pIncomingPacket, &packetId, &sessionPresent);
- if (MQTTSuccess != status && MQTTServerRefused != status)
- {
- result = RyanMqttFailedError;
- *connectState = RyanMqttConnectInvalidPacketError;
- }
- else
- {
- // 获取连接返回值
- *connectState = pIncomingPacket.pRemainingData[1];
- if (RyanMqttConnectAccepted != *connectState)
- {
- result = RyanMqttFailedError;
- }
- else
- {
- // 服务端无历史会话,客户端这里选择直接进行清空
- if (false == sessionPresent)
- {
- RyanMqttPurgeSession(client);
- }
- }
- }
- }
- else
- {
- result = RyanMqttInvalidPacketError;
- *connectState = RyanMqttConnectFirstPackNotConnack;
- }
- platformMemoryFree(pIncomingPacket.pRemainingData);
- if (RyanMqttSuccessError != result)
- {
- platformNetworkClose(client->config.userData, &client->network);
- }
- __exit:
- if (fixedBuffer.pBuffer)
- {
- platformMemoryFree(fixedBuffer.pBuffer);
- }
- return result;
- }
- /**
- * @brief mqtt事件处理函数
- *
- * @param client
- * @param eventId
- * @param eventData
- */
- void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
- {
- RyanMqttAssert(NULL != client);
- switch (eventId)
- {
- case RyanMqttEventConnected: // 第一次连接成功
- RyanMqttSetClientState(client, RyanMqttConnectState);
- RyanMqttRefreshKeepaliveTime(client);
- RyanMqttAckListScan(client, RyanMqttFalse); // 扫描确认列表,销毁已超时的确认处理程序或重新发送它们
- break;
- case RyanMqttEventDisconnected: // 断开连接事件
- // 先将客户端状态设置为断开连接,避免close网络资源时用户依然在使用
- RyanMqttSetClientState(client, RyanMqttDisconnectState);
- platformNetworkClose(client->config.userData, &client->network);
- if (RyanMqttTrue == client->config.cleanSessionFlag)
- {
- RyanMqttPurgeSession(client);
- }
- break;
- case RyanMqttEventReconnectBefore: // 重连前回调
- RyanMqttSetClientState(client, RyanMqttReconnectState);
- break;
- default: break;
- }
- if (NULL == client->config.mqttEventHandle)
- {
- return;
- }
- platformCriticalEnter(client->config.userData, &client->criticalLock);
- RyanMqttEventId_e eventFlag = client->eventFlag;
- platformCriticalExit(client->config.userData, &client->criticalLock);
- if (eventFlag & eventId)
- {
- client->config.mqttEventHandle(client, eventId, eventData);
- }
- }
- /**
- * @brief mqtt运行线程
- *
- * @param argument
- */
- void RyanMqttThread(void *argument)
- {
- RyanMqttClient_t *client = (RyanMqttClient_t *)argument;
- RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
- while (1)
- {
- // 销毁客户端
- if (RyanMqttTrue == client->destroyFlag)
- {
- RyanMqttEventMachine(client, RyanMqttEventDestroyBefore, (void *)NULL);
- RyanMqttPurgeClient(client);
- // 清除掉线程动态资源
- platformThread_t mqttThread;
- RyanMqttMemcpy(&mqttThread, &client->mqttThread, sizeof(platformThread_t));
- void *userData = client->config.userData;
- platformMemoryFree(client);
- client = NULL;
- // 销毁自身线程
- platformThreadDestroy(userData, &mqttThread);
- return;
- }
- // 客户端状态变更状态机
- switch (RyanMqttGetClientState(client))
- {
- case RyanMqttStartState: // 开始状态状态
- case RyanMqttReconnectState: {
- RyanMqttLog_d("开始连接");
- RyanMqttConnectStatus_e connectState;
- RyanMqttError_e result = RyanMqttConnectBroker(client, &connectState);
- if (RyanMqttSuccessError == result)
- {
- RyanMqttEventMachine(client, RyanMqttEventConnected, (void *)&connectState);
- }
- else
- {
- RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
- }
- }
- break;
- case RyanMqttConnectState: // 连接状态
- RyanMqttLog_d("连接状态");
- // 不对返回值进行处理
- RyanMqttProcessPacketHandler(client);
- RyanMqttAckListScan(client, RyanMqttTrue);
- RyanMqttKeepalive(client);
- break;
- case RyanMqttDisconnectState: // 断开连接状态
- RyanMqttLog_d("断开连接状态");
- if (RyanMqttTrue != client->config.autoReconnectFlag) // 没有使能自动连接就休眠线程
- {
- platformThreadStop(client->config.userData, &client->mqttThread);
- // 断连的时候会暂停线程,线程重新启动就是用户手动连接了
- RyanMqttLog_d("手动重新连接\r\n");
- RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL);
- }
- else
- {
- RyanMqttLog_d("触发自动连接,%dms后开始连接\r\n", client->config.reconnectTimeout);
- platformDelay(client->config.reconnectTimeout);
- RyanMqttEventMachine(client, RyanMqttEventReconnectBefore,
- NULL); // 给上层触发重新连接前事件
- }
- break;
- default: RyanMqttAssert(NULL); break;
- }
- }
- }
|