|
|
@@ -315,6 +315,9 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
}
|
|
|
|
|
|
// qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
|
|
|
+ result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
|
|
|
+ RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
+
|
|
|
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
|
|
|
packetId = RyanMqttGetNextPacketId(client);
|
|
|
|
|
|
@@ -322,14 +325,12 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
topicName, (uint8_t *)payload, payloadLen);
|
|
|
RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
|
|
|
|
|
|
- result = RyanMqttMsgHandlerCreate(client, topic, strlen(topic), qos, &msgHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
|
|
|
-
|
|
|
result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, client->config.sendBuffer, msgHandler, &ackHandler);
|
|
|
RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
|
|
|
platformMemoryFree(msgHandler);
|
|
|
platformMutexUnLock(client->config.userData, &client->sendBufLock);
|
|
|
});
|
|
|
+
|
|
|
platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
|
|
|
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
@@ -337,6 +338,7 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
|
|
|
{ RyanMqttAckListRemove(client, ackHandler);
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
+
|
|
|
// 提前设置重发标志位
|
|
|
RyanMqttSetPublishDup(&ackHandler->packet[0], 1);
|
|
|
|