RyanMqttClient.c 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732
  1. #define rlogLevel (rlogLvlInfo) // 日志打印等级
  2. #include "RyanMqttLog.h"
  3. #include "RyanMqttClient.h"
  4. #include "RyanMqttUtile.h"
  5. #include "RyanMqttThread.h"
  6. /**
  7. * @brief Returns the next valid MQTT packet identifier for the client.
  8. *
  9. * Ensures the packet identifier is non-zero and wraps around to 1 if it exceeds the maximum allowed value.
  10. * The operation is performed within a critical section to guarantee thread safety.
  11. *
  12. * @return uint16_t The next packet identifier to use for MQTT packets.
  13. */
  14. static uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
  15. {
  16. uint16_t packetId;
  17. RyanMqttAssert(NULL != client);
  18. platformCriticalEnter(client->config.userData, &client->criticalLock);
  19. if (client->packetId >= RyanMqttMaxPacketId || client->packetId < 1)
  20. client->packetId = 1;
  21. else
  22. client->packetId++;
  23. packetId = client->packetId;
  24. platformCriticalExit(client->config.userData, &client->criticalLock);
  25. return packetId;
  26. }
  27. /**
  28. * @brief Sets a configuration string value by copying a new string and freeing the previous value.
  29. *
  30. * Frees the memory pointed to by `*dest`, copies the string from `rest` into newly allocated memory, and updates `*dest` to point to the new string.
  31. *
  32. * @param dest Pointer to the destination string pointer to update.
  33. * @param rest Source string to copy.
  34. * @return RyanMqttSuccessError if the operation succeeds, RyanMqttFailedError if memory allocation fails.
  35. */
  36. static RyanMqttError_e setConfigValue(char **dest, char const *const rest)
  37. {
  38. RyanMqttAssert(NULL != dest && NULL != rest);
  39. // if (0 == strcmp(*dest, rest))
  40. // return RyanMqttSuccessError;
  41. platformMemoryFree(*dest);
  42. RyanMqttStringCopy(dest, (char *)rest, strlen(rest));
  43. if (NULL == *dest)
  44. return RyanMqttFailedError;
  45. return RyanMqttSuccessError;
  46. }
  47. /**
  48. * @brief Initializes a new MQTT client instance.
  49. *
  50. * Allocates and initializes all resources and internal structures required for an MQTT client, including synchronization primitives, handler lists, timers, and network interfaces. Sets the client to its initial state and returns a pointer to the created client.
  51. *
  52. * @param pClient Pointer to where the initialized MQTT client instance will be stored.
  53. * @return RyanMqttSuccessError on success, or an error code if initialization fails.
  54. */
  55. RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
  56. {
  57. RyanMqttClient_t *client = NULL;
  58. RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, rlog_d);
  59. client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
  60. RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, rlog_d);
  61. memset(client, 0, sizeof(RyanMqttClient_t));
  62. platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
  63. client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
  64. client->clientState = RyanMqttInitState;
  65. client->eventFlag = 0;
  66. client->ackHandlerCount = 0;
  67. client->lwtFlag = RyanMqttFalse;
  68. platformMutexInit(client->config.userData, &client->sendBufLock); // 初始化发送缓冲区互斥锁
  69. RyanListInit(&client->msgHandlerList);
  70. platformMutexInit(client->config.userData, &client->msgHandleLock);
  71. RyanListInit(&client->ackHandlerList);
  72. platformMutexInit(client->config.userData, &client->ackHandleLock);
  73. RyanListInit(&client->userAckHandlerList);
  74. platformMutexInit(client->config.userData, &client->userAckHandleLock);
  75. RyanMqttSetClientState(client, RyanMqttInitState);
  76. platformTimerInit(&client->keepaliveTimer);
  77. platformNetworkInit(client->config.userData, &client->network); // 网络接口初始化
  78. *pClient = client;
  79. return RyanMqttSuccessError;
  80. }
  81. /**
  82. * @brief 销毁mqtt客户端
  83. * !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
  84. * !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
  85. * !mqtt删除自己前会调用 RyanMqttEventDestoryBefore 事件回调
  86. * !调用此函数后就不应该再对该客户端进行任何操作
  87. * @param client
  88. * @return RyanMqttError_e
  89. */
  90. RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
  91. {
  92. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  93. platformCriticalEnter(client->config.userData, &client->criticalLock);
  94. client->destoryFlag = RyanMqttTrue;
  95. platformCriticalExit(client->config.userData, &client->criticalLock);
  96. return RyanMqttSuccessError;
  97. }
  98. /**
  99. * @brief Starts the MQTT client and creates the MQTT processing thread.
  100. *
  101. * Transitions the client from the initialization state to the start state and initializes the MQTT thread.
  102. * Returns an error if called when the client is not in the initialization state or if thread creation fails.
  103. *
  104. * @return RyanMqttError_e Error code indicating the result of the operation.
  105. */
  106. RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
  107. {
  108. RyanMqttError_e result = RyanMqttSuccessError;
  109. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  110. RyanMqttCheck(RyanMqttInitState == RyanMqttGetClientState(client), RyanMqttFailedError, rlog_d);
  111. RyanMqttSetClientState(client, RyanMqttStartState);
  112. // 连接成功,需要初始化 MQTT 线程
  113. result = platformThreadInit(client->config.userData,
  114. &client->mqttThread,
  115. client->config.taskName,
  116. RyanMqttThread,
  117. client,
  118. client->config.taskStack,
  119. client->config.taskPrio);
  120. RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttSetClientState(client, RyanMqttInitState); });
  121. return RyanMqttSuccessError;
  122. }
  123. /**
  124. * @brief Disconnects the MQTT client from the server.
  125. *
  126. * If requested, sends a DISCONNECT packet to the MQTT server before disconnecting. Updates the client state to reflect the disconnection.
  127. *
  128. * @param sendDiscFlag If RyanMqttTrue, a DISCONNECT packet is sent to the server; if RyanMqttFalse, no packet is sent.
  129. * @return RyanMqttError_e Result of the disconnect operation.
  130. */
  131. RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
  132. {
  133. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  134. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  135. if (RyanMqttTrue == sendDiscFlag)
  136. {
  137. MQTTStatus_t status = MQTTSuccess;
  138. MQTTFixedBuffer_t fixedBuffer = {0};
  139. // 获取断开连接的数据包大小
  140. status = MQTT_GetDisconnectPacketSize(&fixedBuffer.size);
  141. RyanMqttAssert(MQTTSuccess == status);
  142. // 申请断开连接数据包的空间
  143. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  144. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, rlog_d);
  145. // 序列化断开连接数据包
  146. status = MQTT_SerializeDisconnect(&fixedBuffer);
  147. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  148. // 发送断开连接数据包
  149. RyanMqttError_e result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  150. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  151. platformMemoryFree(fixedBuffer.pBuffer);
  152. }
  153. RyanMqttConnectStatus_e connectState = RyanMqttConnectUserDisconnected;
  154. RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
  155. return RyanMqttSuccessError;
  156. }
  157. /**
  158. * @brief 手动重连mqtt客户端
  159. * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
  160. * ! 否则可能会造成内存泄漏
  161. *
  162. * @param client
  163. * @return RyanMqttError_e
  164. */
  165. RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
  166. {
  167. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  168. RyanMqttCheck(RyanMqttDisconnectState == RyanMqttGetClientState(client), RyanMqttConnectError, rlog_d);
  169. if (RyanMqttTrue == client->config.autoReconnectFlag)
  170. return RyanMqttNoRescourceError;
  171. platformThreadStart(client->config.userData, &client->mqttThread);
  172. return RyanMqttSuccessError;
  173. }
  174. /**
  175. * @brief Subscribes the MQTT client to a specified topic with a given QoS level.
  176. *
  177. * Allocates and serializes a SUBSCRIBE packet, creates message and acknowledgment handlers, and sends the subscription request to the MQTT broker. The granted QoS may be lower than requested and can be checked in the subscription success callback.
  178. *
  179. * @param topic The topic filter to subscribe to.
  180. * @param qos The requested Quality of Service level for the subscription.
  181. * @return RyanMqttError_e Result code indicating success or the type of error encountered.
  182. */
  183. RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
  184. {
  185. RyanMqttError_e result = RyanMqttSuccessError;
  186. uint16_t packetId = 0;
  187. RyanMqttMsgHandler_t *msgHandler = NULL;
  188. RyanMqttAckHandler_t *userAckHandler = NULL;
  189. MQTTSubscribeInfo_t subscriptionList[1] = {0};
  190. MQTTStatus_t status = MQTTSuccess;
  191. MQTTFixedBuffer_t fixedBuffer = {0};
  192. size_t remainingLength = 0;
  193. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  194. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  195. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  196. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  197. subscriptionList[0].qos = (MQTTQoS_t)qos;
  198. subscriptionList[0].pTopicFilter = topic;
  199. subscriptionList[0].topicFilterLength = strlen(topic);
  200. rlog_w("qos: %d", subscriptionList[0].qos);
  201. // 获取数据包大小
  202. status = MQTT_GetSubscribePacketSize(subscriptionList, 1, &remainingLength, &fixedBuffer.size);
  203. RyanMqttAssert(MQTTSuccess == status);
  204. // 申请数据包的空间
  205. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  206. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, rlog_d);
  207. // 序列化数据包
  208. packetId = RyanMqttGetNextPacketId(client);
  209. status = MQTT_SerializeSubscribe(subscriptionList, 1, packetId, remainingLength, &fixedBuffer);
  210. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  211. // 创建msg包
  212. result = RyanMqttMsgHandlerCreate(client, subscriptionList[0].pTopicFilter, subscriptionList[0].topicFilterLength, (RyanMqttQos_e)subscriptionList[0].qos, &msgHandler);
  213. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  214. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_SUBACK,
  215. packetId, 0, NULL, msgHandler, &userAckHandler, RyanMqttFalse);
  216. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  217. platformMemoryFree(fixedBuffer.pBuffer);
  218. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  219. });
  220. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  221. // 如果发送失败就清除ack链表,创建ack链表必须在发送前
  222. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  223. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  224. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  225. RyanMqttAckHandlerDestroy(client, userAckHandler);
  226. });
  227. platformMemoryFree(fixedBuffer.pBuffer);
  228. return result;
  229. }
  230. /**
  231. * @brief Unsubscribes the client from a specified MQTT topic.
  232. *
  233. * Attempts to remove the subscription for the given topic if it exists. Serializes and sends an UNSUBSCRIBE packet to the MQTT broker, and manages acknowledgment handlers for the operation.
  234. *
  235. * @param topic The topic string to unsubscribe from.
  236. * @return RyanMqttError_e Result code indicating success or the type of failure.
  237. */
  238. RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
  239. {
  240. RyanMqttError_e result = RyanMqttFailedError;
  241. uint16_t packetId;
  242. RyanMqttMsgHandler_t *subMsgHandler = NULL;
  243. RyanMqttMsgHandler_t *msgHandler = NULL;
  244. RyanMqttAckHandler_t *userAckHandler = NULL;
  245. MQTTSubscribeInfo_t subscriptionList[1] = {0};
  246. MQTTStatus_t status = MQTTSuccess;
  247. MQTTFixedBuffer_t fixedBuffer = {0};
  248. size_t remainingLength = 0;
  249. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  250. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  251. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  252. subscriptionList[0].qos = (MQTTQoS_t)RyanMqttQos0; // 无效数据
  253. subscriptionList[0].pTopicFilter = topic;
  254. subscriptionList[0].topicFilterLength = strlen(topic);
  255. // 查找当前主题是否已经订阅,没有订阅就取消发送
  256. result = RyanMqttMsgHandlerFind(client, subscriptionList[0].pTopicFilter, subscriptionList[0].topicFilterLength, RyanMqttFalse, &subMsgHandler);
  257. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  258. // 获取数据包大小
  259. status = MQTT_GetUnsubscribePacketSize(subscriptionList, 1, &remainingLength, &fixedBuffer.size);
  260. RyanMqttAssert(MQTTSuccess == status);
  261. // 申请数据包的空间
  262. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  263. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, rlog_d);
  264. // 序列化数据包
  265. packetId = RyanMqttGetNextPacketId(client);
  266. status = MQTT_SerializeUnsubscribe(subscriptionList, 1, packetId, remainingLength, &fixedBuffer);
  267. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  268. result = RyanMqttMsgHandlerCreate(client, subMsgHandler->topic, subMsgHandler->topicLen, subMsgHandler->qos, &msgHandler);
  269. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  270. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_UNSUBACK, packetId, 0, NULL, msgHandler, &userAckHandler, RyanMqttFalse);
  271. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  272. platformMemoryFree(fixedBuffer.pBuffer);
  273. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  274. });
  275. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  276. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  277. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  278. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  279. RyanMqttAckHandlerDestroy(client, userAckHandler);
  280. });
  281. platformMemoryFree(fixedBuffer.pBuffer);
  282. return result;
  283. }
  284. /**
  285. * @brief Publishes a message to a specified MQTT topic.
  286. *
  287. * Sends a message to the given topic with the specified payload, QoS, and retain flag. For QoS 1 and 2, the function manages acknowledgment handlers to ensure reliable delivery and retransmission if necessary.
  288. *
  289. * @param topic The topic to publish to.
  290. * @param payload The message payload. May be NULL if payloadLen is zero.
  291. * @param payloadLen Length of the payload in bytes.
  292. * @param qos Quality of Service level for the message.
  293. * @param retain Whether the message should be retained by the broker.
  294. * @return RyanMqttError_e Result of the publish operation.
  295. */
  296. RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
  297. {
  298. RyanMqttError_e result = RyanMqttSuccessError;
  299. uint16_t packetId = 0;
  300. MQTTStatus_t status = MQTTSuccess;
  301. MQTTFixedBuffer_t fixedBuffer = {0};
  302. size_t remainingLength = 0;
  303. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  304. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  305. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
  306. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  307. RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
  308. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  309. if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
  310. return RyanMqttParamInvalidError;
  311. // 序列化pub发送包
  312. MQTTPublishInfo_t publishInfo = {
  313. .qos = (MQTTQoS_t)qos,
  314. .pTopicName = topic,
  315. .topicNameLength = strlen(topic),
  316. .pPayload = payload,
  317. .payloadLength = payloadLen,
  318. .retain = retain,
  319. .dup = 0,
  320. };
  321. // 获取数据包大小
  322. status = MQTT_GetPublishPacketSize(&publishInfo, &remainingLength, &fixedBuffer.size);
  323. RyanMqttAssert(MQTTSuccess == status);
  324. // 申请数据包的空间
  325. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  326. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNoRescourceError, rlog_d);
  327. // 序列化数据包
  328. packetId = RyanMqttGetNextPacketId(client);
  329. status = MQTT_SerializePublish(&publishInfo, packetId, remainingLength, &fixedBuffer);
  330. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  331. if (RyanMqttQos0 == qos)
  332. {
  333. // 发送报文
  334. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  335. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  336. platformMemoryFree(fixedBuffer.pBuffer);
  337. }
  338. else
  339. {
  340. RyanMqttMsgHandler_t *msgHandler = NULL;
  341. RyanMqttAckHandler_t *userAckHandler = NULL;
  342. // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
  343. result = RyanMqttMsgHandlerCreate(client, publishInfo.pTopicName, publishInfo.topicNameLength, qos, &msgHandler);
  344. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(fixedBuffer.pBuffer); });
  345. result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? MQTT_PACKET_TYPE_PUBACK : MQTT_PACKET_TYPE_PUBREC, packetId, fixedBuffer.size, fixedBuffer.pBuffer, msgHandler, &userAckHandler, RyanMqttTrue);
  346. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  347. platformMemoryFree(fixedBuffer.pBuffer);
  348. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  349. });
  350. // 一定要先加再send,要不可能返回消息会比这个更快执行呢
  351. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  352. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  353. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  354. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  355. RyanMqttAckHandlerDestroy(client, userAckHandler);
  356. });
  357. }
  358. return RyanMqttSuccessError;
  359. }
  360. /**
  361. * @brief 获取mqtt客户端状态
  362. *
  363. * @param client
  364. * @return RyanMqttState_e
  365. */
  366. RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
  367. {
  368. if (NULL == client)
  369. return RyanMqttInvalidState;
  370. return RyanMqttGetClientState(client);
  371. }
  372. /**
  373. * @brief Retrieves the list of currently subscribed topics and their QoS levels.
  374. *
  375. * Copies up to `msgHandleSize` subscribed topics and their QoS values into the provided array.
  376. * The actual number of subscriptions copied is returned via `subscribeNum`.
  377. *
  378. * @param msgHandles Array to receive the subscribed topics and QoS values.
  379. * @param msgHandleSize Maximum number of entries to copy into `msgHandles`.
  380. * @param subscribeNum Pointer to an integer where the number of subscriptions copied will be stored.
  381. * @return RyanMqttError_e Returns RyanMqttSuccessError on success, or RyanMqttNoRescourceError if the provided array is too small.
  382. */
  383. RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandles, int32_t msgHandleSize, int32_t *subscribeNum)
  384. {
  385. RyanMqttError_e result = RyanMqttSuccessError;
  386. RyanList_t *curr = NULL,
  387. *next = NULL;
  388. RyanMqttMsgHandler_t *msgHandler = NULL;
  389. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  390. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, rlog_d);
  391. RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, rlog_d);
  392. RyanMqttCheck(0 < msgHandleSize, RyanMqttParamInvalidError, rlog_d);
  393. *subscribeNum = 0;
  394. platformMutexLock(client->config.userData, &client->msgHandleLock);
  395. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  396. {
  397. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  398. msgHandles[*subscribeNum].topic = msgHandler->topic;
  399. msgHandles[*subscribeNum].qos = msgHandler->qos;
  400. (*subscribeNum)++;
  401. if (*subscribeNum >= msgHandleSize)
  402. {
  403. result = RyanMqttNoRescourceError;
  404. goto __next;
  405. }
  406. }
  407. __next:
  408. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  409. return result;
  410. }
  411. /**
  412. * @brief Retrieves the total number of subscribed topics for the MQTT client.
  413. *
  414. * @param subscribeTotalCount Pointer to an integer where the total count will be stored.
  415. * @return RyanMqttSuccessError on success, or RyanMqttParamInvalidError if parameters are invalid.
  416. */
  417. RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client, int32_t *subscribeTotalCount)
  418. {
  419. RyanList_t *curr = NULL,
  420. *next = NULL;
  421. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  422. RyanMqttCheck(NULL != subscribeTotalCount, RyanMqttParamInvalidError, rlog_d);
  423. *subscribeTotalCount = 0;
  424. platformMutexLock(client->config.userData, &client->msgHandleLock);
  425. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  426. {
  427. (*subscribeTotalCount)++;
  428. }
  429. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  430. return RyanMqttSuccessError;
  431. }
  432. /**
  433. * @brief 获取mqtt config
  434. * 使用完毕后,需要用户释放pclientConfig指针内容
  435. *
  436. * @param client
  437. * @param pclientConfig
  438. * @return RyanMqttError_e
  439. */
  440. /* RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
  441. {
  442. RyanMqttError_e result = RyanMqttSuccessError;
  443. RyanMqttClientConfig_t *clientConfig = NULL;
  444. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  445. RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, rlog_d);
  446. RyanMqttCheck(NULL != client->config, RyanMqttNoRescourceError);
  447. clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
  448. RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError);
  449. memcpy(clientConfig, client->config, sizeof(RyanMqttClientConfig_t));
  450. result = setConfigValue(&clientConfig->clientId, client->config->clientId);
  451. RyanMqttCheck(RyanMqttSuccessError == result, result);
  452. result = setConfigValue(&clientConfig->userName, client->config->userName);
  453. RyanMqttCheck(RyanMqttSuccessError == result, result);
  454. result = setConfigValue(&clientConfig->password, client->config->password);
  455. RyanMqttCheck(RyanMqttSuccessError == result, result);
  456. result = setConfigValue(&clientConfig->host, client->config->host);
  457. RyanMqttCheck(RyanMqttSuccessError == result, result);
  458. result = setConfigValue(&clientConfig->port, client->config->port);
  459. RyanMqttCheck(RyanMqttSuccessError == result, result);
  460. result = setConfigValue(&clientConfig->taskName, client->config->taskName);
  461. RyanMqttCheck(RyanMqttSuccessError == result, result);
  462. *pclientConfig = clientConfig;
  463. return RyanMqttSuccessError;
  464. }
  465. */
  466. // todo 增加更多校验,比如判断心跳包和recv的关系
  467. /**
  468. * @brief Sets the MQTT client configuration.
  469. *
  470. * Updates the client's configuration parameters such as client ID, host, credentials, timeouts, and flags. This operation is not thread-safe and must be performed only when the MQTT thread is not running or is safely suspended to avoid resource conflicts. If the function fails, the client is destroyed and further MQTT operations must be stopped.
  471. *
  472. * @return RyanMqttSuccessError on success, RyanMqttFailedError on failure (client is destroyed).
  473. */
  474. RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig)
  475. {
  476. RyanMqttError_e result = RyanMqttSuccessError;
  477. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  478. RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, rlog_d);
  479. RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, rlog_d);
  480. RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, rlog_d);
  481. RyanMqttCheck(clientConfig->recvTimeout <= clientConfig->keepaliveTimeoutS * 1000 / 2, RyanMqttParamInvalidError, rlog_d);
  482. RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, rlog_d);
  483. result = setConfigValue(&client->config.clientId, clientConfig->clientId);
  484. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  485. if (NULL == clientConfig->userName)
  486. {
  487. client->config.userName = NULL;
  488. }
  489. else
  490. {
  491. result = setConfigValue(&client->config.userName, clientConfig->userName);
  492. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  493. }
  494. if (NULL == clientConfig->password)
  495. {
  496. client->config.password = NULL;
  497. }
  498. else
  499. {
  500. result = setConfigValue(&client->config.password, clientConfig->password);
  501. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  502. }
  503. result = setConfigValue(&client->config.host, clientConfig->host);
  504. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  505. result = setConfigValue(&client->config.taskName, clientConfig->taskName);
  506. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  507. client->config.port = clientConfig->port;
  508. client->config.taskPrio = clientConfig->taskPrio;
  509. client->config.taskStack = clientConfig->taskStack;
  510. client->config.mqttVersion = clientConfig->mqttVersion;
  511. client->config.ackHandlerRepeatCountWarning = clientConfig->ackHandlerRepeatCountWarning;
  512. client->config.ackHandlerCountWarning = clientConfig->ackHandlerCountWarning;
  513. client->config.autoReconnectFlag = clientConfig->autoReconnectFlag;
  514. client->config.cleanSessionFlag = clientConfig->cleanSessionFlag;
  515. client->config.reconnectTimeout = clientConfig->reconnectTimeout;
  516. client->config.recvTimeout = clientConfig->recvTimeout;
  517. client->config.sendTimeout = clientConfig->sendTimeout;
  518. client->config.ackTimeout = clientConfig->ackTimeout;
  519. client->config.keepaliveTimeoutS = clientConfig->keepaliveTimeoutS;
  520. client->config.mqttEventHandle = clientConfig->mqttEventHandle;
  521. client->config.userData = clientConfig->userData;
  522. return RyanMqttSuccessError;
  523. __exit:
  524. RyanMqttDestroy(client);
  525. return RyanMqttFailedError;
  526. }
  527. /**
  528. * @brief Configures the Last Will and Testament (LWT) message for the MQTT client.
  529. *
  530. * Sets the topic, payload, QoS, and retain flag for the LWT message, which will be included in the CONNECT packet sent to the broker. This function must be called before establishing a connection (e.g., before RyanMqttStart or during the RyanMqttEventReconnectBefore event).
  531. *
  532. * @param topicName The topic for the LWT message.
  533. * @param payload The payload for the LWT message. Can be NULL if payloadLen is 0.
  534. * @param payloadLen The length of the payload in bytes.
  535. * @param qos The Quality of Service level for the LWT message.
  536. * @param retain Whether the LWT message should be retained by the broker.
  537. * @return RyanMqttSuccessError on success, or an error code on failure.
  538. */
  539. RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
  540. {
  541. RyanMqttError_e result = RyanMqttSuccessError;
  542. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  543. RyanMqttCheck(NULL != topicName, RyanMqttParamInvalidError, rlog_d);
  544. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
  545. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  546. RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
  547. if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
  548. return RyanMqttParamInvalidError;
  549. if (NULL != client->lwtOptions.topic)
  550. platformMemoryFree(client->lwtOptions.topic);
  551. if (NULL != client->lwtOptions.payload)
  552. platformMemoryFree(client->lwtOptions.payload);
  553. memset(&client->lwtOptions, 0, sizeof(lwtOptions_t));
  554. result = RyanMqttStringCopy(&client->lwtOptions.payload, payload, payloadLen);
  555. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  556. result = RyanMqttStringCopy(&client->lwtOptions.topic, topicName, strlen(topicName));
  557. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(client->lwtOptions.payload); });
  558. client->lwtFlag = RyanMqttTrue;
  559. client->lwtOptions.qos = qos;
  560. client->lwtOptions.retain = retain;
  561. client->lwtOptions.payloadLen = payloadLen;
  562. return RyanMqttSuccessError;
  563. }
  564. /**
  565. * @brief Discards a specified acknowledgment handler from the MQTT client.
  566. *
  567. * Removes the given acknowledgment handler from the client's acknowledgment list, invokes the associated event callback, and destroys the handler.
  568. *
  569. * @return RyanMqttError_e Result of the operation.
  570. */
  571. RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, RyanMqttAckHandler_t *ackHandler)
  572. {
  573. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  574. RyanMqttEventMachine(client, RyanMqttEventAckHandlerdiscard, (void *)ackHandler); // 回调函数
  575. RyanMqttAckListRemoveToAckList(client, ackHandler);
  576. RyanMqttAckHandlerDestroy(client, ackHandler);
  577. return RyanMqttSuccessError;
  578. }
  579. /**
  580. * @brief Registers an event ID for the MQTT client.
  581. *
  582. * Sets the specified event ID in the client's event flag in a thread-safe manner.
  583. *
  584. * @param eventId The event identifier to register.
  585. * @return RyanMqttSuccessError on success, or RyanMqttParamInvalidError if the client is NULL.
  586. */
  587. RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  588. {
  589. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  590. platformCriticalEnter(client->config.userData, &client->criticalLock);
  591. client->eventFlag |= eventId;
  592. platformCriticalExit(client->config.userData, &client->criticalLock);
  593. return RyanMqttSuccessError;
  594. }
  595. /**
  596. * @brief Cancels a registered event ID for the MQTT client.
  597. *
  598. * Clears the specified event ID from the client's event flag in a thread-safe manner.
  599. *
  600. * @param eventId The event identifier to cancel.
  601. * @return RyanMqttSuccessError on success, or RyanMqttParamInvalidError if the client is NULL.
  602. */
  603. RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  604. {
  605. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  606. platformCriticalEnter(client->config.userData, &client->criticalLock);
  607. client->eventFlag &= ~eventId;
  608. platformCriticalExit(client->config.userData, &client->criticalLock);
  609. return RyanMqttSuccessError;
  610. }