|
|
@@ -193,18 +193,21 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
|
|
|
RyanMqttError_e result = RyanMqttSuccessError;
|
|
|
int32_t packetLen = 0;
|
|
|
uint16_t packetId = 0;
|
|
|
- int requestedQoS = qos;
|
|
|
+ int requestedQoS = 0;
|
|
|
RyanMqttMsgHandler_t *msgHandler = NULL;
|
|
|
RyanMqttAckHandler_t *userAckHandler = NULL;
|
|
|
MQTTString topicName = MQTTString_initializer;
|
|
|
- topicName.cstring = (char *)topic;
|
|
|
|
|
|
RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
|
|
|
- result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
|
|
|
+ requestedQoS = qos;
|
|
|
+ topicName.lenstring.data = topic;
|
|
|
+ topicName.lenstring.len = strlen(topic);
|
|
|
+
|
|
|
+ result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, qos, &msgHandler);
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
@@ -250,17 +253,19 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
|
|
|
RyanMqttMsgHandler_t *msgHandler = NULL;
|
|
|
RyanMqttAckHandler_t *userAckHandler = NULL;
|
|
|
MQTTString topicName = MQTTString_initializer;
|
|
|
- topicName.cstring = topic;
|
|
|
|
|
|
RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
|
|
|
+ topicName.lenstring.data = topic;
|
|
|
+ topicName.lenstring.len = strlen(topic);
|
|
|
+
|
|
|
// 查找当前主题是否已经订阅,没有订阅就取消发送
|
|
|
- result = RyanMqttMsgHandlerFind(client, topicName.cstring, strlen(topicName.cstring), RyanMqttFalse, &subMsgHandler);
|
|
|
+ result = RyanMqttMsgHandlerFind(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttFalse, &subMsgHandler);
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
- result = RyanMqttMsgHandlerCreate(client, topicName.cstring, strlen(topicName.cstring), RyanMqttQos0, &msgHandler);
|
|
|
+ result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttQos0, &msgHandler);
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
@@ -309,7 +314,6 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
MQTTString topicName = MQTTString_initializer;
|
|
|
RyanMqttMsgHandler_t *msgHandler = NULL;
|
|
|
RyanMqttAckHandler_t *userAckHandler = NULL;
|
|
|
- topicName.cstring = (char *)topic;
|
|
|
|
|
|
RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
@@ -321,6 +325,9 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
|
|
|
return RyanMqttParamInvalidError;
|
|
|
|
|
|
+ topicName.lenstring.data = topic;
|
|
|
+ topicName.lenstring.len = strlen(topic);
|
|
|
+
|
|
|
if (RyanMqttQos0 == qos)
|
|
|
{
|
|
|
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
@@ -335,7 +342,7 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
}
|
|
|
|
|
|
// qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
|
|
|
- result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
|
|
|
+ result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, qos, &msgHandler);
|
|
|
RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|