RyanMqttClient.c 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. // #define rlogEnable // 是否使能日志
  2. #define rlogColorEnable // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlError) // 日志打印等级
  4. #define rlogTag "RyanMqttClient" // 日志tag
  5. #include "RyanMqttLog.h"
  6. #include "MQTTPacket.h"
  7. #include "RyanMqttClient.h"
  8. #include "RyanMqttUtile.h"
  9. #include "RyanMqttThread.h"
  10. /**
  11. * @brief 获取报文标识符,报文标识符不可为0
  12. * 都在sendbuf锁内调用
  13. * @param client
  14. * @return uint16_t
  15. */
  16. static uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
  17. {
  18. RyanMqttAssert(NULL != client);
  19. client->packetId = (client->packetId >= RyanMqttMaxPacketId || client->packetId < 1) ? 1 : client->packetId + 1;
  20. return client->packetId;
  21. }
  22. static RyanMqttError_e setConfigValue(char **dest, char const *const rest)
  23. {
  24. if (NULL == dest || NULL == rest)
  25. return RyanMqttNoRescourceError;
  26. if (NULL != *dest)
  27. platformMemoryFree(*dest);
  28. RyanMqttStringCopy(dest, (char *)rest, strlen(rest));
  29. if (NULL == *dest)
  30. return RyanMqttFailedError;
  31. return RyanMqttSuccessError;
  32. }
  33. /**
  34. * @brief mqtt初始化
  35. *
  36. * @param clientConfig
  37. * @param pClient mqtt客户端指针
  38. * @return RyanMqttError_e
  39. */
  40. RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
  41. {
  42. RyanMqttClient_t *client = NULL;
  43. RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, rlog_d);
  44. client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
  45. RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, rlog_d);
  46. memset(client, 0, sizeof(RyanMqttClient_t));
  47. // 网络接口初始化
  48. platformNetworkInit(client->config.userData, &client->network);
  49. platformMutexInit(client->config.userData, &client->sendBufLock); // 初始化发送缓冲区互斥锁
  50. platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
  51. client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
  52. client->clientState = RyanMqttInitState;
  53. client->eventFlag = 0;
  54. client->ackHandlerCount = 0;
  55. client->lwtFlag = RyanMqttFalse;
  56. RyanListInit(&client->msgHandlerList);
  57. platformMutexInit(client->config.userData, &client->msgHandleLock);
  58. RyanListInit(&client->ackHandlerList);
  59. platformMutexInit(client->config.userData, &client->ackHandleLock);
  60. RyanListInit(&client->userAckHandlerList);
  61. platformMutexInit(client->config.userData, &client->userAckHandleLock);
  62. RyanMqttSetClientState(client, RyanMqttInitState);
  63. platformTimerInit(&client->keepaliveTimer);
  64. *pClient = client;
  65. return RyanMqttSuccessError;
  66. }
  67. /**
  68. * @brief 销毁mqtt客户端
  69. * !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
  70. * !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
  71. * !mqtt删除自己前会调用 RyanMqttEventDestoryBefore 事件回调
  72. * !调用此函数后就不应该再对该客户端进行任何操作
  73. * @param client
  74. * @return RyanMqttError_e
  75. */
  76. RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
  77. {
  78. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  79. platformCriticalEnter(client->config.userData, &client->criticalLock);
  80. client->destoryFlag = RyanMqttTrue;
  81. platformCriticalExit(client->config.userData, &client->criticalLock);
  82. return RyanMqttSuccessError;
  83. }
  84. /**
  85. * @brief 启动mqtt客户端
  86. * !不要重复调用
  87. *
  88. * @param client
  89. * @return RyanMqttError_e
  90. */
  91. RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
  92. {
  93. RyanMqttError_e result = RyanMqttSuccessError;
  94. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  95. RyanMqttCheck(RyanMqttInitState == client->clientState, RyanMqttFailedError, rlog_d);
  96. RyanMqttSetClientState(client, RyanMqttStartState);
  97. // 连接成功,需要初始化 MQTT 线程
  98. result = platformThreadInit(client->config.userData,
  99. &client->mqttThread,
  100. client->config.taskName,
  101. RyanMqttThread,
  102. client,
  103. client->config.taskStack,
  104. client->config.taskPrio);
  105. RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttSetClientState(client, RyanMqttInitState); });
  106. return RyanMqttSuccessError;
  107. }
  108. /**
  109. * @brief 断开mqtt服务器连接
  110. *
  111. * @param client
  112. * @param sendDiscFlag RyanMqttTrue表示发送断开连接报文,RyanMqttFalse表示不发送断开连接报文
  113. * @return RyanMqttError_e
  114. */
  115. RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
  116. {
  117. RyanMqttConnectStatus_e connectState = RyanMqttConnectUserDisconnected;
  118. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  119. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  120. if (RyanMqttTrue == sendDiscFlag)
  121. {
  122. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  123. // 序列化断开连接数据包并发送
  124. int32_t packetLen = MQTTSerialize_disconnect((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
  125. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  126. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  127. });
  128. RyanMqttError_e result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  129. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  130. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  131. });
  132. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  133. }
  134. connectState = RyanMqttConnectUserDisconnected;
  135. RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
  136. return RyanMqttSuccessError;
  137. }
  138. /**
  139. * @brief 手动重连mqtt客户端
  140. * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
  141. * ! 否则可能会造成内存泄漏
  142. *
  143. * @param client
  144. * @return RyanMqttError_e
  145. */
  146. RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
  147. {
  148. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  149. RyanMqttCheck(RyanMqttDisconnectState == RyanMqttGetClientState(client), RyanMqttConnectError, rlog_d);
  150. if (RyanMqttTrue == client->config.autoReconnectFlag)
  151. return RyanMqttNoRescourceError;
  152. platformThreadStart(client->config.userData, &client->mqttThread);
  153. return RyanMqttSuccessError;
  154. }
  155. /**
  156. * @brief 订阅主题
  157. *
  158. * @param client
  159. * @param topic
  160. * @param qos 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
  161. * @return RyanMqttError_e
  162. */
  163. RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
  164. {
  165. RyanMqttError_e result = RyanMqttSuccessError;
  166. int32_t packetLen = 0;
  167. uint16_t packetId = 0;
  168. int requestedQoS = 0;
  169. RyanMqttMsgHandler_t *msgHandler = NULL;
  170. RyanMqttAckHandler_t *userAckHandler = NULL;
  171. MQTTString topicName = MQTTString_initializer;
  172. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  173. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  174. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  175. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  176. requestedQoS = qos;
  177. topicName.lenstring.data = topic;
  178. topicName.lenstring.len = strlen(topic);
  179. result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, qos, &msgHandler);
  180. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  181. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  182. packetId = RyanMqttGetNextPacketId(client);
  183. packetLen = MQTTSerialize_subscribe((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, packetId, 1, &topicName, &requestedQoS);
  184. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  185. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  186. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  187. });
  188. result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, client->config.sendBuffer, msgHandler, &userAckHandler);
  189. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  190. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  191. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  192. });
  193. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  194. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  195. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  196. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  197. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  198. RyanMqttAckHandlerDestroy(client, userAckHandler);
  199. });
  200. return result;
  201. }
  202. /**
  203. * @brief 取消订阅指定主题
  204. *
  205. * @param client
  206. * @param topic
  207. * @return RyanMqttError_e
  208. */
  209. RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
  210. {
  211. int32_t packetLen = 0;
  212. RyanMqttError_e result = RyanMqttFailedError;
  213. uint16_t packetId;
  214. RyanMqttMsgHandler_t *subMsgHandler = NULL;
  215. RyanMqttMsgHandler_t *msgHandler = NULL;
  216. RyanMqttAckHandler_t *userAckHandler = NULL;
  217. MQTTString topicName = MQTTString_initializer;
  218. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  219. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  220. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  221. topicName.lenstring.data = topic;
  222. topicName.lenstring.len = strlen(topic);
  223. // 查找当前主题是否已经订阅,没有订阅就取消发送
  224. result = RyanMqttMsgHandlerFind(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttFalse, &subMsgHandler);
  225. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  226. result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttQos0, &msgHandler);
  227. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  228. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  229. packetId = RyanMqttGetNextPacketId(client);
  230. packetLen = MQTTSerialize_unsubscribe((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, packetId, 1, &topicName);
  231. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  232. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  233. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  234. });
  235. result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, client->config.sendBuffer, msgHandler, &userAckHandler);
  236. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  237. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  238. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  239. });
  240. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  241. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  242. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  243. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  244. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  245. RyanMqttAckHandlerDestroy(client, userAckHandler);
  246. });
  247. return result;
  248. }
  249. /**
  250. * @brief 客户端向服务端发送消息
  251. *
  252. * @param client
  253. * @param topic
  254. * @param payload
  255. * @param payloadLen
  256. * @param QOS
  257. * @param retain
  258. * @return RyanMqttError_e
  259. */
  260. RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
  261. {
  262. RyanMqttError_e result = RyanMqttSuccessError;
  263. int32_t packetLen = 0;
  264. int32_t packetId = 0;
  265. MQTTString topicName = MQTTString_initializer;
  266. RyanMqttMsgHandler_t *msgHandler = NULL;
  267. RyanMqttAckHandler_t *userAckHandler = NULL;
  268. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  269. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  270. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
  271. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  272. RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
  273. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  274. if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
  275. return RyanMqttParamInvalidError;
  276. topicName.lenstring.data = topic;
  277. topicName.lenstring.len = strlen(topic);
  278. if (RyanMqttQos0 == qos)
  279. {
  280. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  281. packetLen = MQTTSerialize_publish((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, qos, retain, packetId,
  282. topicName, (uint8_t *)payload, payloadLen);
  283. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
  284. { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
  285. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  286. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  287. return result;
  288. }
  289. // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
  290. result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, qos, &msgHandler);
  291. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  292. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  293. packetId = RyanMqttGetNextPacketId(client);
  294. packetLen = MQTTSerialize_publish((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, qos, retain, packetId,
  295. topicName, (uint8_t *)payload, payloadLen);
  296. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  297. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  298. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  299. });
  300. result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, client->config.sendBuffer, msgHandler, &userAckHandler);
  301. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  302. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  303. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  304. });
  305. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  306. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  307. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  308. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  309. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  310. RyanMqttAckHandlerDestroy(client, userAckHandler);
  311. });
  312. return RyanMqttSuccessError;
  313. }
  314. /**
  315. * @brief 获取mqtt客户端状态
  316. *
  317. * @param client
  318. * @return RyanMqttState_e
  319. */
  320. RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
  321. {
  322. if (NULL == client)
  323. return RyanMqttInvalidState;
  324. return RyanMqttGetClientState(client);
  325. }
  326. /**
  327. * @brief 获取已订阅主题
  328. *
  329. * @param client
  330. * @param msgHandles 存放已订阅主题的空间
  331. * @param msgHandleSize 存放已订阅主题的空间大小个数
  332. * @param subscribeNum 函数内部返回已订阅主题的个数
  333. * @return RyanMqttState_e
  334. */
  335. RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandles, int32_t msgHandleSize, int32_t *subscribeNum)
  336. {
  337. RyanMqttError_e result = RyanMqttSuccessError;
  338. RyanList_t *curr = NULL,
  339. *next = NULL;
  340. RyanMqttMsgHandler_t *msgHandler = NULL;
  341. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  342. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, rlog_d);
  343. RyanMqttCheck(1 <= msgHandleSize, RyanMqttParamInvalidError, rlog_d);
  344. *subscribeNum = 0;
  345. platformMutexLock(client->config.userData, &client->msgHandleLock);
  346. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  347. {
  348. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  349. msgHandles[*subscribeNum].topic = msgHandler->topic;
  350. msgHandles[*subscribeNum].qos = msgHandler->qos;
  351. (*subscribeNum)++;
  352. if (*subscribeNum >= msgHandleSize)
  353. {
  354. result = RyanMqttNoRescourceError;
  355. goto __next;
  356. }
  357. }
  358. __next:
  359. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  360. return result;
  361. }
  362. /**
  363. * @brief 获取mqtt config
  364. * 使用完毕后,需要用户释放pclientConfig指针内容
  365. *
  366. * @param client
  367. * @param pclientConfig
  368. * @return RyanMqttError_e
  369. */
  370. /* RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
  371. {
  372. RyanMqttError_e result = RyanMqttSuccessError;
  373. RyanMqttClientConfig_t *clientConfig = NULL;
  374. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  375. RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, rlog_d);
  376. RyanMqttCheck(NULL != client->config, RyanMqttNoRescourceError);
  377. clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
  378. RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError);
  379. memcpy(clientConfig, client->config, sizeof(RyanMqttClientConfig_t));
  380. result = setConfigValue(&clientConfig->clientId, client->config->clientId);
  381. RyanMqttCheck(RyanMqttSuccessError == result, result);
  382. result = setConfigValue(&clientConfig->userName, client->config->userName);
  383. RyanMqttCheck(RyanMqttSuccessError == result, result);
  384. result = setConfigValue(&clientConfig->password, client->config->password);
  385. RyanMqttCheck(RyanMqttSuccessError == result, result);
  386. result = setConfigValue(&clientConfig->host, client->config->host);
  387. RyanMqttCheck(RyanMqttSuccessError == result, result);
  388. result = setConfigValue(&clientConfig->port, client->config->port);
  389. RyanMqttCheck(RyanMqttSuccessError == result, result);
  390. result = setConfigValue(&clientConfig->taskName, client->config->taskName);
  391. RyanMqttCheck(RyanMqttSuccessError == result, result);
  392. *pclientConfig = clientConfig;
  393. return RyanMqttSuccessError;
  394. }
  395. */
  396. /**
  397. * @brief 设置mqtt config 这是很危险的操作,需要考虑mqtt thread线程和用户线程的资源互斥
  398. *
  399. * 推荐在 RyanMqttStart函数前 / 非用户手动触发的事件回调函数中 / mqtt thread处于挂起状态时调用
  400. * mqtt thread处于阻塞状态时调用此函数也是很危险的行为
  401. * 要保证mqtt线程和用户线程的资源互斥
  402. * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
  403. *
  404. * !项目中用户不应频繁调用此函数
  405. * ! 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
  406. *
  407. * @param client
  408. * @param clientConfig
  409. * @return RyanMqttError_e
  410. */
  411. RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig)
  412. {
  413. RyanMqttError_e result = RyanMqttSuccessError;
  414. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  415. RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, rlog_d);
  416. RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, rlog_d);
  417. RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, rlog_d);
  418. RyanMqttCheck(13 < clientConfig->recvBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->recvBufferSize, RyanMqttParamInvalidError, rlog_d);
  419. RyanMqttCheck(13 < clientConfig->sendBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->sendBufferSize, RyanMqttParamInvalidError, rlog_d);
  420. result = setConfigValue(&client->config.clientId, clientConfig->clientId);
  421. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  422. if (NULL == clientConfig->userName)
  423. {
  424. client->config.userName = NULL;
  425. }
  426. else
  427. {
  428. result = setConfigValue(&client->config.userName, clientConfig->userName);
  429. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  430. }
  431. if (NULL == clientConfig->password)
  432. {
  433. client->config.password = NULL;
  434. }
  435. else
  436. {
  437. result = setConfigValue(&client->config.password, clientConfig->password);
  438. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  439. }
  440. result = setConfigValue(&client->config.host, clientConfig->host);
  441. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  442. result = setConfigValue(&client->config.taskName, clientConfig->taskName);
  443. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  444. client->config.port = clientConfig->port;
  445. client->config.taskPrio = clientConfig->taskPrio;
  446. client->config.taskStack = clientConfig->taskStack;
  447. client->config.mqttVersion = clientConfig->mqttVersion;
  448. client->config.ackHandlerRepeatCountWarning = clientConfig->ackHandlerRepeatCountWarning;
  449. client->config.ackHandlerCountWarning = clientConfig->ackHandlerCountWarning;
  450. client->config.autoReconnectFlag = clientConfig->autoReconnectFlag;
  451. client->config.cleanSessionFlag = clientConfig->cleanSessionFlag;
  452. client->config.reconnectTimeout = clientConfig->reconnectTimeout;
  453. client->config.recvTimeout = clientConfig->recvTimeout;
  454. client->config.sendTimeout = clientConfig->sendTimeout;
  455. client->config.ackTimeout = clientConfig->ackTimeout;
  456. client->config.keepaliveTimeoutS = clientConfig->keepaliveTimeoutS;
  457. client->config.mqttEventHandle = clientConfig->mqttEventHandle;
  458. client->config.userData = clientConfig->userData;
  459. client->config.recvBufferSize = clientConfig->recvBufferSize;
  460. client->config.sendBufferSize = clientConfig->sendBufferSize;
  461. client->config.recvBuffer = clientConfig->recvBuffer;
  462. client->config.sendBuffer = clientConfig->sendBuffer;
  463. return RyanMqttSuccessError;
  464. __exit:
  465. RyanMqttDestroy(client);
  466. return RyanMqttFailedError;
  467. }
  468. /**
  469. * @brief 设置遗嘱的配置信息
  470. * 此函数必须在发送connect报文前调用,因为遗嘱消息包含在connect报文中
  471. * 例如 RyanMqttStart前 / RyanMqttEventReconnectBefore事件中
  472. *
  473. * @param client
  474. * @param topicName
  475. * @param qos
  476. * @param retain
  477. * @param payload
  478. * @return RyanMqttError_e
  479. */
  480. RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
  481. {
  482. RyanMqttError_e result = RyanMqttSuccessError;
  483. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  484. RyanMqttCheck(NULL != topicName, RyanMqttParamInvalidError, rlog_d);
  485. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
  486. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  487. RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
  488. if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
  489. return RyanMqttParamInvalidError;
  490. if (NULL != client->lwtOptions.topic)
  491. platformMemoryFree(client->lwtOptions.topic);
  492. if (NULL != client->lwtOptions.payload)
  493. platformMemoryFree(client->lwtOptions.payload);
  494. memset(&client->lwtOptions, 0, sizeof(lwtOptions_t));
  495. client->lwtFlag = RyanMqttTrue;
  496. client->lwtOptions.qos = qos;
  497. client->lwtOptions.retain = retain;
  498. client->lwtOptions.payloadLen = payloadLen;
  499. result = RyanMqttStringCopy(&client->lwtOptions.payload, payload, payloadLen);
  500. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  501. result = RyanMqttStringCopy(&client->lwtOptions.topic, topicName, strlen(topicName));
  502. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(client->lwtOptions.payload); });
  503. return RyanMqttSuccessError;
  504. }
  505. /**
  506. * @brief 丢弃指定ack
  507. *
  508. * @param client
  509. * @param packetId
  510. * @return RyanMqttError_e
  511. */
  512. RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId)
  513. {
  514. RyanMqttError_e result = RyanMqttSuccessError;
  515. RyanMqttAckHandler_t *ackHandler = NULL;
  516. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  517. RyanMqttCheck(CONNECT <= packetType && DISCONNECT >= packetType, RyanMqttParamInvalidError, rlog_d);
  518. RyanMqttCheck(0 < packetId, RyanMqttParamInvalidError, rlog_d);
  519. // 删除pubrel记录
  520. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
  521. RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttNoRescourceError, rlog_d);
  522. RyanMqttEventMachine(client, RyanMqttEventAckHandlerdiscard, (void *)ackHandler); // 回调函数
  523. RyanMqttAckListRemoveToAckList(client, ackHandler);
  524. RyanMqttAckHandlerDestroy(client, ackHandler);
  525. return RyanMqttSuccessError;
  526. }
  527. RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  528. {
  529. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  530. client->eventFlag |= eventId;
  531. return RyanMqttSuccessError;
  532. }
  533. RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  534. {
  535. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  536. client->eventFlag &= ~eventId;
  537. return RyanMqttSuccessError;
  538. }