platformNetwork.c 7.1 KB


  1. #define RyanMqttLogLevel (RyanMqttLogLevelAssert) // 日志打印等级
  2. // #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  3. #include "RyanMqttPlatform.h"
  4. #include "RyanMqttLog.h"
  5. /**
  6. * @brief 初始化网络接口层
  7. *
  8. * @param userData
  9. * @param platformNetwork
  10. * @return RyanMqttError_e
  11. */
  12. RyanMqttError_e platformNetworkInit(void *userData, platformNetwork_t *platformNetwork)
  13. {
  14. platformNetwork->socket = -1;
  15. return RyanMqttSuccessError;
  16. }
  17. /**
  18. * @brief 销毁网络接口层
  19. *
  20. * @param userData
  21. * @param platformNetwork
  22. * @return RyanMqttError_e
  23. */
  24. RyanMqttError_e platformNetworkDestroy(void *userData, platformNetwork_t *platformNetwork)
  25. {
  26. platformNetwork->socket = -1;
  27. return RyanMqttSuccessError;
  28. }
  29. /**
  30. * @brief 连接mqtt服务器
  31. *
  32. * @param userData
  33. * @param platformNetwork
  34. * @param host
  35. * @param port
  36. * @return RyanMqttError_e
  37. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  38. */
  39. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host,
  40. uint16_t port)
  41. {
  42. RyanMqttError_e result = RyanMqttSuccessError;
  43. char *buf = NULL;
  44. struct sockaddr_in server_addr = {
  45. .sin_family = AF_INET,
  46. .sin_port = htons(port), // 指定端口号
  47. };
  48. // 传递的是ip地址,不用进行dns解析,某些情况下调用dns解析反而会错误
  49. if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr))
  50. {
  51. // inet_pton 已经将地址赋值到 server_addr.sin_addr,无需额外处理
  52. }
  53. // 解析域名信息
  54. else
  55. {
  56. #define dnsBufferSize (384)
  57. // RyanMqttLog_d("host: %s, 需要dns解析", host);
  58. int h_errnop;
  59. struct hostent *phost;
  60. struct hostent hostinfo = {0};
  61. buf = (char *)platformMemoryMalloc(dnsBufferSize);
  62. if (NULL == buf)
  63. {
  64. result = RyanMqttNoRescourceError;
  65. goto __exit;
  66. }
  67. if (0 != gethostbyname_r(host, &hostinfo, buf, dnsBufferSize, &phost, &h_errnop))
  68. {
  69. RyanMqttLog_w(
  70. (char *)"平台可能不支持 gethostbyname_r 函数, 再次尝试使用 gethostbyname 获取域名信息");
  71. // 非线程安全版本,请根据实际情况选择使用
  72. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  73. struct hostent *phostinfo = gethostbyname(host);
  74. if (NULL == phostinfo)
  75. {
  76. result = RyanMqttNoRescourceError;
  77. goto __exit;
  78. }
  79. hostinfo = *phostinfo;
  80. }
  81. else
  82. {
  83. // 成功时也需要校验返回内容有效
  84. if (NULL == hostinfo.h_addr_list || NULL == hostinfo.h_addr_list[0])
  85. {
  86. result = RyanMqttNoRescourceError;
  87. goto __exit;
  88. }
  89. }
  90. server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
  91. }
  92. platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  93. if (platformNetwork->socket < 0)
  94. {
  95. result = RyanSocketFailedError;
  96. goto __exit;
  97. }
  98. #pragma GCC diagnostic push
  99. #pragma GCC diagnostic ignored "-Wanalyzer-fd-leak"
  100. // 绑定套接字到主机地址和端口号
  101. if (0 != connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)))
  102. {
  103. platformNetworkClose(userData, platformNetwork);
  104. result = RyanMqttSocketConnectFailError;
  105. goto __exit;
  106. }
  107. #pragma GCC diagnostic pop
  108. __exit:
  109. if (NULL != buf)
  110. {
  111. platformMemoryFree(buf);
  112. }
  113. if (RyanMqttSuccessError != result)
  114. {
  115. RyanMqttLog_e("socket连接失败: %d", result);
  116. }
  117. return result;
  118. }
  119. /**
  120. * @brief 非阻塞接收数据
  121. *
  122. * @param userData
  123. * @param platformNetwork
  124. * @param recvBuf
  125. * @param recvLen
  126. * @param timeout
  127. * @return int32_t 成功返回接收字节数,错误返回 -1
  128. */
  129. int32_t platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, size_t recvLen,
  130. int32_t timeout)
  131. {
  132. ssize_t recvResult = 0;
  133. struct timeval tv = {
  134. .tv_sec = timeout / 1000,
  135. .tv_usec = (uint32_t)((timeout % 1000) * 1000),
  136. };
  137. if (platformNetwork->socket < 0)
  138. {
  139. RyanMqttLog_e("对端关闭socket连接");
  140. return -1;
  141. }
  142. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,
  143. sizeof(struct timeval)); // 设置操作模式为非阻塞
  144. recvResult = recv(platformNetwork->socket, recvBuf, recvLen, 0);
  145. if (0 == recvResult)
  146. {
  147. RyanMqttLog_e("对端关闭socket连接");
  148. return -1;
  149. }
  150. if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  151. {
  152. int32_t rt_errno = errno; // 似乎RT 5.0.0以上版本需要使用 rt_get_errno
  153. // 下列表示没问题,但需要退出接收
  154. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  155. #if EAGAIN != EWOULDBLOCK
  156. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  157. #endif
  158. EINTR == rt_errno || // 操作被信号中断
  159. ETIME == rt_errno || // 计时器过期(部分平台)
  160. ETIMEDOUT == rt_errno) // 超时(通用)
  161. {
  162. return 0;
  163. }
  164. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  165. RyanMqttLog_e("recvResult: %d, errno: %d str: %s", recvResult, rt_errno, strerror(rt_errno));
  166. return -1;
  167. }
  168. return (int32_t)recvResult;
  169. }
  170. /**
  171. * @brief 非阻塞发送数据
  172. *
  173. * @param userData
  174. * @param platformNetwork
  175. * @param sendBuf
  176. * @param sendLen
  177. * @param timeout
  178. * @return int32_t 成功返回发送字节数,错误返回 -1
  179. */
  180. int32_t platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, size_t sendLen,
  181. int32_t timeout)
  182. {
  183. ssize_t sendResult = 0;
  184. struct timeval tv = {
  185. .tv_sec = timeout / 1000,
  186. .tv_usec = (uint32_t)((timeout % 1000) * 1000),
  187. };
  188. if (platformNetwork->socket < 0)
  189. {
  190. RyanMqttLog_e("对端关闭socket连接");
  191. return -1;
  192. }
  193. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
  194. sizeof(struct timeval)); // 设置操作模式为非阻塞
  195. sendResult = send(platformNetwork->socket, sendBuf, sendLen, 0);
  196. if (0 == sendResult)
  197. {
  198. RyanMqttLog_e("对端关闭socket连接");
  199. return -1;
  200. }
  201. if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  202. {
  203. int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
  204. // 下列表示没问题,但需要退出发送
  205. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  206. #if EAGAIN != EWOULDBLOCK
  207. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  208. #endif
  209. EINTR == rt_errno || // 操作被信号中断
  210. ETIME == rt_errno || // 计时器过期(部分平台)
  211. ETIMEDOUT == rt_errno) // 超时(通用)
  212. {
  213. return 0;
  214. }
  215. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  216. RyanMqttLog_e("sendResult: %d, errno: %d str: %s", sendResult, rt_errno, strerror(rt_errno));
  217. return -1;
  218. }
  219. return (int32_t)sendResult;
  220. }
  221. /**
  222. * @brief 断开mqtt服务器连接
  223. *
  224. * @param userData
  225. * @param platformNetwork
  226. * @return RyanMqttError_e
  227. */
  228. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  229. {
  230. if (platformNetwork->socket >= 0)
  231. {
  232. RyanMqttLog_w("platformNetworkClose socket close %d", platformNetwork->socket);
  233. close(platformNetwork->socket);
  234. platformNetwork->socket = -1;
  235. }
  236. return RyanMqttSuccessError;
  237. }