|
|
@@ -13,9 +13,9 @@ void RyanMqttRefreshKeepaliveTime(RyanMqttClient_t *client)
|
|
|
{
|
|
|
// 服务器在心跳时间的1.5倍内没有收到keeplive消息则会断开连接
|
|
|
// 这里在用户设置的心跳剩余 1/4 时手动发送保活指令
|
|
|
- platformCriticalEnter(client->config->userData, client->criticalLock);
|
|
|
- platformTimerCutdown(&client->keepaliveTimer, client->config->keepaliveTimeoutS / 4 * 3 * 1000); // 启动心跳定时器
|
|
|
- platformCriticalExit(client->config->userData, client->criticalLock);
|
|
|
+ platformCriticalEnter(client->config.userData, &client->criticalLock);
|
|
|
+ platformTimerCutdown(&client->keepaliveTimer, client->config.keepaliveTimeoutS / 4 * 3 * 1000); // 启动心跳定时器
|
|
|
+ platformCriticalExit(client->config.userData, &client->criticalLock);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -45,11 +45,11 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
|
|
|
return RyanMqttKeepaliveTimeout;
|
|
|
}
|
|
|
|
|
|
- platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
- int32_t packetLen = MQTTSerialize_pingreq((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize);
|
|
|
+ platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
+ int32_t packetLen = MQTTSerialize_pingreq((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
|
|
|
if (packetLen > 0)
|
|
|
- RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
+ RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
|
|
|
client->keepaliveTimeoutCount++;
|
|
|
return RyanMqttSuccessError;
|
|
|
@@ -103,7 +103,7 @@ static RyanMqttError_e RyanMqttPubackAndPubcompPacketHandler(RyanMqttClient_t *c
|
|
|
RyanMqttAckHandler_t *ackHandler = NULL;
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
|
|
|
- result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
|
|
|
+ result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
|
|
|
RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
|
|
|
|
|
|
// 可能会多次收到 puback / pubcomp,仅在首次收到时触发发布成功回调函数
|
|
|
@@ -133,17 +133,17 @@ static RyanMqttError_e RyanMqttPubrelPacketHandler(RyanMqttClient_t *client)
|
|
|
RyanMqttAckHandler_t *ackHandler = NULL;
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
|
|
|
- result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
|
|
|
+ result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
|
|
|
RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
|
|
|
|
|
|
// 制作确认数据包并发送
|
|
|
- platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
- packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBCOMP, 0, packetId);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
+ platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
+ packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBCOMP, 0, packetId);
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
|
|
|
|
|
|
// 每次收到PUBREL都返回消息
|
|
|
- result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
+ result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
// 删除pubrel记录
|
|
|
@@ -175,18 +175,18 @@ static RyanMqttError_e RyanMqttPubrecPacketHandler(RyanMqttClient_t *client)
|
|
|
RyanMqttAckHandler_t *ackHandlerPubrec = NULL;
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
|
|
|
- result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
|
|
|
+ result = MQTTDeserialize_ack(&packetType, &dup, &packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
|
|
|
RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
|
|
|
|
|
|
// 制作确认数据包并发送
|
|
|
- platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
+ platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
// 序列化发布释放报文
|
|
|
- packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREL, 0, packetId);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
+ packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBREL, 0, packetId);
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
|
|
|
|
|
|
// 每次收到PUBREC都返回ack
|
|
|
- result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
+ result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
// 只在首次收到pubrec, 并pubcomp不存在于ack链表时,才创建pubcmp到ack链表,再删除pubrec记录
|
|
|
@@ -239,7 +239,7 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
|
|
|
result = MQTTDeserialize_publish(&msgData.dup, (int *)&msgData.qos, &msgData.retained, &msgData.packetId, &topicName,
|
|
|
- (uint8_t **)&msgData.payload, (int *)&msgData.payloadLen, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
|
|
|
+ (uint8_t **)&msgData.payload, (int *)&msgData.payloadLen, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
|
|
|
RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
|
|
|
|
|
|
// 查看订阅列表是否包含此消息主题,进行通配符匹配。不包含就直接退出在一定程度上可以防止恶意攻击
|
|
|
@@ -253,13 +253,13 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
|
|
|
break;
|
|
|
|
|
|
case RyanMqttQos1:
|
|
|
- platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
- packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBACK, 0, msgData.packetId);
|
|
|
+ platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
+ packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBACK, 0, msgData.packetId);
|
|
|
RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
|
|
|
- { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
+ { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
|
|
|
|
|
|
- result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
+ result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
deliverMsgFlag = RyanMqttTrue;
|
|
|
@@ -267,13 +267,13 @@ static RyanMqttError_e RyanMqttPublishPacketHandler(RyanMqttClient_t *client)
|
|
|
|
|
|
case RyanMqttQos2: // qos2采用方法B
|
|
|
|
|
|
- platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
- packetLen = MQTTSerialize_ack((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, PUBREC, 0, msgData.packetId);
|
|
|
+ platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
+ packetLen = MQTTSerialize_ack((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, PUBREC, 0, msgData.packetId);
|
|
|
RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
|
|
|
- { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
+ { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
|
|
|
|
|
|
- result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
+ result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
// 收到publish就期望收到PUBREL,如果PUBREL报文已经存在说明不是首次收到publish, 不进行qos2 PUBREC消息处理
|
|
|
@@ -323,7 +323,7 @@ static RyanMqttError_e RyanMqttSubackHandler(RyanMqttClient_t *client)
|
|
|
RyanMqttAckHandler_t *ackHandler = NULL;
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
|
|
|
- result = MQTTDeserialize_suback(&packetId, 1, (int *)&count, (int *)&grantedQoS, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
|
|
|
+ result = MQTTDeserialize_suback(&packetId, 1, (int *)&count, (int *)&grantedQoS, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
|
|
|
RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
|
|
|
|
|
|
// ack链表不存在当前订阅确认节点就直接退出
|
|
|
@@ -376,7 +376,7 @@ static RyanMqttError_e RyanMqttUnSubackHandler(RyanMqttClient_t *client)
|
|
|
uint16_t packetId = 0;
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
|
|
|
- result = MQTTDeserialize_unsuback(&packetId, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
|
|
|
+ result = MQTTDeserialize_unsuback(&packetId, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
|
|
|
RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
|
|
|
|
|
|
// ack链表不存在当前取消订阅确认节点就直接退出
|
|
|
@@ -409,13 +409,13 @@ static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8
|
|
|
// RyanMqttAssert(NULL != packetType); packetType == 0时会误判
|
|
|
|
|
|
// 1.读取标头字节。 其中包含数据包类型
|
|
|
- result = RyanMqttRecvPacket(client, client->config->recvBuffer, fixedHeaderLen);
|
|
|
+ result = RyanMqttRecvPacket(client, client->config.recvBuffer, fixedHeaderLen);
|
|
|
if (RyanMqttRecvPacketTimeOutError == result)
|
|
|
return RyanMqttRecvPacketTimeOutError;
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
// 填充联合体标头信息
|
|
|
- header.byte = client->config->recvBuffer[0];
|
|
|
+ header.byte = client->config.recvBuffer[0];
|
|
|
rlog_d("packetType: %d", header.bits.type);
|
|
|
RyanMqttCheck(CONNECT <= header.bits.type && DISCONNECT >= header.bits.type, result, rlog_d);
|
|
|
|
|
|
@@ -424,14 +424,14 @@ static RyanMqttError_e RyanMqttReadPacketHandler(RyanMqttClient_t *client, uint8
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
// 将剩余长度编码成mqtt报文,并放入接收缓冲区,如果消息长度超过缓冲区长度则抛弃此次数据
|
|
|
- fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config->recvBuffer + fixedHeaderLen, payloadLen);
|
|
|
- RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config->recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
|
|
|
- { RyanMqttRecvPacket(client, client->config->recvBuffer, payloadLen); });
|
|
|
+ fixedHeaderLen += MQTTPacket_encode((uint8_t *)client->config.recvBuffer + fixedHeaderLen, payloadLen);
|
|
|
+ RyanMqttCheckCode((fixedHeaderLen + payloadLen) <= client->config.recvBufferSize, RyanMqttRecvBufToShortError, rlog_d,
|
|
|
+ { RyanMqttRecvPacket(client, client->config.recvBuffer, payloadLen); });
|
|
|
|
|
|
// 3.读取mqtt载荷数据并放到读取缓冲区
|
|
|
if (payloadLen > 0)
|
|
|
{
|
|
|
- result = RyanMqttRecvPacket(client, client->config->recvBuffer + fixedHeaderLen, payloadLen);
|
|
|
+ result = RyanMqttRecvPacket(client, client->config.recvBuffer + fixedHeaderLen, payloadLen);
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
}
|
|
|
|
|
|
@@ -528,11 +528,11 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
|
|
|
RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen); // 重新发送数据
|
|
|
|
|
|
// 重置ack超时时间
|
|
|
- platformTimerCutdown(&ackHandler->timer, client->config->ackTimeout);
|
|
|
+ platformTimerCutdown(&ackHandler->timer, client->config.ackTimeout);
|
|
|
ackHandler->repeatCount++;
|
|
|
|
|
|
// 重发次数超过警告值回调
|
|
|
- if (ackHandler->repeatCount >= client->config->ackHandlerRepeatCountWarning)
|
|
|
+ if (ackHandler->repeatCount >= client->config.ackHandlerRepeatCountWarning)
|
|
|
RyanMqttEventMachine(client, RyanMqttEventAckRepeatCountWarning, (void *)ackHandler);
|
|
|
|
|
|
break;
|
|
|
@@ -577,41 +577,39 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
|
|
|
int32_t connackRc = 0;
|
|
|
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
- RyanMqttAssert(NULL != client->network);
|
|
|
- RyanMqttAssert(NULL != client->config);
|
|
|
|
|
|
RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttConnectAccepted, rlog_d);
|
|
|
|
|
|
// 连接标志位
|
|
|
- connectData.clientID.cstring = client->config->clientId;
|
|
|
- connectData.username.cstring = client->config->userName;
|
|
|
- connectData.password.cstring = client->config->password;
|
|
|
- connectData.keepAliveInterval = client->config->keepaliveTimeoutS;
|
|
|
- connectData.cleansession = client->config->cleanSessionFlag;
|
|
|
- connectData.MQTTVersion = client->config->mqttVersion;
|
|
|
+ connectData.clientID.cstring = client->config.clientId;
|
|
|
+ connectData.username.cstring = client->config.userName;
|
|
|
+ connectData.password.cstring = client->config.password;
|
|
|
+ connectData.keepAliveInterval = client->config.keepaliveTimeoutS;
|
|
|
+ connectData.cleansession = client->config.cleanSessionFlag;
|
|
|
+ connectData.MQTTVersion = client->config.mqttVersion;
|
|
|
|
|
|
if (RyanMqttTrue == client->lwtFlag)
|
|
|
{
|
|
|
connectData.willFlag = 1;
|
|
|
- connectData.will.qos = client->lwtOptions->qos;
|
|
|
- connectData.will.retained = client->lwtOptions->retain;
|
|
|
- connectData.will.message.lenstring.data = client->lwtOptions->payload;
|
|
|
- connectData.will.message.lenstring.len = client->lwtOptions->payloadLen;
|
|
|
- connectData.will.topicName.cstring = client->lwtOptions->topic;
|
|
|
+ connectData.will.qos = client->lwtOptions.qos;
|
|
|
+ connectData.will.retained = client->lwtOptions.retain;
|
|
|
+ connectData.will.message.lenstring.data = client->lwtOptions.payload;
|
|
|
+ connectData.will.message.lenstring.len = client->lwtOptions.payloadLen;
|
|
|
+ connectData.will.topicName.cstring = client->lwtOptions.topic;
|
|
|
}
|
|
|
|
|
|
// 调用底层的连接函数连接上服务器
|
|
|
- result = platformNetworkConnect(client->config->userData, client->network, client->config->host, client->config->port);
|
|
|
+ result = platformNetworkConnect(client->config.userData, &client->network, client->config.host, client->config.port);
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttConnectNetWorkFail, rlog_d);
|
|
|
|
|
|
- platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
+ platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
// 序列化mqtt的CONNECT报文
|
|
|
- packetLen = MQTTSerialize_connect((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, &connectData);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
+ packetLen = MQTTSerialize_connect((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, &connectData);
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
|
|
|
|
|
|
// 发送序列化mqtt的CONNECT报文
|
|
|
- result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
+ result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
|
|
|
+ platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
// 等待报文
|
|
|
@@ -620,7 +618,7 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
|
|
|
RyanMqttCheck(CONNACK == packetType, RyanMqttConnectDisconnected, rlog_d);
|
|
|
|
|
|
// 解析CONNACK报文
|
|
|
- result = MQTTDeserialize_connack(&sessionPresent, (uint8_t *)&connackRc, (uint8_t *)client->config->recvBuffer, client->config->recvBufferSize);
|
|
|
+ result = MQTTDeserialize_connack(&sessionPresent, (uint8_t *)&connackRc, (uint8_t *)client->config.recvBuffer, client->config.recvBufferSize);
|
|
|
RyanMqttCheck(1 == result, RyanMqttDeserializePacketError, rlog_d);
|
|
|
|
|
|
rlog_i("result: %d, packetLen: %d, packetType: %d connackRc: %d", result, packetLen, packetType, connackRc);
|
|
|
@@ -638,8 +636,6 @@ static RyanMqttError_e RyanMqttConnect(RyanMqttClient_t *client)
|
|
|
void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, void *eventData)
|
|
|
{
|
|
|
RyanMqttAssert(NULL != client);
|
|
|
- RyanMqttAssert(NULL != client->network);
|
|
|
- RyanMqttAssert(NULL != client->config);
|
|
|
|
|
|
switch (eventId)
|
|
|
{
|
|
|
@@ -652,9 +648,9 @@ void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, v
|
|
|
|
|
|
case RyanMqttEventDisconnected: // 断开连接事件
|
|
|
RyanMqttSetClientState(client, RyanMqttDisconnectState); // 先将客户端状态设置为断开连接,避免close网络资源时用户依然在使用
|
|
|
- platformNetworkClose(client->config->userData, client->network);
|
|
|
+ platformNetworkClose(client->config.userData, &client->network);
|
|
|
|
|
|
- if (RyanMqttTrue == client->config->cleanSessionFlag)
|
|
|
+ if (RyanMqttTrue == client->config.cleanSessionFlag)
|
|
|
RyanMqttCleanSession(client);
|
|
|
|
|
|
break;
|
|
|
@@ -667,11 +663,11 @@ void RyanMqttEventMachine(RyanMqttClient_t *client, RyanMqttEventId_e eventId, v
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- if (client->config->mqttEventHandle == NULL)
|
|
|
+ if (client->config.mqttEventHandle == NULL)
|
|
|
return;
|
|
|
|
|
|
if (client->eventFlag & eventId)
|
|
|
- client->config->mqttEventHandle(client, eventId, eventData);
|
|
|
+ client->config.mqttEventHandle(client, eventId, eventData);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -683,86 +679,52 @@ void RyanMqttThread(void *argument)
|
|
|
{
|
|
|
int32_t result = 0;
|
|
|
RyanMqttClient_t *client = (RyanMqttClient_t *)argument;
|
|
|
- RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
|
|
|
- RyanMqttAssert(NULL != client->network); // RyanMqttStart前没有调用RyanMqttInit
|
|
|
- RyanMqttAssert(NULL != client->config); // RyanMqttStart前没有调用RyanMqttSetConfig
|
|
|
+ RyanMqttAssert(NULL != client); // RyanMqttStart前没有调用RyanMqttInit
|
|
|
|
|
|
while (1)
|
|
|
{
|
|
|
-
|
|
|
// 销毁客户端
|
|
|
if (RyanMqttTrue == client->destoryFlag)
|
|
|
{
|
|
|
-
|
|
|
RyanMqttEventMachine(client, RyanMqttEventDestoryBefore, (void *)NULL);
|
|
|
|
|
|
// 清除网络组件
|
|
|
- if (NULL != client->network)
|
|
|
- {
|
|
|
- platformNetworkClose(client->config->userData, client->network);
|
|
|
- platformMemoryFree(client->network);
|
|
|
- client->network = NULL;
|
|
|
- }
|
|
|
+ platformNetworkClose(client->config.userData, &client->network);
|
|
|
|
|
|
// 清除config信息
|
|
|
- if (NULL != client->config)
|
|
|
- {
|
|
|
- if (NULL != client->config->clientId)
|
|
|
- platformMemoryFree(client->config->clientId);
|
|
|
-
|
|
|
- if (NULL != client->config->host)
|
|
|
- platformMemoryFree(client->config->host);
|
|
|
-
|
|
|
- if (NULL != client->config->port)
|
|
|
- platformMemoryFree(client->config->port);
|
|
|
-
|
|
|
- if (NULL != client->config->userName)
|
|
|
- platformMemoryFree(client->config->userName);
|
|
|
-
|
|
|
- if (NULL != client->config->password)
|
|
|
- platformMemoryFree(client->config->password);
|
|
|
-
|
|
|
- if (NULL != client->config->taskName)
|
|
|
- platformMemoryFree(client->config->taskName);
|
|
|
-
|
|
|
- if (NULL != client->config)
|
|
|
- platformMemoryFree(client->config);
|
|
|
- }
|
|
|
+ if (NULL != client->config.clientId)
|
|
|
+ platformMemoryFree(client->config.clientId);
|
|
|
+ if (NULL != client->config.userName)
|
|
|
+ platformMemoryFree(client->config.userName);
|
|
|
+ if (NULL != client->config.password)
|
|
|
+ platformMemoryFree(client->config.password);
|
|
|
+ if (NULL != client->config.host)
|
|
|
+ platformMemoryFree(client->config.host);
|
|
|
+ if (NULL != client->config.port)
|
|
|
+ platformMemoryFree(client->config.port);
|
|
|
+ if (NULL != client->config.taskName)
|
|
|
+ platformMemoryFree(client->config.taskName);
|
|
|
|
|
|
// 清除遗嘱相关配置
|
|
|
- if (RyanMqttTrue == client->lwtFlag && NULL != client->lwtOptions)
|
|
|
- {
|
|
|
- if (NULL != client->lwtOptions->topic)
|
|
|
- platformMemoryFree(client->lwtOptions->topic);
|
|
|
+ if (NULL != client->lwtOptions.payload)
|
|
|
+ platformMemoryFree(client->lwtOptions.payload);
|
|
|
|
|
|
- platformMemoryFree(client->lwtOptions);
|
|
|
- }
|
|
|
+ if (NULL != client->lwtOptions.topic)
|
|
|
+ platformMemoryFree(client->lwtOptions.topic);
|
|
|
|
|
|
// 清除session ack链表和msg链表
|
|
|
RyanMqttCleanSession(client);
|
|
|
|
|
|
// 清除互斥锁
|
|
|
- if (NULL != client->sendBufLock)
|
|
|
- {
|
|
|
- platformMutexDestroy(client->config->userData, client->sendBufLock);
|
|
|
- platformMemoryFree(client->sendBufLock);
|
|
|
- client->sendBufLock = NULL;
|
|
|
- }
|
|
|
+ platformMutexDestroy(client->config.userData, &client->sendBufLock);
|
|
|
|
|
|
// 清除临界区
|
|
|
- if (NULL != client->criticalLock)
|
|
|
- {
|
|
|
- platformCriticalDestroy(client->config->userData, client->criticalLock);
|
|
|
- platformMemoryFree(client->criticalLock);
|
|
|
- client->criticalLock = NULL;
|
|
|
- }
|
|
|
-
|
|
|
- platformThread_t mqttThread = *client->mqttThread;
|
|
|
- void *userData = client->config->userData;
|
|
|
+ platformCriticalDestroy(client->config.userData, &client->criticalLock);
|
|
|
|
|
|
// 清除掉线程动态资源
|
|
|
- platformMemoryFree(client->mqttThread);
|
|
|
- client->mqttThread = NULL;
|
|
|
+ platformThread_t mqttThread = {0};
|
|
|
+ memcpy(&mqttThread, &client->mqttThread, sizeof(platformThread_t));
|
|
|
+ void *userData = client->config.userData;
|
|
|
|
|
|
platformMemoryFree(client);
|
|
|
client = NULL;
|
|
|
@@ -791,11 +753,11 @@ void RyanMqttThread(void *argument)
|
|
|
|
|
|
case RyanMqttDisconnectState: // 断开连接状态
|
|
|
rlog_d("断开连接状态");
|
|
|
- if (RyanMqttTrue != client->config->autoReconnectFlag) // 没有使能自动连接就休眠线程
|
|
|
- platformThreadStop(client->config->userData, client->mqttThread);
|
|
|
+ if (RyanMqttTrue != client->config.autoReconnectFlag) // 没有使能自动连接就休眠线程
|
|
|
+ platformThreadStop(client->config.userData, &client->mqttThread);
|
|
|
|
|
|
rlog_d("触发自动连接,%dms后开始连接\r\n", client->config->reconnectTimeout);
|
|
|
- platformDelay(client->config->reconnectTimeout);
|
|
|
+ platformDelay(client->config.reconnectTimeout);
|
|
|
RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL); // 给上层触发重新连接前事件
|
|
|
|
|
|
break;
|