RyanMqttClient.c 40 KB


  1. #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
  2. // #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
  3. // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  4. #include "RyanMqttClient.h"
  5. #include "RyanMqttThread.h"
  6. #include "RyanMqttUtil.h"
  7. #include "core_mqtt_serializer.h"
  8. /**
  9. * @brief mqtt初始化
  10. *
  11. * @param clientConfig
  12. * @param pClient mqtt客户端指针
  13. * @return RyanMqttError_e
  14. */
  15. RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
  16. {
  17. RyanMqttError_e result = RyanMqttSuccessError;
  18. RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, RyanMqttLog_d);
  19. RyanMqttClient_t *client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
  20. RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  21. RyanMqttMemset(client, 0, sizeof(RyanMqttClient_t));
  22. client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
  23. client->clientState = RyanMqttInitState;
  24. client->eventFlag = 0;
  25. client->ackHandlerCount = 0;
  26. RyanMqttBool_e criticalLockIsOk = RyanMqttFalse;
  27. RyanMqttBool_e sendLockIsOk = RyanMqttFalse;
  28. RyanMqttBool_e msgHandleLockIsOk = RyanMqttFalse;
  29. RyanMqttBool_e ackHandleLockIsOk = RyanMqttFalse;
  30. RyanMqttBool_e userSessionLockIsOk = RyanMqttFalse;
  31. RyanMqttBool_e networkIsOk = RyanMqttFalse;
  32. result = platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
  33. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  34. criticalLockIsOk = RyanMqttTrue;
  35. result = platformMutexInit(client->config.userData, &client->sendLock); // 初始化发送缓冲区互斥锁
  36. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  37. sendLockIsOk = RyanMqttTrue;
  38. result = platformMutexInit(client->config.userData, &client->msgHandleLock);
  39. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  40. msgHandleLockIsOk = RyanMqttTrue;
  41. result = platformMutexInit(client->config.userData, &client->ackHandleLock);
  42. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  43. ackHandleLockIsOk = RyanMqttTrue;
  44. result = platformMutexInit(client->config.userData, &client->userSessionLock);
  45. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  46. userSessionLockIsOk = RyanMqttTrue;
  47. result = platformNetworkInit(client->config.userData, &client->network); // 网络接口初始化
  48. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  49. // networkIsOk = RyanMqttTrue;
  50. RyanMqttListInit(&client->msgHandlerList);
  51. RyanMqttListInit(&client->ackHandlerList);
  52. RyanMqttListInit(&client->userAckHandlerList);
  53. RyanMqttSetClientState(client, RyanMqttInitState);
  54. *pClient = client;
  55. return RyanMqttSuccessError;
  56. __exit:
  57. // 不能按空来判断,不是指针类型
  58. if (criticalLockIsOk)
  59. {
  60. platformCriticalDestroy(client->config.userData, &client->criticalLock);
  61. }
  62. if (sendLockIsOk)
  63. {
  64. platformMutexDestroy(client->config.userData, &client->sendLock);
  65. }
  66. if (msgHandleLockIsOk)
  67. {
  68. platformMutexDestroy(client->config.userData, &client->msgHandleLock);
  69. }
  70. if (ackHandleLockIsOk)
  71. {
  72. platformMutexDestroy(client->config.userData, &client->ackHandleLock);
  73. }
  74. if (userSessionLockIsOk)
  75. {
  76. platformMutexDestroy(client->config.userData, &client->userSessionLock);
  77. }
  78. if (networkIsOk)
  79. {
  80. platformNetworkClose(client->config.userData, &client->network);
  81. platformNetworkDestroy(client->config.userData, &client->network);
  82. }
  83. platformMemoryFree(client);
  84. return result;
  85. }
  86. /**
  87. * @brief 销毁mqtt客户端
  88. * !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
  89. * !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
  90. * !mqtt删除自己前会调用 RyanMqttEventDestroyBefore 事件回调
  91. * !调用此函数后就不应该再对该客户端进行任何操作
  92. * @param client
  93. * @return RyanMqttError_e
  94. */
  95. RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
  96. {
  97. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  98. platformCriticalEnter(client->config.userData, &client->criticalLock);
  99. client->destroyFlag = RyanMqttTrue;
  100. platformCriticalExit(client->config.userData, &client->criticalLock);
  101. return RyanMqttSuccessError;
  102. }
  103. /**
  104. * @brief 启动mqtt客户端
  105. * !不要重复调用
  106. *
  107. * @param client
  108. * @return RyanMqttError_e
  109. */
  110. RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
  111. {
  112. RyanMqttError_e result = RyanMqttSuccessError;
  113. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  114. RyanMqttCheck(RyanMqttInitState == RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  115. RyanMqttSetClientState(client, RyanMqttStartState);
  116. // 连接成功,需要初始化 MQTT 线程
  117. result = platformThreadInit(client->config.userData, &client->mqttThread, client->config.taskName,
  118. RyanMqttThread, client, client->config.taskStack, client->config.taskPrio);
  119. RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNoRescourceError, RyanMqttLog_d,
  120. { RyanMqttSetClientState(client, RyanMqttInitState); });
  121. return result;
  122. }
  123. /**
  124. * @brief 断开mqtt服务器连接
  125. *
  126. * @param client
  127. * @param sendDiscFlag
  128. * RyanMqttTrue表示发送断开连接报文,RyanMqttFalse表示不发送断开连接报文
  129. * @return RyanMqttError_e
  130. */
  131. RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
  132. {
  133. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  134. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  135. if (RyanMqttTrue == sendDiscFlag)
  136. {
  137. MQTTStatus_t status;
  138. MQTTFixedBuffer_t fixedBuffer;
  139. // 获取断开连接的数据包大小
  140. status = MQTT_GetDisconnectPacketSize(&fixedBuffer.size);
  141. RyanMqttAssert(MQTTSuccess == status);
  142. // 申请断开连接数据包的空间
  143. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  144. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  145. // 序列化断开连接数据包
  146. status = MQTT_SerializeDisconnect(&fixedBuffer);
  147. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
  148. { platformMemoryFree(fixedBuffer.pBuffer); });
  149. // 发送断开连接数据包
  150. RyanMqttError_e result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  151. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  152. { platformMemoryFree(fixedBuffer.pBuffer); });
  153. platformMemoryFree(fixedBuffer.pBuffer);
  154. }
  155. RyanMqttConnectStatus_e connectState = RyanMqttConnectUserDisconnected;
  156. RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
  157. return RyanMqttSuccessError;
  158. }
  159. // todo 这里考虑要不要使用信号量来实现吧,会增加platform厚度,目前不想加。最好用自动重连
  160. // ?现在取巧使用线程的暂停和恢复实现,如果mqtt线程还没有暂停,用户就调用这个函数就会没有效果。
  161. // ?用户不用自动重连的话,也可以通过一直判断 client 的状态是不是为 RyanMqttDisconnectState 是的话可以调用
  162. // ?RyanMqttReconnect。 最推荐的是用自动重连
  163. /**
  164. * @brief 手动重连mqtt客户端
  165. * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
  166. *
  167. * @param client
  168. * @return RyanMqttError_e
  169. */
  170. RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
  171. {
  172. RyanMqttError_e result = RyanMqttSuccessError;
  173. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  174. RyanMqttCheck(RyanMqttDisconnectState == RyanMqttGetClientState(client), RyanMqttConnectError, RyanMqttLog_d);
  175. if (RyanMqttTrue == client->config.autoReconnectFlag)
  176. {
  177. return RyanMqttNoRescourceError;
  178. }
  179. result = platformThreadStart(client->config.userData, &client->mqttThread);
  180. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  181. return result;
  182. }
  183. /**
  184. * @brief 订阅主题
  185. *
  186. * @param client
  187. * @param topic
  188. * @param qos
  189. * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
  190. * @return RyanMqttError_e
  191. */
  192. RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
  193. RyanMqttSubscribeData_t subscribeManyData[])
  194. {
  195. RyanMqttError_e result = RyanMqttSuccessError;
  196. uint16_t packetId;
  197. RyanMqttMsgHandler_t *msgHandler;
  198. RyanMqttMsgHandler_t *msgToListHandler;
  199. RyanMqttAckHandler_t *userAckHandler;
  200. MQTTFixedBuffer_t fixedBuffer;
  201. // 校验参数合法性
  202. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  203. RyanMqttCheck(NULL != subscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
  204. RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  205. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  206. // 检查每个主题消息是否合法
  207. for (int32_t i = 0; i < count; i++)
  208. {
  209. RyanMqttCheck(NULL != subscribeManyData[i].topic && subscribeManyData[i].topicLen > 0,
  210. RyanMqttParamInvalidError, RyanMqttLog_d);
  211. RyanMqttCheck(RyanMqttQos0 <= subscribeManyData[i].qos && RyanMqttQos2 >= subscribeManyData[i].qos,
  212. RyanMqttParamInvalidError, RyanMqttLog_d);
  213. }
  214. // 申请 coreMqtt 支持的topic格式空间
  215. MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
  216. RyanMqttCheck(NULL != subscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  217. for (int32_t i = 0; i < count; i++)
  218. {
  219. subscriptionList[i].qos = (MQTTQoS_t)subscribeManyData[i].qos;
  220. subscriptionList[i].pTopicFilter = subscribeManyData[i].topic;
  221. subscriptionList[i].topicFilterLength = subscribeManyData[i].topicLen;
  222. }
  223. // 序列化数据包
  224. {
  225. size_t remainingLength;
  226. // 获取数据包大小
  227. MQTTStatus_t status =
  228. MQTT_GetSubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
  229. RyanMqttAssert(MQTTSuccess == status);
  230. // 申请数据包的空间
  231. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  232. RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
  233. { platformMemoryFree(subscriptionList); });
  234. // 序列化数据包
  235. packetId = RyanMqttGetNextPacketId(client);
  236. status = MQTT_SerializeSubscribe(subscriptionList, count, packetId, remainingLength, &fixedBuffer);
  237. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
  238. platformMemoryFree(subscriptionList);
  239. platformMemoryFree(fixedBuffer.pBuffer);
  240. });
  241. }
  242. // 创建每个msg主题的ack节点
  243. // ?mqtt空间收到服务器的suback时,会查找所有同名的然后删掉,所以这里不进行同名对比操作
  244. for (int32_t i = 0; i < count; i++)
  245. {
  246. // 创建msg包
  247. result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
  248. subscriptionList[i].topicFilterLength, packetId,
  249. (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgHandler);
  250. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  251. { goto __RyanMqttSubCreateAckErrorExit; });
  252. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_SUBACK, packetId, 0, NULL, msgHandler,
  253. &userAckHandler, RyanMqttFalse);
  254. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  255. RyanMqttMsgHandlerDestroy(client, msgHandler);
  256. goto __RyanMqttSubCreateAckErrorExit;
  257. });
  258. // 此函数不会失败
  259. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  260. continue;
  261. __RyanMqttSubCreateAckErrorExit:
  262. // 创建 sub ack 数组时失败,查找所有同 packetId 的ack进行清除
  263. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
  264. platformMemoryFree(subscriptionList);
  265. platformMemoryFree(fixedBuffer.pBuffer);
  266. return RyanMqttNotEnoughMemError;
  267. }
  268. // ?创建msg包,3.8.4响应,允许服务端在发送 SUBACK 报文之前就开始发送与订阅匹配的 PUBLISH 报文。
  269. for (int32_t i = 0; i < count; i++)
  270. {
  271. result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
  272. subscriptionList[i].topicFilterLength, packetId,
  273. (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgToListHandler);
  274. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  275. // 将msg信息添加到订阅链表上
  276. RyanMqttMsgHandlerAddToMsgList(client, msgToListHandler);
  277. }
  278. // 发送订阅主题包
  279. // 如果发送失败就清除ack链表,创建ack链表必须在发送前
  280. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  281. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  282. __exit:
  283. // 失败清除session
  284. if (RyanMqttSuccessError != result)
  285. {
  286. // 清除ack链表
  287. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
  288. // 清除msg链表
  289. RyanMqttMsgHandler_t msgMatchCriteria;
  290. for (int32_t i = 0; i < count; i++)
  291. {
  292. msgMatchCriteria.topic = (char *)subscriptionList[i].pTopicFilter;
  293. msgMatchCriteria.topicLen = subscriptionList[i].topicFilterLength;
  294. msgMatchCriteria.packetId = packetId;
  295. RyanMqttMsgHandlerFindAndDestroyByPacketId(client, &msgMatchCriteria, RyanMqttFalse);
  296. }
  297. }
  298. platformMemoryFree(subscriptionList);
  299. platformMemoryFree(fixedBuffer.pBuffer);
  300. return result;
  301. }
  302. /**
  303. * @brief 订阅主题
  304. *
  305. * @param client
  306. * @param topic
  307. * @param qos
  308. * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
  309. * @return RyanMqttError_e
  310. */
  311. RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
  312. {
  313. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
  314. RyanMqttSubscribeData_t subscribeManyData = {.qos = qos, .topic = topic, .topicLen = RyanMqttStrlen(topic)};
  315. return RyanMqttSubscribeMany(client, 1, &subscribeManyData);
  316. }
  317. /**
  318. * @brief 取消订阅指定主题
  319. *
  320. * @param client
  321. * @param topic
  322. * @return RyanMqttError_e
  323. */
  324. RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
  325. RyanMqttUnSubscribeData_t unSubscribeManyData[])
  326. {
  327. RyanMqttError_e result = RyanMqttSuccessError;
  328. uint16_t packetId;
  329. RyanMqttMsgHandler_t *subMsgHandler;
  330. RyanMqttMsgHandler_t *msgHandler;
  331. RyanMqttAckHandler_t *userAckHandler;
  332. MQTTFixedBuffer_t fixedBuffer;
  333. // 校验参数合法性
  334. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  335. RyanMqttCheck(NULL != unSubscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
  336. RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  337. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  338. // 检查有效性
  339. for (int32_t i = 0; i < count; i++)
  340. {
  341. RyanMqttCheck(NULL != unSubscribeManyData[i].topic && unSubscribeManyData[i].topicLen > 0,
  342. RyanMqttParamInvalidError, RyanMqttLog_d);
  343. }
  344. // 申请 coreMqtt 支持的topic格式空间
  345. MQTTSubscribeInfo_t *unSubscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
  346. RyanMqttCheck(NULL != unSubscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  347. for (int32_t i = 0; i < count; i++)
  348. {
  349. unSubscriptionList[i].qos = (MQTTQoS_t)RyanMqttSubFail; // 无效数据,仅用作占位符
  350. unSubscriptionList[i].pTopicFilter = unSubscribeManyData[i].topic;
  351. unSubscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
  352. }
  353. // 序列化数据包
  354. {
  355. size_t remainingLength;
  356. // 获取数据包大小
  357. MQTTStatus_t status =
  358. MQTT_GetUnsubscribePacketSize(unSubscriptionList, count, &remainingLength, &fixedBuffer.size);
  359. RyanMqttAssert(MQTTSuccess == status);
  360. // 申请数据包的空间
  361. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  362. RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
  363. { platformMemoryFree(unSubscriptionList); });
  364. // 序列化数据包
  365. packetId = RyanMqttGetNextPacketId(client);
  366. status = MQTT_SerializeUnsubscribe(unSubscriptionList, count, packetId, remainingLength, &fixedBuffer);
  367. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
  368. platformMemoryFree(unSubscriptionList);
  369. platformMemoryFree(fixedBuffer.pBuffer);
  370. });
  371. }
  372. // 创建ack
  373. for (int32_t i = 0; i < count; i++)
  374. {
  375. // ?不判断是否订阅,统一都发送取消
  376. RyanMqttMsgHandler_t msgMatchCriteria = {.topic = (char *)unSubscriptionList[i].pTopicFilter,
  377. .topicLen = unSubscriptionList[i].topicFilterLength};
  378. platformMutexLock(client->config.userData, &client->msgHandleLock);
  379. result =
  380. RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttFalse, &subMsgHandler, RyanMqttFalse);
  381. if (RyanMqttSuccessError == result)
  382. {
  383. // !有线程安全问题,subMsgHandler是指针,但用户层只要不是特别的混乱重复取消订阅这里应该就问题,暂时不管成本太高
  384. // 同步msg qos等级,之后unsub回调使用
  385. unSubscriptionList[i].qos = (MQTTQoS_t)subMsgHandler->qos;
  386. }
  387. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  388. result = RyanMqttMsgHandlerCreate(client, unSubscriptionList[i].pTopicFilter,
  389. unSubscriptionList[i].topicFilterLength, packetId,
  390. (RyanMqttQos_e)unSubscriptionList[i].qos, NULL, &msgHandler);
  391. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  392. { goto __RyanMqttUnSubCreateAckErrorExit; });
  393. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_UNSUBACK, packetId, 0, NULL, msgHandler,
  394. &userAckHandler, RyanMqttFalse);
  395. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  396. RyanMqttMsgHandlerDestroy(client, msgHandler);
  397. goto __RyanMqttUnSubCreateAckErrorExit;
  398. });
  399. // 此函数不会失败
  400. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  401. continue;
  402. __RyanMqttUnSubCreateAckErrorExit:
  403. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
  404. platformMemoryFree(unSubscriptionList);
  405. platformMemoryFree(fixedBuffer.pBuffer);
  406. return RyanMqttNotEnoughMemError;
  407. }
  408. // 发送取消订阅主题包
  409. // 如果发送失败就清除ack链表,创建ack链表必须在发送前
  410. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  411. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  412. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
  413. platformMemoryFree(unSubscriptionList);
  414. platformMemoryFree(fixedBuffer.pBuffer);
  415. });
  416. platformMemoryFree(unSubscriptionList);
  417. platformMemoryFree(fixedBuffer.pBuffer);
  418. return result;
  419. }
  420. /**
  421. * @brief 取消订阅指定主题
  422. *
  423. * @param client
  424. * @param topic
  425. * @return RyanMqttError_e
  426. */
  427. RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
  428. {
  429. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
  430. RyanMqttUnSubscribeData_t subscribeManyData = {.topic = topic, .topicLen = RyanMqttStrlen(topic)};
  431. return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
  432. }
  433. RyanMqttError_e RyanMqttPublishWithUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen, char *payload,
  434. uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain,
  435. void *userData)
  436. {
  437. RyanMqttError_e result = RyanMqttSuccessError;
  438. uint16_t packetId;
  439. MQTTStatus_t status;
  440. MQTTFixedBuffer_t fixedBuffer;
  441. size_t remainingLength;
  442. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  443. RyanMqttCheck(NULL != topic && topicLen > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  444. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
  445. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
  446. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  447. // 报文支持有效载荷长度为0
  448. if (payloadLen > 0 && NULL == payload)
  449. {
  450. return RyanMqttParamInvalidError;
  451. }
  452. // 序列化pub发送包
  453. MQTTPublishInfo_t publishInfo = {
  454. .qos = (MQTTQoS_t)qos,
  455. .pTopicName = topic,
  456. .topicNameLength = topicLen,
  457. .pPayload = payload,
  458. .payloadLength = payloadLen,
  459. .retain = retain,
  460. .dup = 0,
  461. };
  462. // 获取数据包大小
  463. status = MQTT_GetPublishPacketSize(&publishInfo, &remainingLength, &fixedBuffer.size);
  464. RyanMqttAssert(MQTTSuccess == status);
  465. // 申请数据包的空间
  466. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  467. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  468. // qos0不需要 packetId
  469. if (RyanMqttQos0 == qos)
  470. {
  471. packetId = 0;
  472. }
  473. else
  474. {
  475. packetId = RyanMqttGetNextPacketId(client);
  476. }
  477. // 序列化数据包
  478. status = MQTT_SerializePublish(&publishInfo, packetId, remainingLength, &fixedBuffer);
  479. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
  480. { platformMemoryFree(fixedBuffer.pBuffer); });
  481. if (RyanMqttQos0 == qos)
  482. {
  483. // 发送报文
  484. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  485. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  486. { platformMemoryFree(fixedBuffer.pBuffer); });
  487. platformMemoryFree(fixedBuffer.pBuffer);
  488. }
  489. else
  490. {
  491. RyanMqttMsgHandler_t *msgHandler;
  492. RyanMqttAckHandler_t *userAckHandler;
  493. uint8_t packetType = (RyanMqttQos1 == qos) ? MQTT_PACKET_TYPE_PUBACK : MQTT_PACKET_TYPE_PUBREC;
  494. // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
  495. result = RyanMqttMsgHandlerCreate(client, publishInfo.pTopicName, publishInfo.topicNameLength,
  496. RyanMqttMsgInvalidPacketId, qos, userData, &msgHandler);
  497. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  498. { platformMemoryFree(fixedBuffer.pBuffer); });
  499. result = RyanMqttAckHandlerCreate(client, packetType, packetId, fixedBuffer.size, fixedBuffer.pBuffer,
  500. msgHandler, &userAckHandler, RyanMqttTrue);
  501. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  502. platformMemoryFree(fixedBuffer.pBuffer);
  503. RyanMqttMsgHandlerDestroy(client, msgHandler);
  504. });
  505. // 一定要先加再send,send一定在mqtt broker回复前执行完,要不可能线程调度mqtt返回消息会比添加ack更快执行
  506. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  507. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  508. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_e, {
  509. RyanMqttLog_e("RyanMqttSendPacket failed, clear user ack session");
  510. // userAck 必须通过这个执行,因为可能已经复制到mqtt内核空间了
  511. RyanMqttClearAckSession(client, packetType, packetId);
  512. });
  513. }
  514. return result;
  515. }
  516. /**
  517. * @brief 客户端向服务端发送消息
  518. *
  519. * @param client
  520. * @param topic
  521. * @param payload
  522. * @param payloadLen
  523. * @param QOS
  524. * @param retain
  525. * @return RyanMqttError_e
  526. */
  527. RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
  528. RyanMqttQos_e qos, RyanMqttBool_e retain)
  529. {
  530. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
  531. return RyanMqttPublishWithUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain,
  532. NULL);
  533. }
  534. /**
  535. * @brief 获取已订阅主题
  536. * !此函数是非线程安全的,已不推荐使用
  537. * !请使用 RyanMqttGetSubscribeSafe 代替
  538. * !如果另一个线程在这个调用返回后立即取消订阅,topic将指向非法内存
  539. *
  540. * @param client
  541. * @param msgHandles 存放已订阅主题的空间
  542. * @param msgHandleSize 存放已订阅主题的空间大小个数
  543. * @param subscribeNum 函数内部返回已订阅主题的个数
  544. * @return RyanMqttState_e
  545. */
  546. RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandles, int32_t msgHandleSize,
  547. int32_t *subscribeNum)
  548. {
  549. RyanMqttError_e result = RyanMqttSuccessError;
  550. RyanMqttList_t *curr, *next;
  551. RyanMqttMsgHandler_t *msgHandler;
  552. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  553. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
  554. RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
  555. RyanMqttCheck(0 < msgHandleSize, RyanMqttParamInvalidError, RyanMqttLog_d);
  556. *subscribeNum = 0;
  557. platformMutexLock(client->config.userData, &client->msgHandleLock);
  558. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  559. {
  560. if (*subscribeNum >= msgHandleSize)
  561. {
  562. result = RyanMqttNoRescourceError;
  563. break;
  564. }
  565. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  566. msgHandles[*subscribeNum].topic = msgHandler->topic;
  567. msgHandles[*subscribeNum].qos = msgHandler->qos;
  568. (*subscribeNum)++;
  569. }
  570. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  571. return result;
  572. }
  573. /**
  574. * @brief 安全的获取已订阅主题列表,仅可通过 RyanMqttSafeFreeSubscribeResources 进行安全释放。
  575. *
  576. * @param client
  577. * @param msgHandles
  578. * @param subscribeNum
  579. * @return RyanMqttError_e
  580. */
  581. RyanMqttError_e RyanMqttGetSubscribeSafe(RyanMqttClient_t *client, RyanMqttMsgHandler_t **msgHandles,
  582. int32_t *subscribeNum)
  583. {
  584. RyanMqttError_e result = RyanMqttSuccessError;
  585. RyanMqttList_t *curr, *next;
  586. RyanMqttMsgHandler_t *msgHandler;
  587. int32_t subscribeTotal;
  588. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  589. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
  590. RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
  591. RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
  592. if (0 == subscribeTotal)
  593. {
  594. *msgHandles = NULL;
  595. *subscribeNum = 0;
  596. return RyanMqttSuccessError;
  597. }
  598. RyanMqttMsgHandler_t *msgHandlerArr = platformMemoryMalloc(sizeof(RyanMqttMsgHandler_t) * subscribeTotal);
  599. if (NULL == msgHandlerArr)
  600. {
  601. result = RyanMqttNotEnoughMemError;
  602. goto __exit;
  603. }
  604. int32_t subscribeCount = 0;
  605. platformMutexLock(client->config.userData, &client->msgHandleLock);
  606. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  607. {
  608. if (subscribeCount >= subscribeTotal)
  609. {
  610. break;
  611. }
  612. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  613. RyanMqttMemcpy(&msgHandlerArr[subscribeCount], msgHandler, sizeof(RyanMqttMsgHandler_t));
  614. result = RyanMqttDupString(&msgHandlerArr[subscribeCount].topic, msgHandler->topic,
  615. msgHandler->topicLen);
  616. if (RyanMqttSuccessError != result)
  617. {
  618. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  619. RyanMqttSafeFreeSubscribeResources(msgHandlerArr, subscribeCount);
  620. result = RyanMqttNotEnoughMemError;
  621. goto __exit;
  622. }
  623. subscribeCount++;
  624. }
  625. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  626. *msgHandles = msgHandlerArr;
  627. *subscribeNum = subscribeCount;
  628. __exit:
  629. return result;
  630. }
  631. /**
  632. * @brief 安全释放订阅主题列表(禁止直接调用free函数)
  633. *
  634. * @param msgHandles
  635. * @param subscribeNum
  636. * @return RyanMqttError_e
  637. */
  638. RyanMqttError_e RyanMqttSafeFreeSubscribeResources(RyanMqttMsgHandler_t *msgHandles, int32_t subscribeNum)
  639. {
  640. RyanMqttError_e result = RyanMqttSuccessError;
  641. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
  642. // RyanMqttGetSubscribeTotalCount 内部调用的时候可以会等于0
  643. RyanMqttCheck(subscribeNum >= 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  644. for (int32_t i = 0; i < subscribeNum; i++)
  645. {
  646. // 不加null判断,因为如果是空,一定是用户程序内存访问越界了
  647. platformMemoryFree(msgHandles[i].topic);
  648. }
  649. platformMemoryFree(msgHandles);
  650. return result;
  651. }
  652. /**
  653. * @brief 获取已订阅主题个数
  654. *
  655. * @param client
  656. * @param subscribeTotalCount
  657. * @return RyanMqttError_e
  658. */
  659. RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client, int32_t *subscribeTotalCount)
  660. {
  661. RyanMqttList_t *curr, *next;
  662. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  663. RyanMqttCheck(NULL != subscribeTotalCount, RyanMqttParamInvalidError, RyanMqttLog_d);
  664. *subscribeTotalCount = 0;
  665. platformMutexLock(client->config.userData, &client->msgHandleLock);
  666. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  667. {
  668. (*subscribeTotalCount)++;
  669. }
  670. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  671. return RyanMqttSuccessError;
  672. }
  673. /**
  674. * @brief 获取mqtt客户端状态
  675. *
  676. * @param client
  677. * @return RyanMqttState_e
  678. */
  679. RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
  680. {
  681. if (NULL == client)
  682. {
  683. return RyanMqttInvalidState;
  684. }
  685. return RyanMqttGetClientState(client);
  686. }
  687. /**
  688. * @brief 获取 keepalive 剩余时间
  689. *
  690. * @param client
  691. * @param keepAliveRemain
  692. * @return RyanMqttError_e
  693. */
  694. RyanMqttError_e RyanMqttGetKeepAliveRemain(RyanMqttClient_t *client, uint32_t *keepAliveRemain)
  695. {
  696. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  697. RyanMqttCheck(NULL != keepAliveRemain, RyanMqttParamInvalidError, RyanMqttLog_d);
  698. platformCriticalEnter(client->config.userData, &client->criticalLock);
  699. *keepAliveRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
  700. platformCriticalExit(client->config.userData, &client->criticalLock);
  701. return RyanMqttSuccessError;
  702. }
  703. static RyanMqttError_e RyanMqttClientConfigDeepCopy(RyanMqttClientConfig_t *destConfig,
  704. RyanMqttClientConfig_t *srcConfig)
  705. {
  706. RyanMqttError_e result = RyanMqttSuccessError;
  707. RyanMqttAssert(NULL != destConfig && NULL != srcConfig);
  708. RyanMqttMemcpy(destConfig, srcConfig, sizeof(RyanMqttClientConfig_t));
  709. destConfig->clientId = NULL;
  710. destConfig->userName = NULL;
  711. destConfig->password = NULL;
  712. destConfig->host = NULL;
  713. destConfig->taskName = NULL;
  714. result = RyanMqttDupString(&destConfig->clientId, srcConfig->clientId, RyanMqttStrlen(srcConfig->clientId));
  715. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  716. if (NULL != srcConfig->userName)
  717. {
  718. result = RyanMqttDupString(&destConfig->userName, srcConfig->userName,
  719. RyanMqttStrlen(srcConfig->userName));
  720. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  721. }
  722. if (NULL != srcConfig->password)
  723. {
  724. result = RyanMqttDupString(&destConfig->password, srcConfig->password,
  725. RyanMqttStrlen(srcConfig->password));
  726. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  727. }
  728. result = RyanMqttDupString(&destConfig->host, srcConfig->host, RyanMqttStrlen(srcConfig->host));
  729. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  730. result = RyanMqttDupString(&destConfig->taskName, srcConfig->taskName, RyanMqttStrlen(srcConfig->taskName));
  731. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  732. __exit:
  733. if (RyanMqttSuccessError != result)
  734. {
  735. RyanMqttPurgeConfig(destConfig);
  736. }
  737. return result;
  738. }
  739. /**
  740. * @brief 获取mqtt config
  741. * !非线程安全,多线程通过set和get数据可能会错乱甚至崩溃。
  742. * !使用完毕后,需要用户手动调用 RyanMqttFreeConfigFromGet 释放指针空间
  743. *
  744. * @param client
  745. * @param pclientConfig
  746. * @return RyanMqttError_e
  747. */
  748. RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
  749. {
  750. RyanMqttError_e result = RyanMqttSuccessError;
  751. RyanMqttClientConfig_t *clientConfig;
  752. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  753. RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
  754. RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  755. clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
  756. RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  757. result = RyanMqttClientConfigDeepCopy(clientConfig, &client->config);
  758. RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  759. if (RyanMqttSuccessError == result)
  760. {
  761. *pclientConfig = clientConfig;
  762. }
  763. else
  764. {
  765. *pclientConfig = NULL;
  766. platformMemoryFree(clientConfig);
  767. }
  768. return result;
  769. }
  770. /**
  771. * @brief 释放通过 RyanMqttGetConfig 获取的配置信息 (禁止直接调用free函数)
  772. *
  773. * @param clientConfig
  774. * @return RyanMqttError_e
  775. */
  776. RyanMqttError_e RyanMqttFreeConfigFromGet(RyanMqttClientConfig_t *clientConfig)
  777. {
  778. RyanMqttError_e result = RyanMqttSuccessError;
  779. RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
  780. RyanMqttPurgeConfig(clientConfig);
  781. platformMemoryFree(clientConfig);
  782. return result;
  783. }
  784. // todo 增加更多校验,比如判断心跳包和recv的关系
  785. /**
  786. * @brief 设置mqtt config 这是很危险的操作,需要考虑mqtt
  787. * thread线程和用户线程的资源互斥
  788. *
  789. * 推荐在 RyanMqttStart函数前 / 非用户手动触发的事件回调函数中 / mqtt
  790. * thread处于挂起状态时调用 mqtt thread处于阻塞状态时调用此函数也是很危险的行为
  791. * 要保证mqtt线程和用户线程的资源互斥
  792. * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
  793. *
  794. * !项目中用户不应频繁调用此函数
  795. *
  796. * 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
  797. *
  798. * @param client
  799. * @param clientConfig
  800. * @return RyanMqttError_e
  801. */
  802. RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig)
  803. {
  804. RyanMqttError_e result = RyanMqttSuccessError;
  805. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  806. RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
  807. RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  808. RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
  809. RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
  810. RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, RyanMqttLog_d);
  811. RyanMqttCheck(clientConfig->recvTimeout <= (uint32_t)clientConfig->keepaliveTimeoutS * 1000 / 2,
  812. RyanMqttParamInvalidError, RyanMqttLog_d);
  813. RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
  814. RyanMqttClientConfig_t tempConfig;
  815. result = RyanMqttClientConfigDeepCopy(&tempConfig, clientConfig);
  816. RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  817. if (RyanMqttSuccessError == result)
  818. {
  819. // todo !因为这里是非线程安全的
  820. RyanMqttPurgeConfig(&client->config);
  821. client->config = tempConfig;
  822. }
  823. return result;
  824. }
  825. /**
  826. * @brief 设置遗嘱的配置信息
  827. * 此函数必须在发送connect报文前调用,因为遗嘱消息包含在connect报文中
  828. *
  829. * @param client
  830. * @param topicName
  831. * @param qos
  832. * @param retain
  833. * @param payload
  834. * @param payloadLen
  835. * @return RyanMqttError_e
  836. */
  837. RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen,
  838. RyanMqttQos_e qos, RyanMqttBool_e retain)
  839. {
  840. RyanMqttError_e result = RyanMqttSuccessError;
  841. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  842. RyanMqttCheck(NULL != topicName, RyanMqttParamInvalidError, RyanMqttLog_d);
  843. RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  844. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
  845. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
  846. // 报文支持有效载荷长度为0
  847. if (NULL == payload && payloadLen > 0)
  848. {
  849. return RyanMqttParamInvalidError;
  850. }
  851. platformMutexLock(client->config.userData, &client->userSessionLock);
  852. // 之前如果设置过遗嘱就进行资源释放,否则申请空间
  853. if (NULL == client->lwtOptions)
  854. {
  855. client->lwtOptions = (lwtOptions_t *)platformMemoryMalloc(sizeof(lwtOptions_t));
  856. RyanMqttCheckCodeNoReturn(NULL != client->lwtOptions, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
  857. result = RyanMqttNotEnoughMemError;
  858. goto __exit;
  859. });
  860. }
  861. else
  862. {
  863. if (NULL != client->lwtOptions->topic)
  864. {
  865. platformMemoryFree(client->lwtOptions->topic);
  866. }
  867. if (NULL != client->lwtOptions->payload)
  868. {
  869. platformMemoryFree(client->lwtOptions->payload);
  870. }
  871. }
  872. RyanMqttMemset(client->lwtOptions, 0, sizeof(lwtOptions_t));
  873. if (payloadLen > 0)
  874. {
  875. result = RyanMqttDupString(&client->lwtOptions->payload, payload, payloadLen);
  876. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  877. }
  878. else
  879. {
  880. client->lwtOptions->payload = NULL;
  881. }
  882. result = RyanMqttDupString(&client->lwtOptions->topic, topicName, RyanMqttStrlen(topicName));
  883. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  884. client->lwtOptions->lwtFlag = RyanMqttTrue;
  885. client->lwtOptions->qos = qos;
  886. client->lwtOptions->retain = retain;
  887. client->lwtOptions->payloadLen = payloadLen;
  888. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  889. return RyanMqttSuccessError;
  890. __exit:
  891. if (NULL != client->lwtOptions)
  892. {
  893. if (NULL != client->lwtOptions->topic)
  894. {
  895. platformMemoryFree(client->lwtOptions->topic);
  896. }
  897. if (NULL != client->lwtOptions->payload)
  898. {
  899. platformMemoryFree(client->lwtOptions->payload);
  900. }
  901. platformMemoryFree(client->lwtOptions);
  902. client->lwtOptions = NULL;
  903. }
  904. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  905. return result;
  906. }
  907. /**
  908. * @brief 丢弃指定ack
  909. *
  910. * @param client
  911. * @param ackHandler
  912. * @return RyanMqttError_e
  913. */
  914. RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId)
  915. {
  916. RyanMqttError_e result = RyanMqttSuccessError;
  917. RyanMqttAckHandler_t *ackHandler;
  918. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  919. RyanMqttCheck(0 < packetId && packetId <= RyanMqttMaxPacketId, RyanMqttParamInvalidError, RyanMqttLog_d);
  920. // 删除pubrel记录
  921. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
  922. if (RyanMqttSuccessError == result)
  923. {
  924. RyanMqttEventMachine(client, RyanMqttEventAckHandlerDiscard, (void *)ackHandler); // 回调函数
  925. RyanMqttAckHandlerDestroy(client, ackHandler);
  926. }
  927. return result;
  928. }
  929. RyanMqttError_e RyanMqttGetEventId(RyanMqttClient_t *client, RyanMqttEventId_e *eventId)
  930. {
  931. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  932. RyanMqttCheck(NULL != eventId, RyanMqttParamInvalidError, RyanMqttLog_d);
  933. platformCriticalEnter(client->config.userData, &client->criticalLock);
  934. *eventId = client->eventFlag;
  935. platformCriticalExit(client->config.userData, &client->criticalLock);
  936. return RyanMqttSuccessError;
  937. }
  938. RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  939. {
  940. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  941. platformCriticalEnter(client->config.userData, &client->criticalLock);
  942. client->eventFlag |= eventId;
  943. platformCriticalExit(client->config.userData, &client->criticalLock);
  944. return RyanMqttSuccessError;
  945. }
  946. RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  947. {
  948. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  949. platformCriticalEnter(client->config.userData, &client->criticalLock);
  950. client->eventFlag &= ~eventId;
  951. platformCriticalExit(client->config.userData, &client->criticalLock);
  952. return RyanMqttSuccessError;
  953. }