|
|
@@ -1,21 +1,14 @@
|
|
|
-
|
|
|
-
|
|
|
-#include "RyanMqttPublic.h"
|
|
|
+#define rlogEnable 1 // 是否使能日志
|
|
|
+#define rlogColorEnable 1 // 是否使能日志颜色
|
|
|
+#define rlogLevel (rlogLvlWarning) // 日志打印等级
|
|
|
+#define rlogTag "RyanMqttClient" // 日志tag
|
|
|
+
|
|
|
+#include "RyanMqttLog.h"
|
|
|
+#include "MQTTPacket.h"
|
|
|
+#include "RyanMqttClient.h"
|
|
|
#include "RyanMqttUtile.h"
|
|
|
#include "RyanMqttThread.h"
|
|
|
|
|
|
-#define DBG_ENABLE
|
|
|
-#define DBG_SECTION_NAME RyanMqttTag
|
|
|
-
|
|
|
-#ifdef RyanDebugEnable
|
|
|
-#define DBG_LEVEL DBG_LOG
|
|
|
-#else
|
|
|
-#define DBG_LEVEL DBG_INFO
|
|
|
-#endif
|
|
|
-
|
|
|
-#define DBG_COLOR
|
|
|
-#include "ulog.h"
|
|
|
-
|
|
|
/**
|
|
|
* @brief 获取报文标识符,报文标识符不可为0
|
|
|
*
|
|
|
@@ -56,28 +49,28 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
|
|
|
{
|
|
|
|
|
|
RyanMqttClient_t *client = NULL;
|
|
|
- RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, rlog_d);
|
|
|
|
|
|
client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, rlog_d);
|
|
|
memset(client, 0, sizeof(RyanMqttClient_t));
|
|
|
|
|
|
// 网络接口初始化
|
|
|
client->network = (platformNetwork_t *)platformMemoryMalloc(sizeof(platformNetwork_t));
|
|
|
- RyanMqttCheckCode(NULL != client->network, RyanMqttNotEnoughMemError, ulog_d, { RyanMqttDestroy(client); });
|
|
|
+ RyanMqttCheckCode(NULL != client->network, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
|
|
|
memset(client->network, 0, sizeof(platformNetwork_t));
|
|
|
client->network->socket = -1;
|
|
|
|
|
|
client->config = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
|
|
|
- RyanMqttCheckCode(NULL != client->config, RyanMqttNotEnoughMemError, ulog_d, { RyanMqttDestroy(client); });
|
|
|
+ RyanMqttCheckCode(NULL != client->config, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
|
|
|
memset(client->config, 0, sizeof(RyanMqttClientConfig_t));
|
|
|
|
|
|
client->mqttThread = platformMemoryMalloc(sizeof(platformThread_t));
|
|
|
- RyanMqttCheckCode(NULL != client->mqttThread, RyanMqttNotEnoughMemError, ulog_d, { RyanMqttDestroy(client); });
|
|
|
+ RyanMqttCheckCode(NULL != client->mqttThread, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
|
|
|
memset(client->mqttThread, 0, sizeof(platformThread_t));
|
|
|
|
|
|
client->sendBufLock = platformMemoryMalloc(sizeof(platformMutex_t));
|
|
|
- RyanMqttCheckCode(NULL != client->sendBufLock, RyanMqttNotEnoughMemError, ulog_d, { RyanMqttDestroy(client); });
|
|
|
+ RyanMqttCheckCode(NULL != client->sendBufLock, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttDestroy(client); });
|
|
|
memset(client->sendBufLock, 0, sizeof(platformMutex_t));
|
|
|
|
|
|
client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
|
|
|
@@ -85,7 +78,7 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
|
|
|
client->eventFlag = 0;
|
|
|
client->keepaliveTimeoutCount = 0;
|
|
|
client->ackHandlerCount = 0;
|
|
|
- client->lwtFlag = RyanFalse;
|
|
|
+ client->lwtFlag = RyanMqttFalse;
|
|
|
client->lwtOptions = NULL;
|
|
|
|
|
|
platformMutexInit(client->config->userData, client->sendBufLock); // 初始化发送缓冲区互斥锁
|
|
|
@@ -94,7 +87,7 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
|
|
|
RyanListInit(&client->ackHandlerList);
|
|
|
platformTimerInit(&client->keepaliveTimer);
|
|
|
|
|
|
- RyanMqttSetClientState(client, mqttInitState);
|
|
|
+ RyanMqttSetClientState(client, RyanMqttInitState);
|
|
|
|
|
|
*pClient = client;
|
|
|
return RyanMqttSuccessError;
|
|
|
@@ -109,7 +102,7 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
|
|
|
RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
|
|
|
{
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
|
|
|
RyanMqttEventMachine(client, RyanMqttEventDestoryBefore, (void *)NULL);
|
|
|
|
|
|
@@ -140,10 +133,10 @@ RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
|
|
|
// 清除config信息
|
|
|
if (NULL != client->config)
|
|
|
{
|
|
|
- if (RyanTrue != client->config->recvBufferStaticFlag && NULL != client->config->recvBuffer)
|
|
|
+ if (RyanMqttTrue != client->config->recvBufferStaticFlag && NULL != client->config->recvBuffer)
|
|
|
platformMemoryFree(client->config->recvBuffer);
|
|
|
|
|
|
- if (RyanTrue != client->config->sendBufferStaticFlag && NULL != client->config->sendBuffer)
|
|
|
+ if (RyanMqttTrue != client->config->sendBufferStaticFlag && NULL != client->config->sendBuffer)
|
|
|
platformMemoryFree(client->config->sendBuffer);
|
|
|
|
|
|
if (NULL != client->config->clientId)
|
|
|
@@ -169,7 +162,7 @@ RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
|
|
|
}
|
|
|
|
|
|
// 清除遗嘱相关配置
|
|
|
- if (RyanTrue == client->lwtFlag && NULL != client->lwtOptions)
|
|
|
+ if (RyanMqttTrue == client->lwtFlag && NULL != client->lwtOptions)
|
|
|
{
|
|
|
if (NULL != client->lwtOptions->topic)
|
|
|
platformMemoryFree(client->lwtOptions->topic);
|
|
|
@@ -196,9 +189,9 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
|
|
|
{
|
|
|
|
|
|
RyanMqttError_e result = RyanMqttSuccessError;
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(mqttInitState == client->clientState, RyanMqttFailedError, ulog_d);
|
|
|
- RyanMqttSetClientState(client, mqttStartState);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttInitState == client->clientState, RyanMqttFailedError, rlog_d);
|
|
|
+ RyanMqttSetClientState(client, RyanMqttStartState);
|
|
|
// 连接成功,需要初始化 MQTT 线程
|
|
|
result = platformThreadInit(client->config->userData,
|
|
|
client->mqttThread,
|
|
|
@@ -208,7 +201,7 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
|
|
|
client->config->taskStack,
|
|
|
client->config->taskPrio);
|
|
|
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNotEnoughMemError, ulog_d, { RyanMqttSetClientState(client, mqttInitState); });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttSetClientState(client, RyanMqttInitState); });
|
|
|
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|
|
|
@@ -217,19 +210,19 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
|
|
|
* @brief 断开mqtt服务器连接
|
|
|
*
|
|
|
* @param client
|
|
|
- * @param sendDiscFlag RyanTrue表示发送断开连接报文,RyanFalse表示不发送断开连接报文
|
|
|
+ * @param sendDiscFlag RyanMqttTrue表示发送断开连接报文,RyanMqttFalse表示不发送断开连接报文
|
|
|
* @return RyanMqttError_e
|
|
|
*/
|
|
|
-RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanBool_e sendDiscFlag)
|
|
|
+RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
|
|
|
{
|
|
|
|
|
|
int32_t connectState = RyanMqttConnectAccepted;
|
|
|
int32_t packetLen = 0;
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(mqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
|
|
|
- if (RyanTrue == sendDiscFlag)
|
|
|
+ if (RyanMqttTrue == sendDiscFlag)
|
|
|
{
|
|
|
platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
// 序列化断开连接数据包并发送
|
|
|
@@ -255,9 +248,9 @@ RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanBool_e sendDisc
|
|
|
RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
|
|
|
{
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != client->mqttThread, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(mqttDisconnectState != RyanMqttGetClientState(client), RyanMqttConnectError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != client->mqttThread, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttDisconnectState != RyanMqttGetClientState(client), RyanMqttConnectError, rlog_d);
|
|
|
|
|
|
RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL);
|
|
|
platformThreadStart(client->config->userData, client->mqttThread);
|
|
|
@@ -282,22 +275,22 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
|
|
|
MQTTString topicName = MQTTString_initializer;
|
|
|
topicName.cstring = (char *)topic;
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(QOS0 <= qos && QOS2 >= qos, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(mqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
|
|
|
platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
packetId = RyanMqttGetNextPacketId(client);
|
|
|
|
|
|
packetLen = MQTTSerialize_subscribe((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, packetId, 1, &topicName, (int *)&qos);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, ulog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttMsgHandlerCreate(topic, strlen(topic), qos, &msgHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, msgHandler, &ackHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, {
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
|
|
|
platformMemoryFree(msgHandler);
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock);
|
|
|
});
|
|
|
@@ -311,7 +304,7 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
|
|
|
result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
@@ -332,23 +325,23 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
|
|
|
MQTTString topicName = MQTTString_initializer;
|
|
|
topicName.cstring = topic;
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(mqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
|
|
|
// 查找当前主题是否已经订阅,没有订阅就取消发送
|
|
|
- result = RyanMqttMsgHandlerFind(client, topicName.cstring, strlen(topicName.cstring), RyanFalse, &msgHandler);
|
|
|
- RyanMqttCheck(RyanMqttSuccessError == result, result, ulog_d);
|
|
|
+ result = RyanMqttMsgHandlerFind(client, topicName.cstring, strlen(topicName.cstring), RyanMqttFalse, &msgHandler);
|
|
|
+ RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
|
|
|
|
|
|
platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
packetId = RyanMqttGetNextPacketId(client);
|
|
|
|
|
|
packetLen = MQTTSerialize_unsubscribe((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, packetId, 1, &topicName);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, ulog_d,
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
|
|
|
{ platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, msgHandler, &ackHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d,
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
|
|
|
{ platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
|
|
|
@@ -360,7 +353,7 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
|
|
|
result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
|
|
|
return result;
|
|
|
}
|
|
|
@@ -376,7 +369,7 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
|
|
|
* @param retain
|
|
|
* @return RyanMqttError_e
|
|
|
*/
|
|
|
-RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanBool_e retain)
|
|
|
+RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
|
|
|
{
|
|
|
RyanMqttError_e result = RyanMqttSuccessError;
|
|
|
int32_t packetLen = 0;
|
|
|
@@ -386,22 +379,22 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
RyanMqttAckHandler_t *ackHandler = NULL;
|
|
|
topicName.cstring = (char *)topic;
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(QOS0 <= qos && QOS2 >= qos, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(RyanTrue == retain || RyanFalse == retain, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(mqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
|
|
|
|
|
|
if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
|
|
|
return RyanMqttParamInvalidError;
|
|
|
|
|
|
- if (QOS0 == qos)
|
|
|
+ if (RyanMqttQos0 == qos)
|
|
|
{
|
|
|
platformMutexLock(client->config->userData, client->sendBufLock); // 获取互斥锁
|
|
|
packetLen = MQTTSerialize_publish((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, qos, retain, packetId,
|
|
|
topicName, (uint8_t *)payload, payloadLen);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, ulog_d,
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
|
|
|
{ platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
@@ -415,29 +408,29 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
|
|
|
packetLen = MQTTSerialize_publish((uint8_t *)client->config->sendBuffer, client->config->sendBufferSize, 0, qos, retain, packetId,
|
|
|
topicName, (uint8_t *)payload, payloadLen);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, ulog_d,
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
|
|
|
{ platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttMsgHandlerCreate(topic, strlen(topic), qos, &msgHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d,
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d,
|
|
|
{ platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
- result = RyanMqttAckHandlerCreate(client, (QOS1 == qos) ? PUBACK : PUBREC, packetId, packetLen, msgHandler, &ackHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, {
|
|
|
+ result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, msgHandler, &ackHandler);
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
|
|
|
platformMemoryFree(msgHandler);
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock);
|
|
|
});
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
|
|
|
// 确定节点是否已存在,存在就删除,理论上不会存在
|
|
|
- result = RyanMqttAckListNodeFind(client, (QOS1 == qos) ? PUBACK : PUBREC, packetId, &ackHandler);
|
|
|
+ result = RyanMqttAckListNodeFind(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, &ackHandler);
|
|
|
if (RyanMqttSuccessError == result)
|
|
|
RyanMqttAckHandlerDestroy(client, ackHandler);
|
|
|
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
|
|
|
result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
|
|
|
// 提前设置重发标志位
|
|
|
RyanMqttSetPublishDup(&ackHandler->packet[0], 1);
|
|
|
@@ -455,7 +448,7 @@ RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
|
|
|
{
|
|
|
|
|
|
if (NULL == client)
|
|
|
- return mqttInvalidState;
|
|
|
+ return RyanMqttInvalidState;
|
|
|
|
|
|
return RyanMqttGetClientState(client);
|
|
|
}
|
|
|
@@ -476,9 +469,9 @@ RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandle
|
|
|
*next = NULL;
|
|
|
RyanMqttMsgHandler_t *msgHandler = NULL;
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(0 < msgHandleSize, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(0 < msgHandleSize, RyanMqttParamInvalidError, rlog_d);
|
|
|
|
|
|
*subscribeNum = 0;
|
|
|
|
|
|
@@ -514,8 +507,8 @@ RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandle
|
|
|
RyanMqttError_e result = RyanMqttSuccessError;
|
|
|
RyanMqttClientConfig_t *clientConfig = NULL;
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, rlog_d);
|
|
|
|
|
|
RyanMqttCheck(NULL != client->config, RyanMqttNoRescourceError);
|
|
|
|
|
|
@@ -566,35 +559,35 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
|
|
|
|
|
|
RyanMqttError_e result = RyanMqttSuccessError;
|
|
|
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != clientConfig->port, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != clientConfig->userName, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != clientConfig->password, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(2 < clientConfig->recvBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->recvBufferSize, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(2 < clientConfig->sendBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->sendBufferSize, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != clientConfig->port, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != clientConfig->userName, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != clientConfig->password, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(2 < clientConfig->recvBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->recvBufferSize, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(2 < clientConfig->sendBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->sendBufferSize, RyanMqttParamInvalidError, rlog_d);
|
|
|
|
|
|
- RyanMqttCheckCode(NULL != client->config, RyanMqttParamInvalidError, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(NULL != client->config, RyanMqttParamInvalidError, rlog_d, { goto __exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->clientId, clientConfig->clientId);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->userName, clientConfig->userName);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->password, clientConfig->password);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->host, clientConfig->host);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->port, clientConfig->port);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->taskName, clientConfig->taskName);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
|
|
|
|
|
|
client->config->taskPrio = clientConfig->taskPrio;
|
|
|
client->config->taskStack = clientConfig->taskStack;
|
|
|
@@ -618,20 +611,20 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
|
|
|
client->config->recvBuffer = clientConfig->recvBuffer;
|
|
|
client->config->sendBuffer = clientConfig->sendBuffer;
|
|
|
|
|
|
- if (RyanTrue != client->config->recvBufferStaticFlag)
|
|
|
+ if (RyanMqttTrue != client->config->recvBufferStaticFlag)
|
|
|
{
|
|
|
client->config->recvBuffer = (char *)platformMemoryMalloc(client->config->recvBufferSize);
|
|
|
- RyanMqttCheckCode(NULL != client->config->recvBuffer, RyanMqttFailedError, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(NULL != client->config->recvBuffer, RyanMqttFailedError, rlog_d, { goto __exit; });
|
|
|
}
|
|
|
|
|
|
- if (RyanTrue != client->config->sendBufferStaticFlag)
|
|
|
+ if (RyanMqttTrue != client->config->sendBufferStaticFlag)
|
|
|
{
|
|
|
client->config->sendBuffer = (char *)platformMemoryMalloc(client->config->sendBufferSize);
|
|
|
- RyanMqttCheckCode(NULL != client->config->sendBuffer, RyanMqttFailedError, ulog_d, { goto exit; });
|
|
|
+ RyanMqttCheckCode(NULL != client->config->sendBuffer, RyanMqttFailedError, rlog_d, { goto __exit; });
|
|
|
}
|
|
|
return RyanMqttSuccessError;
|
|
|
|
|
|
-exit:
|
|
|
+__exit:
|
|
|
RyanMqttDestroy(client);
|
|
|
return RyanMqttFailedError;
|
|
|
}
|
|
|
@@ -648,24 +641,24 @@ exit:
|
|
|
* @param payload
|
|
|
* @return RyanMqttError_e
|
|
|
*/
|
|
|
-RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanBool_e retain)
|
|
|
+RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
|
|
|
{
|
|
|
RyanMqttError_e result = RyanMqttSuccessError;
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL != topicName, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(QOS0 <= qos && QOS2 >= qos, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(RyanTrue == retain || RyanFalse == retain, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(NULL == client->lwtOptions, RyanMqttFailedError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL != topicName, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(NULL == client->lwtOptions, RyanMqttFailedError, rlog_d);
|
|
|
|
|
|
if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
|
|
|
return RyanMqttParamInvalidError;
|
|
|
|
|
|
client->lwtOptions = (lwtOptions_t *)platformMemoryMalloc(sizeof(lwtOptions_t) + payloadLen);
|
|
|
- RyanMqttCheck(NULL != client->lwtOptions, RyanMqttNotEnoughMemError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client->lwtOptions, RyanMqttNotEnoughMemError, rlog_d);
|
|
|
memset(client->lwtOptions, 0, sizeof(lwtOptions_t) + payloadLen);
|
|
|
|
|
|
- client->lwtFlag = RyanTrue;
|
|
|
+ client->lwtFlag = RyanMqttTrue;
|
|
|
client->lwtOptions->qos = qos;
|
|
|
client->lwtOptions->retain = retain;
|
|
|
client->lwtOptions->payloadLen = payloadLen;
|
|
|
@@ -673,7 +666,7 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
|
|
|
memcpy(client->lwtOptions->payload, payload, payloadLen);
|
|
|
|
|
|
result = RyanMqttStringCopy(&client->lwtOptions->topic, topicName, strlen(topicName));
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, ulog_d, { platformMemoryFree(client->lwtOptions); });
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(client->lwtOptions); });
|
|
|
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|
|
|
@@ -689,13 +682,13 @@ RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, enum msgType
|
|
|
{
|
|
|
RyanMqttError_e result = RyanMqttSuccessError;
|
|
|
RyanMqttAckHandler_t *ackHandler = NULL;
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(CONNECT <= packetType && DISCONNECT >= packetType, RyanMqttParamInvalidError, ulog_d);
|
|
|
- RyanMqttCheck(0 < packetId, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(CONNECT <= packetType && DISCONNECT >= packetType, RyanMqttParamInvalidError, rlog_d);
|
|
|
+ RyanMqttCheck(0 < packetId, RyanMqttParamInvalidError, rlog_d);
|
|
|
|
|
|
// 删除pubrel记录
|
|
|
result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
|
|
|
- RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttNoRescourceError, ulog_d);
|
|
|
+ RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttNoRescourceError, rlog_d);
|
|
|
|
|
|
RyanMqttEventMachine(client, RyanMqttEventAckHandlerdiscard, (void *)ackHandler); // 回调函数
|
|
|
|
|
|
@@ -705,14 +698,14 @@ RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, enum msgType
|
|
|
|
|
|
RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
|
|
|
{
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
client->eventFlag |= eventId;
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|
|
|
|
|
|
RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
|
|
|
{
|
|
|
- RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, ulog_d);
|
|
|
+ RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
|
|
|
client->eventFlag &= ~eventId;
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|