RyanMqttClient.c 41 KB


  1. #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
  2. // #define RyanMqttLogLevel (RyanMqttLogLevelError) // 日志打印等级
  3. // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  4. #include "RyanMqttClient.h"
  5. #include "RyanMqttThread.h"
  6. #include "RyanMqttUtil.h"
  7. #include "core_mqtt_serializer.h"
  8. /**
  9. * @brief mqtt初始化
  10. *
  11. * @param clientConfig
  12. * @param pClient mqtt客户端指针
  13. * @return RyanMqttError_e
  14. */
  15. RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
  16. {
  17. RyanMqttError_e result = RyanMqttSuccessError;
  18. RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, RyanMqttLog_d);
  19. RyanMqttClient_t *client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
  20. RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  21. RyanMqttMemset(client, 0, sizeof(RyanMqttClient_t));
  22. client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
  23. client->clientState = RyanMqttInitState;
  24. client->eventFlag = 0;
  25. client->ackHandlerCount = 0;
  26. RyanMqttBool_e criticalLockIsOk = RyanMqttFalse;
  27. RyanMqttBool_e sendLockIsOk = RyanMqttFalse;
  28. RyanMqttBool_e msgHandleLockIsOk = RyanMqttFalse;
  29. RyanMqttBool_e ackHandleLockIsOk = RyanMqttFalse;
  30. RyanMqttBool_e userSessionLockIsOk = RyanMqttFalse;
  31. RyanMqttBool_e networkIsOk = RyanMqttFalse;
  32. result = platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
  33. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  34. criticalLockIsOk = RyanMqttTrue;
  35. result = platformMutexInit(client->config.userData, &client->sendLock); // 初始化发送缓冲区互斥锁
  36. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  37. sendLockIsOk = RyanMqttTrue;
  38. result = platformMutexInit(client->config.userData, &client->msgHandleLock);
  39. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  40. msgHandleLockIsOk = RyanMqttTrue;
  41. result = platformMutexInit(client->config.userData, &client->ackHandleLock);
  42. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  43. ackHandleLockIsOk = RyanMqttTrue;
  44. result = platformMutexInit(client->config.userData, &client->userSessionLock);
  45. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  46. userSessionLockIsOk = RyanMqttTrue;
  47. result = platformNetworkInit(client->config.userData, &client->network); // 网络接口初始化
  48. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  49. // networkIsOk = RyanMqttTrue;
  50. RyanMqttListInit(&client->msgHandlerList);
  51. RyanMqttListInit(&client->ackHandlerList);
  52. RyanMqttListInit(&client->userAckHandlerList);
  53. RyanMqttSetClientState(client, RyanMqttInitState);
  54. *pClient = client;
  55. return RyanMqttSuccessError;
  56. __exit:
  57. // 不能按空来判断,不是指针类型
  58. if (criticalLockIsOk)
  59. {
  60. platformCriticalDestroy(client->config.userData, &client->criticalLock);
  61. }
  62. if (sendLockIsOk)
  63. {
  64. platformMutexDestroy(client->config.userData, &client->sendLock);
  65. }
  66. if (msgHandleLockIsOk)
  67. {
  68. platformMutexDestroy(client->config.userData, &client->msgHandleLock);
  69. }
  70. if (ackHandleLockIsOk)
  71. {
  72. platformMutexDestroy(client->config.userData, &client->ackHandleLock);
  73. }
  74. if (userSessionLockIsOk)
  75. {
  76. platformMutexDestroy(client->config.userData, &client->userSessionLock);
  77. }
  78. if (networkIsOk)
  79. {
  80. platformNetworkClose(client->config.userData, &client->network);
  81. platformNetworkDestroy(client->config.userData, &client->network);
  82. }
  83. platformMemoryFree(client);
  84. return result;
  85. }
  86. /**
  87. * @brief 销毁mqtt客户端
  88. * !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
  89. * !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
  90. * !mqtt删除自己前会调用 RyanMqttEventDestroyBefore 事件回调
  91. * !调用此函数后就不应该再对该客户端进行任何操作
  92. * ?这里用信号量通知mqtt线程是最好的,但是为了简化platform层,这里用标志位代替信号量
  93. * @param client
  94. * @return RyanMqttError_e
  95. */
  96. RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
  97. {
  98. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  99. if (RyanMqttInitState == RyanMqttGetClientState(client))
  100. {
  101. RyanMqttPurgeClient(client);
  102. platformMemoryFree(client);
  103. }
  104. else
  105. {
  106. platformCriticalEnter(client->config.userData, &client->criticalLock);
  107. client->destroyFlag = RyanMqttTrue;
  108. platformCriticalExit(client->config.userData, &client->criticalLock);
  109. }
  110. return RyanMqttSuccessError;
  111. }
  112. /**
  113. * @brief 启动mqtt客户端
  114. * !不要重复调用
  115. *
  116. * @param client
  117. * @return RyanMqttError_e
  118. */
  119. RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
  120. {
  121. RyanMqttError_e result = RyanMqttSuccessError;
  122. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  123. RyanMqttCheck(RyanMqttInitState == RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  124. RyanMqttSetClientState(client, RyanMqttStartState);
  125. // 连接成功,需要初始化 MQTT 线程
  126. result = platformThreadInit(client->config.userData, &client->mqttThread, client->config.taskName,
  127. RyanMqttThread, client, client->config.taskStack, client->config.taskPrio);
  128. RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNoRescourceError, RyanMqttLog_d,
  129. { RyanMqttSetClientState(client, RyanMqttInitState); });
  130. return result;
  131. }
  132. /**
  133. * @brief 断开mqtt服务器连接
  134. *
  135. * @param client
  136. * @param sendDiscFlag
  137. * RyanMqttTrue表示发送断开连接报文,RyanMqttFalse表示不发送断开连接报文
  138. * @return RyanMqttError_e
  139. */
  140. RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
  141. {
  142. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  143. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  144. if (RyanMqttTrue == sendDiscFlag)
  145. {
  146. MQTTStatus_t status;
  147. MQTTFixedBuffer_t fixedBuffer;
  148. // 获取断开连接的数据包大小
  149. status = MQTT_GetDisconnectPacketSize(&fixedBuffer.size);
  150. RyanMqttAssert(MQTTSuccess == status);
  151. // 申请断开连接数据包的空间
  152. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  153. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  154. // 序列化断开连接数据包
  155. status = MQTT_SerializeDisconnect(&fixedBuffer);
  156. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
  157. { platformMemoryFree(fixedBuffer.pBuffer); });
  158. // 发送断开连接数据包
  159. RyanMqttError_e result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  160. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  161. { platformMemoryFree(fixedBuffer.pBuffer); });
  162. platformMemoryFree(fixedBuffer.pBuffer);
  163. }
  164. RyanMqttConnectStatus_e connectState = RyanMqttConnectUserDisconnected;
  165. RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
  166. return RyanMqttSuccessError;
  167. }
  168. // todo 这里考虑要不要使用信号量来实现吧,会增加platform厚度,目前不想加。最好用自动重连
  169. // ?现在取巧使用线程的暂停和恢复实现,如果mqtt线程还没有暂停,用户就调用这个函数就会没有效果。
  170. // ?用户不用自动重连的话,也可以通过一直判断 client 的状态是不是为 RyanMqttDisconnectState 是的话可以调用
  171. // ?RyanMqttReconnect。 最推荐的是用自动重连
  172. /**
  173. * @brief 手动重连mqtt客户端
  174. * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
  175. *
  176. * @param client
  177. * @return RyanMqttError_e
  178. */
  179. RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
  180. {
  181. RyanMqttError_e result = RyanMqttSuccessError;
  182. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  183. RyanMqttCheck(RyanMqttDisconnectState == RyanMqttGetClientState(client), RyanMqttConnectError, RyanMqttLog_d);
  184. if (RyanMqttTrue == client->config.autoReconnectFlag)
  185. {
  186. return RyanMqttNoRescourceError;
  187. }
  188. result = platformThreadStart(client->config.userData, &client->mqttThread);
  189. RyanMqttCheck(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  190. return result;
  191. }
  192. /**
  193. * @brief 订阅主题
  194. *
  195. * @param client
  196. * @param topic
  197. * @param qos
  198. * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
  199. * @return RyanMqttError_e
  200. */
  201. RyanMqttError_e RyanMqttSubscribeMany(RyanMqttClient_t *client, int32_t count,
  202. RyanMqttSubscribeData_t subscribeManyData[])
  203. {
  204. RyanMqttError_e result = RyanMqttSuccessError;
  205. uint16_t packetId;
  206. RyanMqttMsgHandler_t *msgHandler;
  207. RyanMqttMsgHandler_t *msgToListHandler;
  208. RyanMqttAckHandler_t *userAckHandler;
  209. MQTTFixedBuffer_t fixedBuffer;
  210. // 校验参数合法性
  211. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  212. RyanMqttCheck(NULL != subscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
  213. RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  214. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  215. // 检查每个主题消息是否合法
  216. for (int32_t i = 0; i < count; i++)
  217. {
  218. RyanMqttCheck(NULL != subscribeManyData[i].topic && subscribeManyData[i].topicLen > 0,
  219. RyanMqttParamInvalidError, RyanMqttLog_d);
  220. RyanMqttCheck(RyanMqttQos0 <= subscribeManyData[i].qos && RyanMqttQos2 >= subscribeManyData[i].qos,
  221. RyanMqttParamInvalidError, RyanMqttLog_d);
  222. }
  223. // 申请 coreMqtt 支持的topic格式空间
  224. MQTTSubscribeInfo_t *subscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
  225. RyanMqttCheck(NULL != subscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  226. for (int32_t i = 0; i < count; i++)
  227. {
  228. subscriptionList[i].qos = (MQTTQoS_t)subscribeManyData[i].qos;
  229. subscriptionList[i].pTopicFilter = subscribeManyData[i].topic;
  230. subscriptionList[i].topicFilterLength = subscribeManyData[i].topicLen;
  231. }
  232. // 序列化数据包
  233. {
  234. size_t remainingLength;
  235. // 获取数据包大小
  236. MQTTStatus_t status =
  237. MQTT_GetSubscribePacketSize(subscriptionList, count, &remainingLength, &fixedBuffer.size);
  238. RyanMqttAssert(MQTTSuccess == status);
  239. // 申请数据包的空间
  240. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  241. RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
  242. { platformMemoryFree(subscriptionList); });
  243. // 序列化数据包
  244. packetId = RyanMqttGetNextPacketId(client);
  245. status = MQTT_SerializeSubscribe(subscriptionList, count, packetId, remainingLength, &fixedBuffer);
  246. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
  247. platformMemoryFree(subscriptionList);
  248. platformMemoryFree(fixedBuffer.pBuffer);
  249. });
  250. }
  251. // 创建每个msg主题的ack节点
  252. // ?mqtt空间收到服务器的suback时,会查找所有同名的然后删掉,所以这里不进行同名对比操作
  253. for (int32_t i = 0; i < count; i++)
  254. {
  255. // 创建msg包
  256. result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
  257. subscriptionList[i].topicFilterLength, packetId,
  258. (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgHandler);
  259. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  260. { goto __RyanMqttSubCreateAckErrorExit; });
  261. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_SUBACK, packetId, 0, NULL, msgHandler,
  262. &userAckHandler, RyanMqttFalse);
  263. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  264. RyanMqttMsgHandlerDestroy(client, msgHandler);
  265. goto __RyanMqttSubCreateAckErrorExit;
  266. });
  267. // 此函数不会失败
  268. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  269. continue;
  270. __RyanMqttSubCreateAckErrorExit:
  271. // 创建 sub ack 数组时失败,查找所有同 packetId 的ack进行清除
  272. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
  273. platformMemoryFree(subscriptionList);
  274. platformMemoryFree(fixedBuffer.pBuffer);
  275. return RyanMqttNotEnoughMemError;
  276. }
  277. // ?创建msg包,3.8.4响应,允许服务端在发送 SUBACK 报文之前就开始发送与订阅匹配的 PUBLISH 报文。
  278. for (int32_t i = 0; i < count; i++)
  279. {
  280. result = RyanMqttMsgHandlerCreate(client, subscriptionList[i].pTopicFilter,
  281. subscriptionList[i].topicFilterLength, packetId,
  282. (RyanMqttQos_e)subscriptionList[i].qos, NULL, &msgToListHandler);
  283. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  284. // 将msg信息添加到订阅链表上
  285. RyanMqttMsgHandlerAddToMsgList(client, msgToListHandler);
  286. }
  287. // 发送订阅主题包
  288. // 如果发送失败就清除ack链表,创建ack链表必须在发送前
  289. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  290. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  291. __exit:
  292. // 失败清除session
  293. if (RyanMqttSuccessError != result)
  294. {
  295. // 清除ack链表
  296. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_SUBACK, packetId);
  297. // 清除msg链表
  298. RyanMqttMsgHandler_t msgMatchCriteria;
  299. for (int32_t i = 0; i < count; i++)
  300. {
  301. msgMatchCriteria.topic = (char *)subscriptionList[i].pTopicFilter;
  302. msgMatchCriteria.topicLen = subscriptionList[i].topicFilterLength;
  303. msgMatchCriteria.packetId = packetId;
  304. RyanMqttMsgHandlerFindAndDestroyByPacketId(client, &msgMatchCriteria, RyanMqttFalse);
  305. }
  306. }
  307. platformMemoryFree(subscriptionList);
  308. platformMemoryFree(fixedBuffer.pBuffer);
  309. return result;
  310. }
  311. /**
  312. * @brief 订阅主题
  313. *
  314. * @param client
  315. * @param topic
  316. * @param qos
  317. * 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
  318. * @return RyanMqttError_e
  319. */
  320. RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
  321. {
  322. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
  323. RyanMqttSubscribeData_t subscribeManyData = {.qos = qos, .topic = topic, .topicLen = RyanMqttStrlen(topic)};
  324. return RyanMqttSubscribeMany(client, 1, &subscribeManyData);
  325. }
  326. /**
  327. * @brief 取消订阅指定主题
  328. *
  329. * @param client
  330. * @param topic
  331. * @return RyanMqttError_e
  332. */
  333. RyanMqttError_e RyanMqttUnSubscribeMany(RyanMqttClient_t *client, int32_t count,
  334. RyanMqttUnSubscribeData_t unSubscribeManyData[])
  335. {
  336. RyanMqttError_e result = RyanMqttSuccessError;
  337. uint16_t packetId;
  338. RyanMqttMsgHandler_t *subMsgHandler;
  339. RyanMqttMsgHandler_t *msgHandler;
  340. RyanMqttAckHandler_t *userAckHandler;
  341. MQTTFixedBuffer_t fixedBuffer;
  342. // 校验参数合法性
  343. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  344. RyanMqttCheck(NULL != unSubscribeManyData, RyanMqttParamInvalidError, RyanMqttLog_d);
  345. RyanMqttCheck(count > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  346. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  347. // 检查有效性
  348. for (int32_t i = 0; i < count; i++)
  349. {
  350. RyanMqttCheck(NULL != unSubscribeManyData[i].topic && unSubscribeManyData[i].topicLen > 0,
  351. RyanMqttParamInvalidError, RyanMqttLog_d);
  352. }
  353. // 申请 coreMqtt 支持的topic格式空间
  354. MQTTSubscribeInfo_t *unSubscriptionList = platformMemoryMalloc(sizeof(MQTTSubscribeInfo_t) * count);
  355. RyanMqttCheck(NULL != unSubscriptionList, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  356. for (int32_t i = 0; i < count; i++)
  357. {
  358. unSubscriptionList[i].qos = (MQTTQoS_t)RyanMqttSubFail; // 无效数据,仅用作占位符
  359. unSubscriptionList[i].pTopicFilter = unSubscribeManyData[i].topic;
  360. unSubscriptionList[i].topicFilterLength = unSubscribeManyData[i].topicLen;
  361. }
  362. // 序列化数据包
  363. {
  364. size_t remainingLength;
  365. // 获取数据包大小
  366. MQTTStatus_t status =
  367. MQTT_GetUnsubscribePacketSize(unSubscriptionList, count, &remainingLength, &fixedBuffer.size);
  368. RyanMqttAssert(MQTTSuccess == status);
  369. // 申请数据包的空间
  370. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  371. RyanMqttCheckCode(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d,
  372. { platformMemoryFree(unSubscriptionList); });
  373. // 序列化数据包
  374. packetId = RyanMqttGetNextPacketId(client);
  375. status = MQTT_SerializeUnsubscribe(unSubscriptionList, count, packetId, remainingLength, &fixedBuffer);
  376. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d, {
  377. platformMemoryFree(unSubscriptionList);
  378. platformMemoryFree(fixedBuffer.pBuffer);
  379. });
  380. }
  381. // 创建ack
  382. for (int32_t i = 0; i < count; i++)
  383. {
  384. // ?不判断是否订阅,统一都发送取消
  385. RyanMqttMsgHandler_t msgMatchCriteria = {.topic = (char *)unSubscriptionList[i].pTopicFilter,
  386. .topicLen = unSubscriptionList[i].topicFilterLength};
  387. platformMutexLock(client->config.userData, &client->msgHandleLock);
  388. result =
  389. RyanMqttMsgHandlerFind(client, &msgMatchCriteria, RyanMqttFalse, &subMsgHandler, RyanMqttFalse);
  390. if (RyanMqttSuccessError == result)
  391. {
  392. // !有线程安全问题,subMsgHandler是指针,但用户层只要不是特别的混乱重复取消订阅这里应该就问题,暂时不管成本太高
  393. // 同步msg qos等级,之后unsub回调使用
  394. unSubscriptionList[i].qos = (MQTTQoS_t)subMsgHandler->qos;
  395. }
  396. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  397. result = RyanMqttMsgHandlerCreate(client, unSubscriptionList[i].pTopicFilter,
  398. unSubscriptionList[i].topicFilterLength, packetId,
  399. (RyanMqttQos_e)unSubscriptionList[i].qos, NULL, &msgHandler);
  400. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  401. { goto __RyanMqttUnSubCreateAckErrorExit; });
  402. result = RyanMqttAckHandlerCreate(client, MQTT_PACKET_TYPE_UNSUBACK, packetId, 0, NULL, msgHandler,
  403. &userAckHandler, RyanMqttFalse);
  404. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  405. RyanMqttMsgHandlerDestroy(client, msgHandler);
  406. goto __RyanMqttUnSubCreateAckErrorExit;
  407. });
  408. // 此函数不会失败
  409. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  410. continue;
  411. __RyanMqttUnSubCreateAckErrorExit:
  412. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
  413. platformMemoryFree(unSubscriptionList);
  414. platformMemoryFree(fixedBuffer.pBuffer);
  415. return RyanMqttNotEnoughMemError;
  416. }
  417. // 发送取消订阅主题包
  418. // 如果发送失败就清除ack链表,创建ack链表必须在发送前
  419. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  420. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  421. RyanMqttClearAckSession(client, MQTT_PACKET_TYPE_UNSUBACK, packetId);
  422. platformMemoryFree(unSubscriptionList);
  423. platformMemoryFree(fixedBuffer.pBuffer);
  424. });
  425. platformMemoryFree(unSubscriptionList);
  426. platformMemoryFree(fixedBuffer.pBuffer);
  427. return result;
  428. }
  429. /**
  430. * @brief 取消订阅指定主题
  431. *
  432. * @param client
  433. * @param topic
  434. * @return RyanMqttError_e
  435. */
  436. RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
  437. {
  438. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
  439. RyanMqttUnSubscribeData_t subscribeManyData = {.topic = topic, .topicLen = RyanMqttStrlen(topic)};
  440. return RyanMqttUnSubscribeMany(client, 1, &subscribeManyData);
  441. }
  442. RyanMqttError_e RyanMqttPublishWithUserData(RyanMqttClient_t *client, char *topic, uint16_t topicLen, char *payload,
  443. uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain,
  444. void *userData)
  445. {
  446. RyanMqttError_e result = RyanMqttSuccessError;
  447. uint16_t packetId;
  448. MQTTStatus_t status;
  449. MQTTFixedBuffer_t fixedBuffer;
  450. size_t remainingLength;
  451. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  452. RyanMqttCheck(NULL != topic && topicLen > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  453. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
  454. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
  455. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, RyanMqttLog_d);
  456. // 报文支持有效载荷长度为0
  457. if (payloadLen > 0 && NULL == payload)
  458. {
  459. return RyanMqttParamInvalidError;
  460. }
  461. // 序列化pub发送包
  462. MQTTPublishInfo_t publishInfo = {
  463. .qos = (MQTTQoS_t)qos,
  464. .pTopicName = topic,
  465. .topicNameLength = topicLen,
  466. .pPayload = payload,
  467. .payloadLength = payloadLen,
  468. .retain = retain,
  469. .dup = 0,
  470. };
  471. // 获取数据包大小
  472. status = MQTT_GetPublishPacketSize(&publishInfo, &remainingLength, &fixedBuffer.size);
  473. RyanMqttAssert(MQTTSuccess == status);
  474. // 申请数据包的空间
  475. fixedBuffer.pBuffer = platformMemoryMalloc(fixedBuffer.size);
  476. RyanMqttCheck(NULL != fixedBuffer.pBuffer, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  477. // qos0不需要 packetId
  478. if (RyanMqttQos0 == qos)
  479. {
  480. packetId = 0;
  481. }
  482. else
  483. {
  484. packetId = RyanMqttGetNextPacketId(client);
  485. }
  486. // 序列化数据包
  487. status = MQTT_SerializePublish(&publishInfo, packetId, remainingLength, &fixedBuffer);
  488. RyanMqttCheckCode(MQTTSuccess == status, RyanMqttSerializePacketError, RyanMqttLog_d,
  489. { platformMemoryFree(fixedBuffer.pBuffer); });
  490. if (RyanMqttQos0 == qos)
  491. {
  492. // 发送报文
  493. result = RyanMqttSendPacket(client, fixedBuffer.pBuffer, fixedBuffer.size);
  494. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  495. { platformMemoryFree(fixedBuffer.pBuffer); });
  496. platformMemoryFree(fixedBuffer.pBuffer);
  497. }
  498. else
  499. {
  500. RyanMqttMsgHandler_t *msgHandler;
  501. RyanMqttAckHandler_t *userAckHandler;
  502. uint8_t packetType = (RyanMqttQos1 == qos) ? MQTT_PACKET_TYPE_PUBACK : MQTT_PACKET_TYPE_PUBREC;
  503. // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
  504. result = RyanMqttMsgHandlerCreate(client, publishInfo.pTopicName, publishInfo.topicNameLength,
  505. RyanMqttMsgInvalidPacketId, qos, userData, &msgHandler);
  506. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d,
  507. { platformMemoryFree(fixedBuffer.pBuffer); });
  508. result = RyanMqttAckHandlerCreate(client, packetType, packetId, fixedBuffer.size, fixedBuffer.pBuffer,
  509. msgHandler, &userAckHandler, RyanMqttTrue);
  510. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, {
  511. platformMemoryFree(fixedBuffer.pBuffer);
  512. RyanMqttMsgHandlerDestroy(client, msgHandler);
  513. });
  514. // 一定要先加再send,send一定在mqtt broker回复前执行完,要不可能线程调度mqtt返回消息会比添加ack更快执行
  515. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  516. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  517. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_e, {
  518. RyanMqttLog_e("RyanMqttSendPacket failed, clear user ack session");
  519. // userAck 必须通过这个执行,因为可能已经复制到mqtt内核空间了
  520. RyanMqttClearAckSession(client, packetType, packetId);
  521. });
  522. }
  523. return result;
  524. }
  525. /**
  526. * @brief 客户端向服务端发送消息
  527. *
  528. * @param client
  529. * @param topic
  530. * @param payload
  531. * @param payloadLen
  532. * @param QOS
  533. * @param retain
  534. * @return RyanMqttError_e
  535. */
  536. RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen,
  537. RyanMqttQos_e qos, RyanMqttBool_e retain)
  538. {
  539. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, RyanMqttLog_d);
  540. return RyanMqttPublishWithUserData(client, topic, RyanMqttStrlen(topic), payload, payloadLen, qos, retain,
  541. NULL);
  542. }
  543. /**
  544. * @brief 获取已订阅主题
  545. * !此函数是非线程安全的,已不推荐使用
  546. * !请使用 RyanMqttGetSubscribeSafe 代替
  547. * !如果另一个线程在这个调用返回后立即取消订阅,topic将指向非法内存
  548. *
  549. * @param client
  550. * @param msgHandles 存放已订阅主题的空间
  551. * @param msgHandleSize 存放已订阅主题的空间大小个数
  552. * @param subscribeNum 函数内部返回已订阅主题的个数
  553. * @return RyanMqttState_e
  554. */
  555. RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandles, int32_t msgHandleSize,
  556. int32_t *subscribeNum)
  557. {
  558. RyanMqttError_e result = RyanMqttSuccessError;
  559. RyanMqttList_t *curr, *next;
  560. RyanMqttMsgHandler_t *msgHandler;
  561. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  562. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
  563. RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
  564. RyanMqttCheck(0 < msgHandleSize, RyanMqttParamInvalidError, RyanMqttLog_d);
  565. *subscribeNum = 0;
  566. platformMutexLock(client->config.userData, &client->msgHandleLock);
  567. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  568. {
  569. if (*subscribeNum >= msgHandleSize)
  570. {
  571. result = RyanMqttNoRescourceError;
  572. break;
  573. }
  574. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  575. msgHandles[*subscribeNum].topic = msgHandler->topic;
  576. msgHandles[*subscribeNum].qos = msgHandler->qos;
  577. (*subscribeNum)++;
  578. }
  579. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  580. return result;
  581. }
  582. /**
  583. * @brief 安全的获取已订阅主题列表,仅可通过 RyanMqttSafeFreeSubscribeResources 进行安全释放。
  584. *
  585. * @param client
  586. * @param msgHandles
  587. * @param subscribeNum
  588. * @return RyanMqttError_e
  589. */
  590. RyanMqttError_e RyanMqttGetSubscribeSafe(RyanMqttClient_t *client, RyanMqttMsgHandler_t **msgHandles,
  591. int32_t *subscribeNum)
  592. {
  593. RyanMqttError_e result = RyanMqttSuccessError;
  594. RyanMqttList_t *curr, *next;
  595. RyanMqttMsgHandler_t *msgHandler;
  596. int32_t subscribeTotal;
  597. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  598. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
  599. RyanMqttCheck(NULL != subscribeNum, RyanMqttParamInvalidError, RyanMqttLog_d);
  600. RyanMqttGetSubscribeTotalCount(client, &subscribeTotal);
  601. if (0 == subscribeTotal)
  602. {
  603. *msgHandles = NULL;
  604. *subscribeNum = 0;
  605. return RyanMqttSuccessError;
  606. }
  607. RyanMqttMsgHandler_t *msgHandlerArr = platformMemoryMalloc(sizeof(RyanMqttMsgHandler_t) * subscribeTotal);
  608. if (NULL == msgHandlerArr)
  609. {
  610. result = RyanMqttNotEnoughMemError;
  611. goto __exit;
  612. }
  613. int32_t subscribeCount = 0;
  614. platformMutexLock(client->config.userData, &client->msgHandleLock);
  615. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  616. {
  617. if (subscribeCount >= subscribeTotal)
  618. {
  619. break;
  620. }
  621. msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  622. RyanMqttMemcpy(&msgHandlerArr[subscribeCount], msgHandler, sizeof(RyanMqttMsgHandler_t));
  623. result = RyanMqttDupString(&msgHandlerArr[subscribeCount].topic, msgHandler->topic,
  624. msgHandler->topicLen);
  625. if (RyanMqttSuccessError != result)
  626. {
  627. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  628. RyanMqttSafeFreeSubscribeResources(msgHandlerArr, subscribeCount);
  629. result = RyanMqttNotEnoughMemError;
  630. goto __exit;
  631. }
  632. subscribeCount++;
  633. }
  634. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  635. *msgHandles = msgHandlerArr;
  636. *subscribeNum = subscribeCount;
  637. __exit:
  638. return result;
  639. }
  640. /**
  641. * @brief 安全释放订阅主题列表(禁止直接调用free函数)
  642. *
  643. * @param msgHandles
  644. * @param subscribeNum
  645. * @return RyanMqttError_e
  646. */
  647. RyanMqttError_e RyanMqttSafeFreeSubscribeResources(RyanMqttMsgHandler_t *msgHandles, int32_t subscribeNum)
  648. {
  649. RyanMqttError_e result = RyanMqttSuccessError;
  650. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, RyanMqttLog_d);
  651. // RyanMqttGetSubscribeTotalCount 内部调用的时候可以会等于0
  652. RyanMqttCheck(subscribeNum >= 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  653. for (int32_t i = 0; i < subscribeNum; i++)
  654. {
  655. // 不加null判断,因为如果是空,一定是用户程序内存访问越界了
  656. platformMemoryFree(msgHandles[i].topic);
  657. }
  658. platformMemoryFree(msgHandles);
  659. return result;
  660. }
  661. /**
  662. * @brief 获取已订阅主题个数
  663. *
  664. * @param client
  665. * @param subscribeTotalCount
  666. * @return RyanMqttError_e
  667. */
  668. RyanMqttError_e RyanMqttGetSubscribeTotalCount(RyanMqttClient_t *client, int32_t *subscribeTotalCount)
  669. {
  670. RyanMqttList_t *curr, *next;
  671. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  672. RyanMqttCheck(NULL != subscribeTotalCount, RyanMqttParamInvalidError, RyanMqttLog_d);
  673. *subscribeTotalCount = 0;
  674. platformMutexLock(client->config.userData, &client->msgHandleLock);
  675. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  676. {
  677. (*subscribeTotalCount)++;
  678. }
  679. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  680. return RyanMqttSuccessError;
  681. }
  682. /**
  683. * @brief 获取mqtt客户端状态
  684. *
  685. * @param client
  686. * @return RyanMqttState_e
  687. */
  688. RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
  689. {
  690. if (NULL == client)
  691. {
  692. return RyanMqttInvalidState;
  693. }
  694. return RyanMqttGetClientState(client);
  695. }
  696. /**
  697. * @brief 获取 keepalive 剩余时间
  698. *
  699. * @param client
  700. * @param keepAliveRemain
  701. * @return RyanMqttError_e
  702. */
  703. RyanMqttError_e RyanMqttGetKeepAliveRemain(RyanMqttClient_t *client, uint32_t *keepAliveRemain)
  704. {
  705. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  706. RyanMqttCheck(NULL != keepAliveRemain, RyanMqttParamInvalidError, RyanMqttLog_d);
  707. platformCriticalEnter(client->config.userData, &client->criticalLock);
  708. *keepAliveRemain = RyanMqttTimerRemain(&client->keepaliveTimer);
  709. platformCriticalExit(client->config.userData, &client->criticalLock);
  710. return RyanMqttSuccessError;
  711. }
  712. static RyanMqttError_e RyanMqttClientConfigDeepCopy(RyanMqttClientConfig_t *destConfig,
  713. RyanMqttClientConfig_t *srcConfig)
  714. {
  715. RyanMqttError_e result = RyanMqttSuccessError;
  716. RyanMqttAssert(NULL != destConfig && NULL != srcConfig);
  717. // 清除需要申请内存的字段
  718. uint16_t clientIdLen = RyanMqttStrlen(srcConfig->clientId) + 1;
  719. uint16_t userNameLen = 0;
  720. uint16_t passwordLen = 0;
  721. uint16_t hostLen = RyanMqttStrlen(srcConfig->host) + 1;
  722. uint16_t taskNameLen = RyanMqttStrlen(srcConfig->taskName) + 1;
  723. if (NULL != srcConfig->userName)
  724. {
  725. userNameLen += RyanMqttStrlen(srcConfig->userName) + 1;
  726. }
  727. if (NULL != srcConfig->password)
  728. {
  729. passwordLen += RyanMqttStrlen(srcConfig->password) + 1;
  730. }
  731. // 获取申请内存大小
  732. uint32_t mallocSize = clientIdLen + userNameLen + passwordLen + hostLen + taskNameLen;
  733. char *buf = (char *)platformMemoryMalloc(mallocSize);
  734. RyanMqttCheck(NULL != buf, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  735. RyanMqttMemset(buf, 0, mallocSize);
  736. // 拷贝数据
  737. RyanMqttMemcpy(destConfig, srcConfig, sizeof(RyanMqttClientConfig_t));
  738. uint32_t offset = 0;
  739. // 共同使用一块内存
  740. #define copyConfigFieldWithOffset(key, valueLen) \
  741. destConfig->key = buf + offset; \
  742. if ((valueLen) != 1) RyanMqttMemcpy(destConfig->key, srcConfig->key, valueLen); \
  743. offset += (valueLen);
  744. copyConfigFieldWithOffset(clientId, clientIdLen); // 必须第一个字段
  745. if (NULL != srcConfig->userName)
  746. {
  747. copyConfigFieldWithOffset(userName, userNameLen);
  748. }
  749. else
  750. {
  751. destConfig->userName = NULL;
  752. }
  753. if (NULL != srcConfig->password)
  754. {
  755. copyConfigFieldWithOffset(password, passwordLen);
  756. }
  757. else
  758. {
  759. destConfig->password = NULL;
  760. }
  761. copyConfigFieldWithOffset(host, hostLen);
  762. copyConfigFieldWithOffset(taskName, taskNameLen);
  763. return result;
  764. }
  765. /**
  766. * @brief 获取mqtt config
  767. * !非线程安全,多线程通过set和get数据可能会错乱甚至崩溃。
  768. * !使用完毕后,需要用户手动调用 RyanMqttFreeConfigFromGet 释放指针空间
  769. *
  770. * @param client
  771. * @param pclientConfig
  772. * @return RyanMqttError_e
  773. */
  774. RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
  775. {
  776. RyanMqttError_e result = RyanMqttSuccessError;
  777. RyanMqttClientConfig_t *clientConfig;
  778. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  779. RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
  780. RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  781. clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
  782. RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError, RyanMqttLog_d);
  783. result = RyanMqttClientConfigDeepCopy(clientConfig, &client->config);
  784. RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  785. if (RyanMqttSuccessError == result)
  786. {
  787. *pclientConfig = clientConfig;
  788. }
  789. else
  790. {
  791. *pclientConfig = NULL;
  792. platformMemoryFree(clientConfig);
  793. }
  794. return result;
  795. }
  796. /**
  797. * @brief 释放通过 RyanMqttGetConfig 获取的配置信息 (禁止直接调用free函数)
  798. *
  799. * @param clientConfig
  800. * @return RyanMqttError_e
  801. */
  802. RyanMqttError_e RyanMqttFreeConfigFromGet(RyanMqttClientConfig_t *clientConfig)
  803. {
  804. RyanMqttError_e result = RyanMqttSuccessError;
  805. RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
  806. RyanMqttPurgeConfig(clientConfig);
  807. platformMemoryFree(clientConfig);
  808. return result;
  809. }
  810. // todo 增加更多校验,比如判断心跳包和recv的关系
  811. /**
  812. * @brief 设置mqtt config 这是很危险的操作,需要考虑mqtt
  813. * thread线程和用户线程的资源互斥
  814. *
  815. * 推荐在 RyanMqttStart函数前 / 非用户手动触发的事件回调函数中 / mqtt
  816. * thread处于挂起状态时调用 mqtt thread处于阻塞状态时调用此函数也是很危险的行为
  817. * 要保证mqtt线程和用户线程的资源互斥
  818. * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
  819. *
  820. * !项目中用户不应频繁调用此函数
  821. *
  822. * 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
  823. *
  824. * @param client
  825. * @param clientConfig
  826. * @return RyanMqttError_e
  827. */
  828. RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig)
  829. {
  830. RyanMqttError_e result = RyanMqttSuccessError;
  831. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  832. RyanMqttCheck(NULL != clientConfig, RyanMqttParamInvalidError, RyanMqttLog_d);
  833. RyanMqttCheck(RyanMqttInvalidState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  834. RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, RyanMqttLog_d);
  835. RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, RyanMqttLog_d);
  836. RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, RyanMqttLog_d);
  837. RyanMqttCheck(clientConfig->recvTimeout <= (uint32_t)clientConfig->keepaliveTimeoutS * 1000 / 2,
  838. RyanMqttParamInvalidError, RyanMqttLog_d);
  839. RyanMqttCheck(clientConfig->recvTimeout >= clientConfig->sendTimeout, RyanMqttParamInvalidError, RyanMqttLog_d);
  840. RyanMqttClientConfig_t tempConfig;
  841. result = RyanMqttClientConfigDeepCopy(&tempConfig, clientConfig);
  842. RyanMqttCheckNoReturn(RyanMqttSuccessError == result, result, RyanMqttLog_d);
  843. if (RyanMqttSuccessError == result)
  844. {
  845. // todo !因为这里是非线程安全的
  846. RyanMqttPurgeConfig(&client->config);
  847. client->config = tempConfig;
  848. }
  849. return result;
  850. }
  851. /**
  852. * @brief 设置遗嘱的配置信息
  853. * 此函数必须在发送connect报文前调用,因为遗嘱消息包含在connect报文中
  854. *
  855. * @param client
  856. * @param topicName
  857. * @param qos
  858. * @param retain
  859. * @param payload
  860. * @param payloadLen
  861. * @return RyanMqttError_e
  862. */
  863. RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen,
  864. RyanMqttQos_e qos, RyanMqttBool_e retain)
  865. {
  866. RyanMqttError_e result = RyanMqttSuccessError;
  867. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  868. RyanMqttCheck(NULL != topicName && RyanMqttStrlen(topicName) > 0, RyanMqttParamInvalidError, RyanMqttLog_d);
  869. RyanMqttCheck(RyanMqttConnectState != RyanMqttGetClientState(client), RyanMqttFailedError, RyanMqttLog_d);
  870. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, RyanMqttLog_d);
  871. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, RyanMqttLog_d);
  872. // 报文支持有效载荷长度为0
  873. if (NULL == payload && payloadLen > 0)
  874. {
  875. return RyanMqttParamInvalidError;
  876. }
  877. platformMutexLock(client->config.userData, &client->userSessionLock);
  878. // 之前如果设置过遗嘱就进行资源释放,否则申请空间
  879. if (NULL == client->lwtOptions)
  880. {
  881. client->lwtOptions = (lwtOptions_t *)platformMemoryMalloc(sizeof(lwtOptions_t));
  882. RyanMqttCheckCodeNoReturn(NULL != client->lwtOptions, RyanMqttNotEnoughMemError, RyanMqttLog_d, {
  883. result = RyanMqttNotEnoughMemError;
  884. goto __exit;
  885. });
  886. }
  887. else
  888. {
  889. if (NULL != client->lwtOptions->topic)
  890. {
  891. platformMemoryFree(client->lwtOptions->topic);
  892. }
  893. if (NULL != client->lwtOptions->payload)
  894. {
  895. platformMemoryFree(client->lwtOptions->payload);
  896. }
  897. }
  898. RyanMqttMemset(client->lwtOptions, 0, sizeof(lwtOptions_t));
  899. if (payloadLen > 0)
  900. {
  901. result = RyanMqttDupString(&client->lwtOptions->payload, payload, payloadLen);
  902. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  903. }
  904. else
  905. {
  906. client->lwtOptions->payload = NULL;
  907. }
  908. result = RyanMqttDupString(&client->lwtOptions->topic, topicName, RyanMqttStrlen(topicName));
  909. RyanMqttCheckCode(RyanMqttSuccessError == result, result, RyanMqttLog_d, { goto __exit; });
  910. client->lwtOptions->lwtFlag = RyanMqttTrue;
  911. client->lwtOptions->qos = qos;
  912. client->lwtOptions->retain = retain;
  913. client->lwtOptions->payloadLen = payloadLen;
  914. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  915. return RyanMqttSuccessError;
  916. __exit:
  917. if (NULL != client->lwtOptions)
  918. {
  919. if (NULL != client->lwtOptions->topic)
  920. {
  921. platformMemoryFree(client->lwtOptions->topic);
  922. }
  923. if (NULL != client->lwtOptions->payload)
  924. {
  925. platformMemoryFree(client->lwtOptions->payload);
  926. }
  927. platformMemoryFree(client->lwtOptions);
  928. client->lwtOptions = NULL;
  929. }
  930. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  931. return result;
  932. }
  933. /**
  934. * @brief 丢弃指定ack
  935. *
  936. * @param client
  937. * @param ackHandler
  938. * @return RyanMqttError_e
  939. */
  940. RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, uint8_t packetType, uint16_t packetId)
  941. {
  942. RyanMqttError_e result = RyanMqttSuccessError;
  943. RyanMqttAckHandler_t *ackHandler;
  944. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  945. RyanMqttCheck(0 < packetId && packetId <= RyanMqttMaxPacketId, RyanMqttParamInvalidError, RyanMqttLog_d);
  946. // 删除pubrel记录
  947. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler, RyanMqttTrue);
  948. if (RyanMqttSuccessError == result)
  949. {
  950. RyanMqttEventMachine(client, RyanMqttEventAckHandlerDiscard, (void *)ackHandler); // 回调函数
  951. RyanMqttAckHandlerDestroy(client, ackHandler);
  952. }
  953. return result;
  954. }
  955. RyanMqttError_e RyanMqttGetEventId(RyanMqttClient_t *client, RyanMqttEventId_e *eventId)
  956. {
  957. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  958. RyanMqttCheck(NULL != eventId, RyanMqttParamInvalidError, RyanMqttLog_d);
  959. platformCriticalEnter(client->config.userData, &client->criticalLock);
  960. *eventId = client->eventFlag;
  961. platformCriticalExit(client->config.userData, &client->criticalLock);
  962. return RyanMqttSuccessError;
  963. }
  964. RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  965. {
  966. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  967. platformCriticalEnter(client->config.userData, &client->criticalLock);
  968. client->eventFlag |= eventId;
  969. platformCriticalExit(client->config.userData, &client->criticalLock);
  970. return RyanMqttSuccessError;
  971. }
  972. RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  973. {
  974. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, RyanMqttLog_d);
  975. platformCriticalEnter(client->config.userData, &client->criticalLock);
  976. client->eventFlag &= ~eventId;
  977. platformCriticalExit(client->config.userData, &client->criticalLock);
  978. return RyanMqttSuccessError;
  979. }