RyanMqttUtil.c 13 KB


  1. #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
  2. // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  3. #include "RyanMqttUtil.h"
  4. #include "RyanMqttLog.h"
  5. #include "RyanMqttThread.h"
  6. #ifdef RyanMqttLinuxTestEnable
  7. #include "RyanMqttTest.h"
  8. #endif
  9. /**
  10. * @brief 字符串拷贝,需要手动释放内存
  11. *
  12. * @param dest
  13. * @param src
  14. * @param strLen
  15. * @return RyanMqttError_e
  16. */
  17. RyanMqttError_e RyanMqttDupString(char **dest, const char *src, uint32_t strLen)
  18. {
  19. RyanMqttAssert(NULL != dest);
  20. RyanMqttAssert(NULL != src);
  21. RyanMqttCheck(0 != strLen, RyanMqttFailedError, RyanMqttLog_d);
  22. *dest = NULL;
  23. char *s = (char *)platformMemoryMalloc(strLen + 1);
  24. if (NULL == s)
  25. {
  26. return RyanMqttNotEnoughMemError;
  27. }
  28. RyanMqttMemcpy(s, src, strLen);
  29. s[strLen] = '\0';
  30. *dest = s;
  31. return RyanMqttSuccessError;
  32. }
  33. /**
  34. * @brief mqtt读取报文,此函数仅Mqtt线程进行调用
  35. *
  36. * @param client
  37. * @param buf
  38. * @param length
  39. * @return RyanMqttError_e
  40. */
  41. RyanMqttError_e RyanMqttRecvPacket(RyanMqttClient_t *client, uint8_t *recvBuf, uint32_t recvLen)
  42. {
  43. uint32_t offset = 0;
  44. int32_t recvResult = 0;
  45. uint32_t timeOut = client->config.recvTimeout;
  46. RyanMqttTimer_t timer;
  47. RyanMqttAssert(NULL != client);
  48. RyanMqttAssert(NULL != recvBuf);
  49. RyanMqttAssert(0 != recvLen);
  50. // 如果需要处理ack,就缩短读取超时时间,避免阻塞太久(保留用户配置的上限)
  51. if (RyanMqttTrue == client->pendingAckFlag)
  52. {
  53. if (client->config.recvTimeout > 100)
  54. {
  55. timeOut = 100;
  56. }
  57. else
  58. {
  59. timeOut = client->config.recvTimeout;
  60. }
  61. }
  62. RyanMqttTimerCutdown(&timer, timeOut);
  63. while ((offset < recvLen) && (timeOut > 0))
  64. {
  65. recvResult =
  66. platformNetworkRecvAsync(client->config.userData, &client->network, (char *)(recvBuf + offset),
  67. (size_t)(recvLen - offset), (int32_t)timeOut);
  68. if (recvResult < 0)
  69. {
  70. break;
  71. }
  72. offset += recvResult;
  73. timeOut = RyanMqttTimerRemain(&timer);
  74. }
  75. // RyanMqttLog_d("offset: %d, recvLen: %d, recvResult: %d", offset, recvLen, recvResult);
  76. // 错误
  77. if (recvResult < 0)
  78. {
  79. RyanMqttConnectStatus_e connectState = RyanMqttConnectNetWorkFail;
  80. RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
  81. RyanMqttLog_d("recv错误, result: %d", recvResult);
  82. return RyanSocketFailedError;
  83. }
  84. // 读取超时
  85. if (offset != recvLen)
  86. {
  87. return RyanMqttRecvPacketTimeOutError;
  88. }
  89. #ifdef RyanMqttLinuxTestEnable
  90. RyanMqttTestEnableCritical();
  91. if (RyanMqttTrue == isEnableRandomNetworkFault)
  92. {
  93. randomCount++;
  94. if (randomCount >= RyanRand(10, 100))
  95. {
  96. randomCount = 0;
  97. RyanMqttTestExitCritical();
  98. // printf("模拟接收超时\r\n");
  99. return RyanMqttRecvPacketTimeOutError;
  100. }
  101. }
  102. RyanMqttTestExitCritical();
  103. #endif
  104. return RyanMqttSuccessError;
  105. }
  106. /**
  107. * @brief mqtt发送报文
  108. *
  109. * @param client
  110. * @param buf
  111. * @param length
  112. * @return RyanMqttError_e
  113. */
  114. RyanMqttError_e RyanMqttSendPacket(RyanMqttClient_t *client, uint8_t *sendBuf, uint32_t sendLen)
  115. {
  116. uint32_t offset = 0;
  117. int32_t sendResult = 0;
  118. uint32_t timeOut = client->config.sendTimeout;
  119. RyanMqttTimer_t timer;
  120. RyanMqttAssert(NULL != client);
  121. RyanMqttAssert(NULL != sendBuf);
  122. RyanMqttAssert(0 != sendLen);
  123. #ifdef RyanMqttLinuxTestEnable
  124. RyanMqttTestEnableCritical();
  125. if (RyanMqttTrue == isEnableRandomNetworkFault)
  126. {
  127. sendRandomCount++;
  128. if (sendRandomCount >= RyanRand(1, 10))
  129. {
  130. sendRandomCount = 0;
  131. RyanMqttTestExitCritical();
  132. // printf("模拟发送超时\r\n");
  133. return RyanMqttSendPacketTimeOutError;
  134. }
  135. }
  136. RyanMqttTestExitCritical();
  137. #endif
  138. RyanMqttTimerCutdown(&timer, timeOut);
  139. platformMutexLock(client->config.userData, &client->sendLock); // 获取互斥锁
  140. while ((offset < sendLen) && (timeOut > 0))
  141. {
  142. sendResult =
  143. platformNetworkSendAsync(client->config.userData, &client->network, (char *)(sendBuf + offset),
  144. (size_t)(sendLen - offset), (int32_t)timeOut);
  145. if (-1 == sendResult)
  146. {
  147. break;
  148. }
  149. offset += sendResult;
  150. timeOut = RyanMqttTimerRemain(&timer);
  151. }
  152. platformMutexUnLock(client->config.userData, &client->sendLock); // 释放互斥锁
  153. if (sendResult < 0)
  154. {
  155. RyanMqttConnectStatus_e connectState = RyanMqttConnectNetWorkFail;
  156. RyanMqttEventMachine(client, RyanMqttEventDisconnected, &connectState);
  157. return RyanSocketFailedError;
  158. }
  159. // 发送超时
  160. if (offset != sendLen)
  161. {
  162. return RyanMqttSendPacketTimeOutError;
  163. }
  164. // 发送数据成功就刷新 keepalive 时间
  165. RyanMqttRefreshKeepaliveTime(client);
  166. return RyanMqttSuccessError;
  167. }
  168. /**
  169. * @brief 设置mqtt客户端状态
  170. *
  171. * @param client
  172. * @param state
  173. */
  174. void RyanMqttSetClientState(RyanMqttClient_t *client, RyanMqttState_e state)
  175. {
  176. RyanMqttAssert(NULL != client);
  177. platformCriticalEnter(client->config.userData, &client->criticalLock);
  178. client->clientState = state;
  179. platformCriticalExit(client->config.userData, &client->criticalLock);
  180. }
  181. /**
  182. * @brief 获取mqtt客户端状态
  183. *
  184. * @param client
  185. * @return RyanMqttState_e
  186. */
  187. RyanMqttState_e RyanMqttGetClientState(RyanMqttClient_t *client)
  188. {
  189. RyanMqttAssert(NULL != client);
  190. platformCriticalEnter(client->config.userData, &client->criticalLock);
  191. RyanMqttState_e state = client->clientState;
  192. platformCriticalExit(client->config.userData, &client->criticalLock);
  193. return state;
  194. }
  195. /**
  196. * @brief 清理session
  197. *
  198. * @param client
  199. */
  200. void RyanMqttPurgeSession(RyanMqttClient_t *client)
  201. {
  202. RyanMqttList_t *curr, *next;
  203. RyanMqttAssert(NULL != client);
  204. // 释放所有msg_handler_list内存
  205. platformMutexLock(client->config.userData, &client->msgHandleLock);
  206. RyanMqttListForEachSafe(curr, next, &client->msgHandlerList)
  207. {
  208. RyanMqttMsgHandler_t *msgHandler = RyanMqttListEntry(curr, RyanMqttMsgHandler_t, list);
  209. RyanMqttMsgHandlerRemoveToMsgList(client, msgHandler);
  210. RyanMqttMsgHandlerDestroy(client, msgHandler);
  211. }
  212. RyanMqttListDelInit(&client->msgHandlerList);
  213. platformMutexUnLock(client->config.userData, &client->msgHandleLock);
  214. // 释放所有ackHandler_list内存
  215. platformMutexLock(client->config.userData, &client->ackHandleLock);
  216. RyanMqttListForEachSafe(curr, next, &client->ackHandlerList)
  217. {
  218. RyanMqttAckHandler_t *ackHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
  219. RyanMqttAckListRemoveToAckList(client, ackHandler);
  220. RyanMqttAckHandlerDestroy(client, ackHandler);
  221. }
  222. RyanMqttListDelInit(&client->ackHandlerList);
  223. client->ackHandlerCount = 0;
  224. platformMutexUnLock(client->config.userData, &client->ackHandleLock);
  225. // 释放所有userAckHandler_list内存
  226. platformMutexLock(client->config.userData, &client->userSessionLock);
  227. RyanMqttListForEachSafe(curr, next, &client->userAckHandlerList)
  228. {
  229. RyanMqttAckHandler_t *userAckHandler = RyanMqttListEntry(curr, RyanMqttAckHandler_t, list);
  230. RyanMqttAckListRemoveToUserAckList(client, userAckHandler);
  231. RyanMqttAckHandlerDestroy(client, userAckHandler);
  232. }
  233. RyanMqttListDelInit(&client->userAckHandlerList);
  234. platformMutexUnLock(client->config.userData, &client->userSessionLock);
  235. }
  236. /**
  237. * @brief 销毁mqtt客户端Config信息
  238. *
  239. * @param clientConfig
  240. */
  241. void RyanMqttPurgeConfig(RyanMqttClientConfig_t *clientConfig)
  242. {
  243. RyanMqttAssert(NULL != clientConfig);
  244. if (clientConfig->clientId)
  245. {
  246. platformMemoryFree(clientConfig->clientId);
  247. }
  248. }
  249. /**
  250. * @brief 销毁mqtt客户端资源
  251. *
  252. * @param client
  253. */
  254. void RyanMqttPurgeClient(RyanMqttClient_t *client)
  255. {
  256. RyanMqttAssert(NULL != client);
  257. // 关闭销毁网络组件
  258. platformNetworkClose(client->config.userData, &client->network);
  259. platformNetworkDestroy(client->config.userData, &client->network);
  260. // 清除config信息
  261. RyanMqttPurgeConfig(&client->config);
  262. // 清除遗嘱相关配置
  263. if (NULL != client->lwtOptions)
  264. {
  265. if (NULL != client->lwtOptions->payload)
  266. {
  267. platformMemoryFree(client->lwtOptions->payload);
  268. }
  269. if (NULL != client->lwtOptions->topic)
  270. {
  271. platformMemoryFree(client->lwtOptions->topic);
  272. }
  273. platformMemoryFree(client->lwtOptions);
  274. }
  275. // 清除session ack链表和msg链表
  276. RyanMqttPurgeSession(client);
  277. // 清除互斥锁
  278. platformMutexDestroy(client->config.userData, &client->sendLock);
  279. platformMutexDestroy(client->config.userData, &client->msgHandleLock);
  280. platformMutexDestroy(client->config.userData, &client->ackHandleLock);
  281. platformMutexDestroy(client->config.userData, &client->userSessionLock);
  282. // 清除临界区
  283. platformCriticalDestroy(client->config.userData, &client->criticalLock);
  284. }
  285. /**
  286. * @brief 初始化计时器
  287. *
  288. * @param platformTimer
  289. */
  290. void RyanMqttTimerInit(RyanMqttTimer_t *platformTimer)
  291. {
  292. platformTimer->timeOut = 0;
  293. platformTimer->time = 0;
  294. }
  295. /**
  296. * @brief 添加计数时间
  297. *
  298. * @param platformTimer
  299. * @param timeout
  300. */
  301. void RyanMqttTimerCutdown(RyanMqttTimer_t *platformTimer, uint32_t timeout)
  302. {
  303. platformTimer->time = platformUptimeMs();
  304. platformTimer->timeOut = timeout;
  305. }
  306. /**
  307. * @brief 获取设置的超时时间
  308. *
  309. * @param platformTimer
  310. */
  311. uint32_t RyanMqttTimerGetConfigTimeout(RyanMqttTimer_t *platformTimer)
  312. {
  313. return platformTimer->timeOut;
  314. }
  315. /**
  316. * @brief 计算time还有多长时间超时,考虑了32位溢出判断
  317. *
  318. * @param platformTimer
  319. * @return uint32_t 返回剩余时间,超时返回0
  320. */
  321. uint32_t RyanMqttTimerRemain(RyanMqttTimer_t *platformTimer)
  322. {
  323. uint32_t elapsed = platformUptimeMs() - platformTimer->time; // 计算内部自动绕回
  324. // 如果已过超时时间,返回 0
  325. if (elapsed >= platformTimer->timeOut)
  326. {
  327. return 0;
  328. }
  329. // 否则返回剩余时间
  330. return platformTimer->timeOut - elapsed;
  331. }
  332. /**
  333. * @brief 获取报文标识符,报文标识符不可为0
  334. * 都在sendbuf锁内调用
  335. * @param client
  336. * @return uint16_t
  337. */
  338. uint16_t RyanMqttGetNextPacketId(RyanMqttClient_t *client)
  339. {
  340. uint16_t packetId;
  341. RyanMqttAssert(NULL != client);
  342. platformCriticalEnter(client->config.userData, &client->criticalLock);
  343. if (client->packetId >= RyanMqttMaxPacketId || client->packetId < 1)
  344. {
  345. client->packetId = 1;
  346. }
  347. else
  348. {
  349. client->packetId++;
  350. }
  351. packetId = client->packetId;
  352. platformCriticalExit(client->config.userData, &client->criticalLock);
  353. return packetId;
  354. }
  355. const char *RyanMqttStrError(int32_t state)
  356. {
  357. const char *str;
  358. switch (state)
  359. {
  360. case RyanMqttRecvPacketTimeOutError: str = "读取数据超时"; break;
  361. case RyanMqttParamInvalidError: str = "无效参数"; break;
  362. case RyanSocketFailedError: str = "套接字失败"; break;
  363. case RyanMqttSendPacketError: str = "数据包发送失败"; break;
  364. case RyanMqttSerializePacketError: str = "序列化报文失败"; break;
  365. case RyanMqttDeserializePacketError: str = "反序列化报文失败"; break;
  366. case RyanMqttNoRescourceError: str = "没有资源"; break;
  367. case RyanMqttHaveRescourceError: str = "资源已存在"; break;
  368. case RyanMqttNotConnectError: str = "mqttClient没有连接"; break;
  369. case RyanMqttConnectError: str = "mqttClient已经连接"; break;
  370. case RyanMqttRecvBufToShortError: str = "接收缓冲区不足"; break;
  371. case RyanMqttSendBufToShortError: str = "发送缓冲区不足"; break;
  372. case RyanMqttSocketConnectFailError: str = "socket连接失败"; break;
  373. case RyanMqttNotEnoughMemError: str = "动态内存不足"; break;
  374. case RyanMqttFailedError: str = "mqtt失败, 详细信息请看函数内部"; break;
  375. case RyanMqttSuccessError: str = "mqtt成功, 详细信息请看函数内部"; break;
  376. case RyanMqttConnectRefusedProtocolVersion: str = "mqtt断开连接, 服务端不支持客户端请求的 MQTT 协议级别"; break;
  377. case RyanMqttConnectRefusedIdentifier: str = "mqtt断开连接, 不合格的客户端标识符"; break;
  378. case RyanMqttConnectRefusedServer: str = "mqtt断开连接, 服务端不可用"; break;
  379. case RyanMqttConnectRefusedUsernamePass: str = "mqtt断开连接, 无效的用户名或密码"; break;
  380. case RyanMqttConnectRefusedNotAuthorized: str = "mqtt断开连接, 连接已拒绝,未授权"; break;
  381. case RyanMqttConnectClientInvalid: str = "mqtt断开连接, 客户端处于无效状态"; break;
  382. case RyanMqttConnectNetWorkFail: str = "mqtt断开连接, 网络错误"; break;
  383. case RyanMqttConnectDisconnected: str = "mqtt断开连接, mqtt客户端断开连接"; break;
  384. case RyanMqttKeepaliveTimeout: str = "mqtt断开连接, 心跳超时断开连接"; break;
  385. case RyanMqttConnectUserDisconnected: str = "mqtt断开连接, 用户手动断开连接"; break;
  386. case RyanMqttConnectTimeout: str = "mqtt断开连接, connect超时断开"; break;
  387. default: str = "未知错误描述"; break;
  388. }
  389. return str;
  390. }