|
@@ -23,6 +23,7 @@ void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client)
|
|
|
static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
|
|
static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
|
|
|
{
|
|
{
|
|
|
RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
|
|
RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
|
|
|
|
|
+ RyanMqttError_e result = RyanMqttFailedError;
|
|
|
int32_t packetLen = 0;
|
|
int32_t packetLen = 0;
|
|
|
RyanMqttAssert(NULL != client);
|
|
RyanMqttAssert(NULL != client);
|
|
|
|
|
|
|
@@ -45,8 +46,14 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
|
|
|
{
|
|
{
|
|
|
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
packetLen = MQTTSerialize_pingreq((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
|
|
packetLen = MQTTSerialize_pingreq((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
|
|
|
- if (packetLen > 0)
|
|
|
|
|
- RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
|
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
|
|
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock);
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
|
|
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock);
|
|
|
|
|
+ });
|
|
|
platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -446,7 +453,8 @@ static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8
|
|
|
|
|
|
|
|
// 将剩余长度编码成mqtt报文,并放入接收缓冲区,如果消息长度超过缓冲区长度则抛弃此次数据
|
|
// 将剩余长度编码成mqtt报文,并放入接收缓冲区,如果消息长度超过缓冲区长度则抛弃此次数据
|
|
|
fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config.recvBuffer + fixedHeaderLen, payloadLen);
|
|
fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config.recvBuffer + fixedHeaderLen, payloadLen);
|
|
|
- // 上面不判断是因为recv空间不可能小到 fixedHeaderLen 都无法写入
|
|
|
|
|
|
|
+
|
|
|
|
|
+ //? 上面不判断是因为recv空间不可能小到 fixedHeaderLen 都无法写入
|
|
|
// 判断recvBufferSize是否可以存的下数据
|
|
// 判断recvBufferSize是否可以存的下数据
|
|
|
RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config.recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
|
|
RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config.recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
|
|
|
{
|
|
{
|
|
@@ -575,6 +583,7 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
|
|
|
// 重发数据事件回调
|
|
// 重发数据事件回调
|
|
|
RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
|
|
RyanMqttEventMachine(client, RyanMqttEventRepeatPublishPacket, (void *)ackHandler);
|
|
|
|
|
|
|
|
|
|
+ //? 发送失败也是重试,所以这里不进行错误判断
|
|
|
RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
|
|
RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
|
|
|
|
|
|
|
|
// 重置ack超时时间
|
|
// 重置ack超时时间
|