|
|
@@ -53,19 +53,19 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
|
|
|
|
|
|
// 网络接口初始化
|
|
|
client->network = (platformNetwork_t *)platformMemoryMalloc(sizeof(platformNetwork_t));
|
|
|
- RyanMqttCheckCode(NULL != client->network, RyanMqttNotEnoughMemError, RyanMqttDestroy(client));
|
|
|
+ RyanMqttCheckCode(NULL != client->network, RyanMqttNotEnoughMemError, { RyanMqttDestroy(client); });
|
|
|
memset(client->network, 0, sizeof(platformNetwork_t));
|
|
|
|
|
|
client->config = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
|
|
|
- RyanMqttCheckCode(NULL != client->config, RyanMqttNotEnoughMemError, RyanMqttDestroy(client));
|
|
|
+ RyanMqttCheckCode(NULL != client->config, RyanMqttNotEnoughMemError, { RyanMqttDestroy(client); });
|
|
|
memset(client->config, 0, sizeof(RyanMqttClientConfig_t));
|
|
|
|
|
|
client->mqttThread = platformMemoryMalloc(sizeof(platformThread_t));
|
|
|
- RyanMqttCheckCode(NULL != client->mqttThread, RyanMqttNotEnoughMemError, RyanMqttDestroy(client));
|
|
|
+ RyanMqttCheckCode(NULL != client->mqttThread, RyanMqttNotEnoughMemError, { RyanMqttDestroy(client); });
|
|
|
memset(client->mqttThread, 0, sizeof(platformThread_t));
|
|
|
|
|
|
client->sendBufLock = platformMemoryMalloc(sizeof(platformMutex_t));
|
|
|
- RyanMqttCheckCode(NULL != client->sendBufLock, RyanMqttNotEnoughMemError, RyanMqttDestroy(client));
|
|
|
+ RyanMqttCheckCode(NULL != client->sendBufLock, RyanMqttNotEnoughMemError, { RyanMqttDestroy(client); });
|
|
|
memset(client->sendBufLock, 0, sizeof(platformMutex_t));
|
|
|
|
|
|
client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
|
|
|
@@ -78,8 +78,8 @@ RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
|
|
|
|
|
|
platformMutexInit(client->config->userData, client->sendBufLock); // 初始化发送缓冲区互斥锁
|
|
|
|
|
|
- RyanMqttListInit(&client->msgHandlerList);
|
|
|
- RyanMqttListInit(&client->ackHandlerList);
|
|
|
+ RyanListInit(&client->msgHandlerList);
|
|
|
+ RyanListInit(&client->ackHandlerList);
|
|
|
platformTimerInit(&client->keepaliveTimer);
|
|
|
|
|
|
RyanMqttSetClientState(client, mqttInitState);
|
|
|
@@ -186,7 +186,6 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
|
|
|
RyanMqttCheck(NULL != client, RyanMqttParamInvalidError);
|
|
|
RyanMqttCheck(mqttInitState == client->clientState, RyanMqttFailedError);
|
|
|
|
|
|
- RyanMqttSetClientState(client, mqttStartState);
|
|
|
// 连接成功,需要初始化 MQTT 线程
|
|
|
result = platformThreadInit(client->config->userData,
|
|
|
client->mqttThread,
|
|
|
@@ -195,9 +194,9 @@ RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
|
|
|
client,
|
|
|
client->config->taskStack,
|
|
|
client->config->taskPrio);
|
|
|
- RyanMqttCheckCode(NULL == result, RyanMqttNotEnoughMemError,
|
|
|
- RyanMqttSetClientState(client, mqttInitState));
|
|
|
+ RyanMqttCheckCode(NULL == result, RyanMqttNotEnoughMemError, { RyanMqttSetClientState(client, mqttInitState); });
|
|
|
|
|
|
+ RyanMqttSetClientState(client, mqttStartState);
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|
|
|
|
|
|
@@ -279,17 +278,16 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
|
|
|
packetId = RyanMqttGetNextPacketId(client);
|
|
|
|
|
|
packetLen = MQTTSerialize_subscribe(client->config->sendBuffer, client->config->sendBufferSize, 0, packetId, 1, &topicName, &qos);
|
|
|
- RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError,
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttMsgHandlerCreate(topic, strlen(topic), qos, &msgHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result,
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, msgHandler, &ackHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result,
|
|
|
- platformMemoryFree(msgHandler);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, {
|
|
|
+ platformMemoryFree(msgHandler);
|
|
|
+ platformMutexUnLock(client->config->userData, client->sendBufLock);
|
|
|
+ });
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
|
|
|
// 确定节点是否已存在,存在就删除
|
|
|
@@ -300,8 +298,7 @@ RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqt
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
|
|
|
result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttAckHandlerDestroy(client, ackHandler));
|
|
|
-
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
@@ -335,11 +332,11 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
|
|
|
|
|
|
packetLen = MQTTSerialize_unsubscribe(client->config->sendBuffer, client->config->sendBufferSize, 0, packetId, 1, &topicName);
|
|
|
RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError,
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, msgHandler, &ackHandler);
|
|
|
RyanMqttCheckCode(RyanMqttSuccessError == result, result,
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
|
|
|
// 确定节点是否已存在,存在就删除
|
|
|
@@ -350,7 +347,7 @@ RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
|
|
|
result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttAckHandlerDestroy(client, ackHandler));
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
|
|
|
return result;
|
|
|
}
|
|
|
@@ -392,7 +389,7 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
packetLen = MQTTSerialize_publish(client->config->sendBuffer, client->config->sendBufferSize, 0, qos, retain, packetId,
|
|
|
topicName, payload, payloadLen);
|
|
|
RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError,
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttSendPacket(client, client->config->sendBuffer, packetLen);
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
@@ -406,16 +403,17 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
packetLen = MQTTSerialize_publish(client->config->sendBuffer, client->config->sendBufferSize, 0, qos, retain, packetId,
|
|
|
topicName, payload, payloadLen);
|
|
|
RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError,
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttMsgHandlerCreate(topic, strlen(topic), qos, &msgHandler);
|
|
|
RyanMqttCheckCode(RyanMqttSuccessError == result, result,
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ { platformMutexUnLock(client->config->userData, client->sendBufLock); });
|
|
|
|
|
|
result = RyanMqttAckHandlerCreate(client, (QOS1 == qos) ? PUBACK : PUBREC, packetId, packetLen, msgHandler, &ackHandler);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result,
|
|
|
- platformMemoryFree(msgHandler);
|
|
|
- platformMutexUnLock(client->config->userData, client->sendBufLock));
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, {
|
|
|
+ platformMemoryFree(msgHandler);
|
|
|
+ platformMutexUnLock(client->config->userData, client->sendBufLock);
|
|
|
+ });
|
|
|
platformMutexUnLock(client->config->userData, client->sendBufLock); // 释放互斥锁
|
|
|
|
|
|
// 确定节点是否已存在,存在就删除,理论上不会存在
|
|
|
@@ -426,8 +424,7 @@ RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *pay
|
|
|
result = RyanMqttAckListAdd(client, ackHandler);
|
|
|
|
|
|
result = RyanMqttSendPacket(client, ackHandler->packet, ackHandler->packetLen);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result,
|
|
|
- RyanMqttAckHandlerDestroy(client, ackHandler));
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { RyanMqttAckHandlerDestroy(client, ackHandler); });
|
|
|
|
|
|
// 提前设置重发标志位
|
|
|
RyanMqttSetPublishDup(&ackHandler->packet[0], 1);
|
|
|
@@ -567,25 +564,25 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
|
|
|
RyanMqttCheck(2 < clientConfig->recvBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->recvBufferSize, RyanMqttParamInvalidError);
|
|
|
RyanMqttCheck(2 < clientConfig->sendBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->sendBufferSize, RyanMqttParamInvalidError);
|
|
|
|
|
|
- RyanMqttCheckCode(NULL != client->config, RyanMqttParamInvalidError, goto exit);
|
|
|
+ RyanMqttCheckCode(NULL != client->config, RyanMqttParamInvalidError, { goto exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->clientId, clientConfig->clientId);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, goto exit);
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { goto exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->userName, clientConfig->userName);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, goto exit);
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { goto exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->password, clientConfig->password);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, goto exit);
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { goto exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->host, clientConfig->host);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, goto exit);
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { goto exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->port, clientConfig->port);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, goto exit);
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { goto exit; });
|
|
|
|
|
|
result = setConfigValue(&client->config->taskName, clientConfig->taskName);
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result, goto exit);
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { goto exit; });
|
|
|
|
|
|
client->config->taskPrio = clientConfig->taskPrio;
|
|
|
client->config->taskStack = clientConfig->taskStack;
|
|
|
@@ -612,13 +609,13 @@ RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig
|
|
|
if (RyanTrue != client->config->recvBufferStaticFlag)
|
|
|
{
|
|
|
client->config->recvBuffer = (char *)platformMemoryMalloc(client->config->recvBufferSize);
|
|
|
- RyanMqttCheckCode(NULL != client->config->recvBuffer, RyanMqttFailedError, goto exit);
|
|
|
+ RyanMqttCheckCode(NULL != client->config->recvBuffer, RyanMqttFailedError, { goto exit; });
|
|
|
}
|
|
|
|
|
|
if (RyanTrue != client->config->sendBufferStaticFlag)
|
|
|
{
|
|
|
client->config->sendBuffer = (char *)platformMemoryMalloc(client->config->sendBufferSize);
|
|
|
- RyanMqttCheckCode(NULL != client->config->sendBuffer, RyanMqttFailedError, goto exit);
|
|
|
+ RyanMqttCheckCode(NULL != client->config->sendBuffer, RyanMqttFailedError, { goto exit; });
|
|
|
}
|
|
|
return RyanMqttSuccessError;
|
|
|
|
|
|
@@ -664,8 +661,7 @@ RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *
|
|
|
memcpy(client->lwtOptions->payload, payload, payloadLen);
|
|
|
|
|
|
result = RyanMqttStringCopy(&client->lwtOptions->topic, topicName, strlen(topicName));
|
|
|
- RyanMqttCheckCode(RyanMqttSuccessError == result, result,
|
|
|
- platformMemoryFree(client->lwtOptions));
|
|
|
+ RyanMqttCheckCode(RyanMqttSuccessError == result, result, { platformMemoryFree(client->lwtOptions); });
|
|
|
|
|
|
return RyanMqttSuccessError;
|
|
|
}
|