RyanMqttClient.c 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. // #define rlogEnable // 是否使能日志
  2. #define rlogColorEnable // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlError) // 日志打印等级
  4. #define rlogTag "RyanMqttClient" // 日志tag
  5. #include "RyanMqttLog.h"
  6. #include "MQTTPacket.h"
  7. #include "RyanMqttClient.h"
  8. #include "RyanMqttUtile.h"
  9. #include "RyanMqttThread.h"
  10. /**
  11. * @brief 获取报文标识符,报文标识符不可为0
  12. * 都在sendbuf锁内调用
  13. * @param client
  14. * @return uint16_t
  15. */
  16. static uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
  17. {
  18. RyanMqttAssert(NULL != client);
  19. client->packetId = (client->packetId >= RyanMqttMaxPacketId || client->packetId < 1) ? 1 : client->packetId + 1;
  20. return client->packetId;
  21. }
  22. static RyanMqttError_e setConfigValue(char **dest, char const *const rest)
  23. {
  24. if (NULL == dest || NULL == rest)
  25. return RyanMqttNoRescourceError;
  26. if (NULL != *dest)
  27. platformMemoryFree(*dest);
  28. RyanMqttStringCopy(dest, (char *)rest, strlen(rest));
  29. if (NULL == *dest)
  30. return RyanMqttFailedError;
  31. return RyanMqttSuccessError;
  32. }
  33. /**
  34. * @brief mqtt初始化
  35. *
  36. * @param clientConfig
  37. * @param pClient mqtt客户端指针
  38. * @return RyanMqttError_e
  39. */
  40. RyanMqttError_e RyanMqttInit(RyanMqttClient_t **pClient)
  41. {
  42. RyanMqttClient_t *client = NULL;
  43. RyanMqttCheck(NULL != pClient, RyanMqttParamInvalidError, rlog_d);
  44. client = (RyanMqttClient_t *)platformMemoryMalloc(sizeof(RyanMqttClient_t));
  45. RyanMqttCheck(NULL != client, RyanMqttNotEnoughMemError, rlog_d);
  46. memset(client, 0, sizeof(RyanMqttClient_t));
  47. // 网络接口初始化
  48. client->network.socket = -1;
  49. platformMutexInit(client->config.userData, &client->sendBufLock); // 初始化发送缓冲区互斥锁
  50. platformCriticalInit(client->config.userData, &client->criticalLock); // 初始化临界区
  51. client->packetId = 1; // 控制报文必须包含一个非零的 16 位报文标识符
  52. client->clientState = RyanMqttInitState;
  53. client->eventFlag = 0;
  54. client->ackHandlerCount = 0;
  55. client->lwtFlag = RyanMqttFalse;
  56. RyanListInit(&client->msgHandlerList);
  57. platformMutexInit(client->config.userData, &client->msgHandleLock);
  58. RyanListInit(&client->ackHandlerList);
  59. platformMutexInit(client->config.userData, &client->ackHandleLock);
  60. RyanListInit(&client->userAckHandlerList);
  61. platformMutexInit(client->config.userData, &client->userAckHandleLock);
  62. RyanMqttSetClientState(client, RyanMqttInitState);
  63. platformTimerInit(&client->keepaliveTimer);
  64. *pClient = client;
  65. return RyanMqttSuccessError;
  66. }
  67. /**
  68. * @brief 销毁mqtt客户端
  69. * !用户线程直接删除mqtt线程是很危险的行为。所以这里设置标志位,稍后由mqtt线程自己释放所占有的资源。
  70. * !mqtt删除自己的延时最大不会超过config里面 recvTimeout + 1秒
  71. * !mqtt删除自己前会调用 RyanMqttEventDestoryBefore 事件回调
  72. * !调用此函数后就不应该再对该客户端进行任何操作
  73. * @param client
  74. * @return RyanMqttError_e
  75. */
  76. RyanMqttError_e RyanMqttDestroy(RyanMqttClient_t *client)
  77. {
  78. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  79. platformCriticalEnter(client->config.userData, &client->criticalLock);
  80. client->destoryFlag = RyanMqttTrue;
  81. platformCriticalExit(client->config.userData, &client->criticalLock);
  82. return RyanMqttSuccessError;
  83. }
  84. /**
  85. * @brief 启动mqtt客户端
  86. * !不要重复调用
  87. *
  88. * @param client
  89. * @return RyanMqttError_e
  90. */
  91. RyanMqttError_e RyanMqttStart(RyanMqttClient_t *client)
  92. {
  93. RyanMqttError_e result = RyanMqttSuccessError;
  94. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  95. RyanMqttCheck(RyanMqttInitState == client->clientState, RyanMqttFailedError, rlog_d);
  96. RyanMqttSetClientState(client, RyanMqttStartState);
  97. // 连接成功,需要初始化 MQTT 线程
  98. result = platformThreadInit(client->config.userData,
  99. &client->mqttThread,
  100. client->config.taskName,
  101. RyanMqttThread,
  102. client,
  103. client->config.taskStack,
  104. client->config.taskPrio);
  105. RyanMqttCheckCode(RyanMqttSuccessError == result, RyanMqttNotEnoughMemError, rlog_d, { RyanMqttSetClientState(client, RyanMqttInitState); });
  106. return RyanMqttSuccessError;
  107. }
  108. /**
  109. * @brief 断开mqtt服务器连接
  110. *
  111. * @param client
  112. * @param sendDiscFlag RyanMqttTrue表示发送断开连接报文,RyanMqttFalse表示不发送断开连接报文
  113. * @return RyanMqttError_e
  114. */
  115. RyanMqttError_e RyanMqttDisconnect(RyanMqttClient_t *client, RyanMqttBool_e sendDiscFlag)
  116. {
  117. RyanMqttConnectStatus_e connectState = RyanMqttConnectUserDisconnected;
  118. RyanMqttError_e result = RyanMqttFailedError;
  119. int32_t packetLen = 0;
  120. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  121. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  122. if (RyanMqttTrue == sendDiscFlag)
  123. {
  124. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  125. // 序列化断开连接数据包并发送
  126. packetLen = MQTTSerialize_disconnect((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize);
  127. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  128. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  129. });
  130. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  131. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  132. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  133. });
  134. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  135. }
  136. connectState = RyanMqttConnectUserDisconnected;
  137. RyanMqttEventMachine(client, RyanMqttEventDisconnected, (void *)&connectState);
  138. return RyanMqttSuccessError;
  139. }
  140. /**
  141. * @brief 手动重连mqtt客户端
  142. * ! 仅在未使能自动连接时,客户端断开连接时用户手动调用
  143. * ! 否则可能会造成内存泄漏
  144. *
  145. * @param client
  146. * @return RyanMqttError_e
  147. */
  148. RyanMqttError_e RyanMqttReconnect(RyanMqttClient_t *client)
  149. {
  150. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  151. RyanMqttCheck(RyanMqttDisconnectState != RyanMqttGetClientState(client), RyanMqttConnectError, rlog_d);
  152. RyanMqttEventMachine(client, RyanMqttEventReconnectBefore, NULL);
  153. platformThreadStart(client->config.userData, &client->mqttThread);
  154. return RyanMqttSuccessError;
  155. }
  156. /**
  157. * @brief 订阅主题
  158. *
  159. * @param client
  160. * @param topic
  161. * @param qos 服务端可以授予比订阅者要求的低一些的QoS等级,可在订阅成功回调函数中查看服务端给定的qos等级
  162. * @return RyanMqttError_e
  163. */
  164. RyanMqttError_e RyanMqttSubscribe(RyanMqttClient_t *client, char *topic, RyanMqttQos_e qos)
  165. {
  166. RyanMqttError_e result = RyanMqttSuccessError;
  167. int32_t packetLen = 0;
  168. uint16_t packetId = 0;
  169. int requestedQoS = 0;
  170. RyanMqttMsgHandler_t *msgHandler = NULL;
  171. RyanMqttAckHandler_t *userAckHandler = NULL;
  172. MQTTString topicName = MQTTString_initializer;
  173. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  174. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  175. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  176. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  177. requestedQoS = qos;
  178. topicName.lenstring.data = topic;
  179. topicName.lenstring.len = strlen(topic);
  180. result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, qos, &msgHandler);
  181. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  182. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  183. packetId = RyanMqttGetNextPacketId(client);
  184. packetLen = MQTTSerialize_subscribe((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, packetId, 1, &topicName, &requestedQoS);
  185. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  186. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  187. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  188. });
  189. result = RyanMqttAckHandlerCreate(client, SUBACK, packetId, packetLen, client->config.sendBuffer, msgHandler, &userAckHandler);
  190. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  191. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  192. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  193. });
  194. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  195. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  196. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  197. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  198. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  199. RyanMqttAckHandlerDestroy(client, userAckHandler);
  200. });
  201. return result;
  202. }
  203. /**
  204. * @brief 取消订阅指定主题
  205. *
  206. * @param client
  207. * @param topic
  208. * @return RyanMqttError_e
  209. */
  210. RyanMqttError_e RyanMqttUnSubscribe(RyanMqttClient_t *client, char *topic)
  211. {
  212. int32_t packetLen = 0;
  213. RyanMqttError_e result = RyanMqttFailedError;
  214. uint16_t packetId;
  215. RyanMqttMsgHandler_t *subMsgHandler = NULL;
  216. RyanMqttMsgHandler_t *msgHandler = NULL;
  217. RyanMqttAckHandler_t *userAckHandler = NULL;
  218. MQTTString topicName = MQTTString_initializer;
  219. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  220. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  221. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  222. topicName.lenstring.data = topic;
  223. topicName.lenstring.len = strlen(topic);
  224. // 查找当前主题是否已经订阅,没有订阅就取消发送
  225. result = RyanMqttMsgHandlerFind(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttFalse, &subMsgHandler);
  226. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  227. result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, RyanMqttQos0, &msgHandler);
  228. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  229. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  230. packetId = RyanMqttGetNextPacketId(client);
  231. packetLen = MQTTSerialize_unsubscribe((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, packetId, 1, &topicName);
  232. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  233. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  234. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  235. });
  236. result = RyanMqttAckHandlerCreate(client, UNSUBACK, packetId, packetLen, client->config.sendBuffer, msgHandler, &userAckHandler);
  237. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  238. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  239. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  240. });
  241. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  242. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  243. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  244. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  245. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  246. RyanMqttAckHandlerDestroy(client, userAckHandler);
  247. });
  248. return result;
  249. }
  250. /**
  251. * @brief 客户端向服务端发送消息
  252. *
  253. * @param client
  254. * @param topic
  255. * @param payload
  256. * @param payloadLen
  257. * @param QOS
  258. * @param retain
  259. * @return RyanMqttError_e
  260. */
  261. RyanMqttError_e RyanMqttPublish(RyanMqttClient_t *client, char *topic, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
  262. {
  263. RyanMqttError_e result = RyanMqttSuccessError;
  264. int32_t packetLen = 0;
  265. int32_t packetId = 0;
  266. MQTTString topicName = MQTTString_initializer;
  267. RyanMqttMsgHandler_t *msgHandler = NULL;
  268. RyanMqttAckHandler_t *userAckHandler = NULL;
  269. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  270. RyanMqttCheck(NULL != topic, RyanMqttParamInvalidError, rlog_d);
  271. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
  272. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  273. RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
  274. RyanMqttCheck(RyanMqttConnectState == RyanMqttGetClientState(client), RyanMqttNotConnectError, rlog_d);
  275. if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
  276. return RyanMqttParamInvalidError;
  277. topicName.lenstring.data = topic;
  278. topicName.lenstring.len = strlen(topic);
  279. if (RyanMqttQos0 == qos)
  280. {
  281. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  282. packetLen = MQTTSerialize_publish((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, qos, retain, packetId,
  283. topicName, (uint8_t *)payload, payloadLen);
  284. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d,
  285. { platformMutexUnLock(client->config.userData, &client->sendBufLock); });
  286. result = RyanMqttSendPacket(client, client->config.sendBuffer, packetLen);
  287. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  288. return result;
  289. }
  290. // qos1 / qos2需要收到预期响应ack,否则数据将被重新发送
  291. result = RyanMqttMsgHandlerCreate(client, topicName.lenstring.data, topicName.lenstring.len, qos, &msgHandler);
  292. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  293. platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
  294. packetId = RyanMqttGetNextPacketId(client);
  295. packetLen = MQTTSerialize_publish((uint8_t *)client->config.sendBuffer, client->config.sendBufferSize, 0, qos, retain, packetId,
  296. topicName, (uint8_t *)payload, payloadLen);
  297. RyanMqttCheckCode(packetLen > 0, RyanMqttSerializePacketError, rlog_d, {
  298. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  299. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  300. });
  301. result = RyanMqttAckHandlerCreate(client, (RyanMqttQos1 == qos) ? PUBACK : PUBREC, packetId, packetLen, client->config.sendBuffer, msgHandler, &userAckHandler);
  302. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  303. RyanMqttMsgHandlerDestory(client->config.userData, msgHandler);
  304. platformMutexUnLock(client->config.userData, &client->sendBufLock);
  305. });
  306. platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
  307. RyanMqttAckListAddToUserAckList(client, userAckHandler);
  308. result = RyanMqttSendPacket(client, userAckHandler->packet, userAckHandler->packetLen);
  309. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, {
  310. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  311. RyanMqttAckHandlerDestroy(client, userAckHandler);
  312. });
  313. return RyanMqttSuccessError;
  314. }
  315. /**
  316. * @brief 获取mqtt客户端状态
  317. *
  318. * @param client
  319. * @return RyanMqttState_e
  320. */
  321. RyanMqttState_e RyanMqttGetState(RyanMqttClient_t *client)
  322. {
  323. if (NULL == client)
  324. return RyanMqttInvalidState;
  325. return RyanMqttGetClientState(client);
  326. }
  327. /**
  328. * @brief 获取已订阅主题
  329. *
  330. * @param client
  331. * @param msgHandles 存放已订阅主题的空间
  332. * @param msgHandleSize 存放已订阅主题的空间大小个数
  333. * @param subscribeNum 函数内部返回已订阅主题的个数
  334. * @return RyanMqttState_e
  335. */
  336. RyanMqttError_e RyanMqttGetSubscribe(RyanMqttClient_t *client, RyanMqttMsgHandler_t *msgHandles, int32_t msgHandleSize, int32_t *subscribeNum)
  337. {
  338. RyanMqttError_e result = RyanMqttSuccessError;
  339. RyanList_t *curr = NULL,
  340. *next = NULL;
  341. RyanMqttMsgHandler_t *msgHandler = NULL;
  342. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  343. RyanMqttCheck(NULL != msgHandles, RyanMqttParamInvalidError, rlog_d);
  344. RyanMqttCheck(1 <= msgHandleSize, RyanMqttParamInvalidError, rlog_d);
  345. *subscribeNum = 0;
  346. platformMutexLock(client->config.userData, &client->msgHandleLock);
  347. RyanListForEachSafe(curr, next, &client->msgHandlerList)
  348. {
  349. msgHandler = RyanListEntry(curr, RyanMqttMsgHandler_t, list);
  350. msgHandles[*subscribeNum].topic = msgHandler->topic;
  351. msgHandles[*subscribeNum].qos = msgHandler->qos;
  352. (*subscribeNum)++;
  353. if (*subscribeNum >= msgHandleSize)
  354. {
  355. result = RyanMqttNoRescourceError;
  356. goto __next;
  357. }
  358. }
  359. __next:
  360. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  361. return result;
  362. }
  363. /**
  364. * @brief 获取mqtt config
  365. * 使用完毕后,需要用户释放pclientConfig指针内容
  366. *
  367. * @param client
  368. * @param pclientConfig
  369. * @return RyanMqttError_e
  370. */
  371. /* RyanMqttError_e RyanMqttGetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t **pclientConfig)
  372. {
  373. RyanMqttError_e result = RyanMqttSuccessError;
  374. RyanMqttClientConfig_t *clientConfig = NULL;
  375. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  376. RyanMqttCheck(NULL != pclientConfig, RyanMqttParamInvalidError, rlog_d);
  377. RyanMqttCheck(NULL != client->config, RyanMqttNoRescourceError);
  378. clientConfig = (RyanMqttClientConfig_t *)platformMemoryMalloc(sizeof(RyanMqttClientConfig_t));
  379. RyanMqttCheck(NULL != clientConfig, RyanMqttNotEnoughMemError);
  380. memcpy(clientConfig, client->config, sizeof(RyanMqttClientConfig_t));
  381. result = setConfigValue(&clientConfig->clientId, client->config->clientId);
  382. RyanMqttCheck(RyanMqttSuccessError == result, result);
  383. result = setConfigValue(&clientConfig->userName, client->config->userName);
  384. RyanMqttCheck(RyanMqttSuccessError == result, result);
  385. result = setConfigValue(&clientConfig->password, client->config->password);
  386. RyanMqttCheck(RyanMqttSuccessError == result, result);
  387. result = setConfigValue(&clientConfig->host, client->config->host);
  388. RyanMqttCheck(RyanMqttSuccessError == result, result);
  389. result = setConfigValue(&clientConfig->port, client->config->port);
  390. RyanMqttCheck(RyanMqttSuccessError == result, result);
  391. result = setConfigValue(&clientConfig->taskName, client->config->taskName);
  392. RyanMqttCheck(RyanMqttSuccessError == result, result);
  393. *pclientConfig = clientConfig;
  394. return RyanMqttSuccessError;
  395. }
  396. */
  397. /**
  398. * @brief 设置mqtt config 这是很危险的操作,需要考虑mqtt thread线程和用户线程的资源互斥
  399. *
  400. * 推荐在 RyanMqttStart函数前 / 非用户手动触发的事件回调函数中 / mqtt thread处于挂起状态时调用
  401. * mqtt thread处于阻塞状态时调用此函数也是很危险的行为
  402. * 要保证mqtt线程和用户线程的资源互斥
  403. * 如果修改参数需要重新连接才生效的,这里set不会生效。比如 keepAlive
  404. *
  405. * !项目中用户不应频繁调用此函数
  406. * ! 此函数如果返回RyanMqttFailedError,需要立即停止mqtt客户端相关操作.因为操作失败此函数会调用RyanMqttDestroy()销毁客户端
  407. *
  408. * @param client
  409. * @param clientConfig
  410. * @return RyanMqttError_e
  411. */
  412. RyanMqttError_e RyanMqttSetConfig(RyanMqttClient_t *client, RyanMqttClientConfig_t *clientConfig)
  413. {
  414. RyanMqttError_e result = RyanMqttSuccessError;
  415. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  416. RyanMqttCheck(NULL != clientConfig->clientId, RyanMqttParamInvalidError, rlog_d);
  417. RyanMqttCheck(NULL != clientConfig->host, RyanMqttParamInvalidError, rlog_d);
  418. RyanMqttCheck(NULL != clientConfig->taskName, RyanMqttParamInvalidError, rlog_d);
  419. RyanMqttCheck(13 < clientConfig->recvBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->recvBufferSize, RyanMqttParamInvalidError, rlog_d);
  420. RyanMqttCheck(13 < clientConfig->sendBufferSize && (RyanMqttMaxPayloadLen + 5) >= clientConfig->sendBufferSize, RyanMqttParamInvalidError, rlog_d);
  421. result = setConfigValue(&client->config.clientId, clientConfig->clientId);
  422. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  423. if (NULL == clientConfig->userName)
  424. {
  425. client->config.userName = NULL;
  426. }
  427. else
  428. {
  429. result = setConfigValue(&client->config.userName, clientConfig->userName);
  430. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  431. }
  432. if (NULL == clientConfig->password)
  433. {
  434. client->config.password = NULL;
  435. }
  436. else
  437. {
  438. result = setConfigValue(&client->config.password, clientConfig->password);
  439. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  440. }
  441. result = setConfigValue(&client->config.host, clientConfig->host);
  442. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  443. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  444. result = setConfigValue(&client->config.taskName, clientConfig->taskName);
  445. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { goto __exit; });
  446. client->config.port = clientConfig->port;
  447. client->config.taskPrio = clientConfig->taskPrio;
  448. client->config.taskStack = clientConfig->taskStack;
  449. client->config.mqttVersion = clientConfig->mqttVersion;
  450. client->config.ackHandlerRepeatCountWarning = clientConfig->ackHandlerRepeatCountWarning;
  451. client->config.ackHandlerCountWarning = clientConfig->ackHandlerCountWarning;
  452. client->config.autoReconnectFlag = clientConfig->autoReconnectFlag;
  453. client->config.cleanSessionFlag = clientConfig->cleanSessionFlag;
  454. client->config.reconnectTimeout = clientConfig->reconnectTimeout;
  455. client->config.recvTimeout = clientConfig->recvTimeout;
  456. client->config.sendTimeout = clientConfig->sendTimeout;
  457. client->config.ackTimeout = clientConfig->ackTimeout;
  458. client->config.keepaliveTimeoutS = clientConfig->keepaliveTimeoutS;
  459. client->config.mqttEventHandle = clientConfig->mqttEventHandle;
  460. client->config.userData = clientConfig->userData;
  461. client->config.recvBufferSize = clientConfig->recvBufferSize;
  462. client->config.sendBufferSize = clientConfig->sendBufferSize;
  463. client->config.recvBuffer = clientConfig->recvBuffer;
  464. client->config.sendBuffer = clientConfig->sendBuffer;
  465. return RyanMqttSuccessError;
  466. __exit:
  467. RyanMqttDestroy(client);
  468. return RyanMqttFailedError;
  469. }
  470. /**
  471. * @brief 设置遗嘱的配置信息
  472. * 此函数必须在发送connect报文前调用,因为遗嘱消息包含在connect报文中
  473. * 例如 RyanMqttStart前 / RyanMqttEventReconnectBefore事件中
  474. *
  475. * @param client
  476. * @param topicName
  477. * @param qos
  478. * @param retain
  479. * @param payload
  480. * @return RyanMqttError_e
  481. */
  482. RyanMqttError_e RyanMqttSetLwt(RyanMqttClient_t *client, char *topicName, char *payload, uint32_t payloadLen, RyanMqttQos_e qos, RyanMqttBool_e retain)
  483. {
  484. RyanMqttError_e result = RyanMqttSuccessError;
  485. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  486. RyanMqttCheck(NULL != topicName, RyanMqttParamInvalidError, rlog_d);
  487. RyanMqttCheck(RyanMqttMaxPayloadLen >= payloadLen, RyanMqttParamInvalidError, rlog_d);
  488. RyanMqttCheck(RyanMqttQos0 <= qos && RyanMqttQos2 >= qos, RyanMqttParamInvalidError, rlog_d);
  489. RyanMqttCheck(RyanMqttTrue == retain || RyanMqttFalse == retain, RyanMqttParamInvalidError, rlog_d);
  490. if (payloadLen > 0 && NULL == payload) // 报文支持有效载荷长度为0
  491. return RyanMqttParamInvalidError;
  492. if (NULL != client->lwtOptions.topic)
  493. platformMemoryFree(client->lwtOptions.topic);
  494. if (NULL != client->lwtOptions.payload)
  495. platformMemoryFree(client->lwtOptions.payload);
  496. memset(&client->lwtOptions, 0, sizeof(lwtOptions_t));
  497. client->lwtFlag = RyanMqttTrue;
  498. client->lwtOptions.qos = qos;
  499. client->lwtOptions.retain = retain;
  500. client->lwtOptions.payloadLen = payloadLen;
  501. result = RyanMqttStringCopy(&client->lwtOptions.payload, payload, payloadLen);
  502. RyanMqttCheck(RyanMqttSuccessError == result, result, rlog_d);
  503. result = RyanMqttStringCopy(&client->lwtOptions.topic, topicName, strlen(topicName));
  504. RyanMqttCheckCode(RyanMqttSuccessError == result, result, rlog_d, { platformMemoryFree(client->lwtOptions.payload); });
  505. return RyanMqttSuccessError;
  506. }
  507. /**
  508. * @brief 丢弃指定ack
  509. *
  510. * @param client
  511. * @param packetId
  512. * @return RyanMqttError_e
  513. */
  514. RyanMqttError_e RyanMqttDiscardAckHandler(RyanMqttClient_t *client, enum msgTypes packetType, uint16_t packetId)
  515. {
  516. RyanMqttError_e result = RyanMqttSuccessError;
  517. RyanMqttAckHandler_t *ackHandler = NULL;
  518. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  519. RyanMqttCheck(CONNECT <= packetType && DISCONNECT >= packetType, RyanMqttParamInvalidError, rlog_d);
  520. RyanMqttCheck(0 < packetId, RyanMqttParamInvalidError, rlog_d);
  521. // 删除pubrel记录
  522. result = RyanMqttAckListNodeFind(client, packetType, packetId, &ackHandler);
  523. RyanMqttCheck(RyanMqttSuccessError == result, RyanMqttNoRescourceError, rlog_d);
  524. RyanMqttEventMachine(client, RyanMqttEventAckHandlerdiscard, (void *)ackHandler); // 回调函数
  525. RyanMqttAckListRemoveToAckList(client, ackHandler);
  526. RyanMqttAckHandlerDestroy(client, ackHandler);
  527. return RyanMqttSuccessError;
  528. }
  529. RyanMqttError_e RyanMqttRegisterEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  530. {
  531. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  532. client->eventFlag |= eventId;
  533. return RyanMqttSuccessError;
  534. }
  535. RyanMqttError_e RyanMqttCancelEventId(RyanMqttClient_t *client, RyanMqttEventId_e eventId)
  536. {
  537. RyanMqttCheck(NULL != client, RyanMqttParamInvalidError, rlog_d);
  538. client->eventFlag &= ~eventId;
  539. return RyanMqttSuccessError;
  540. }