platformNetwork.c 7.4 KB


  1. #define RyanMqttLogLevel (RyanMqttLogLevelDebug) // 日志打印等级
  2. #include "RyanMqttPlatform.h"
  3. #include "RyanMqttLog.h"
  4. /**
  5. * @brief 初始化网络接口层
  6. *
  7. * @param userData
  8. * @param platformNetwork
  9. * @return RyanMqttError_e
  10. */
  11. RyanMqttError_e platformNetworkInit(void *userData, platformNetwork_t *platformNetwork)
  12. {
  13. platformNetwork->socket = -1;
  14. return RyanMqttSuccessError;
  15. }
  16. /**
  17. * @brief 销毁网络接口层
  18. *
  19. * @param userData
  20. * @param platformNetwork
  21. * @return RyanMqttError_e
  22. */
  23. RyanMqttError_e platformNetworkDestroy(void *userData, platformNetwork_t *platformNetwork)
  24. {
  25. platformNetwork->socket = -1;
  26. return RyanMqttSuccessError;
  27. }
  28. /**
  29. * @brief 连接mqtt服务器
  30. *
  31. * @param userData
  32. * @param platformNetwork
  33. * @param host
  34. * @param port
  35. * @return RyanMqttError_e
  36. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  37. */
  38. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host,
  39. uint16_t port)
  40. {
  41. RyanMqttError_e result = RyanMqttSuccessError;
  42. char *buf = NULL;
  43. struct sockaddr_in server_addr = {
  44. .sin_family = AF_INET,
  45. .sin_port = htons(port), // 指定端口号
  46. };
  47. // 传递的是ip地址,不用进行dns解析,某些情况下调用dns解析反而会错误
  48. // RT-Thread平台下lwip和netdev都是通过宏定义方式定义 inet_pton 和 inet_addr,所以这里没有问题
  49. #ifdef inet_pton
  50. if (inet_pton(server_addr.sin_family, host, &server_addr.sin_addr))
  51. {
  52. // inet_pton 已经将地址赋值到 server_addr.sin_addr,无需额外处理
  53. }
  54. #elif defined(inet_addr)
  55. if (INADDR_NONE != inet_addr(host))
  56. {
  57. // RyanMqttLog_d("host: %s, 不用dns解析", host);
  58. server_addr.sin_addr.s_addr = inet_addr(host);
  59. }
  60. #else
  61. #error "RyanMqtt: 平台不支持inet_pton或inet_addr函数,请检查网络接口层实现"
  62. #endif
  63. // 解析域名信息
  64. else
  65. {
  66. #define dnsBufferSize (384)
  67. // RyanMqttLog_d("host: %s, 需要dns解析", host);
  68. int h_errnop;
  69. struct hostent *phost;
  70. struct hostent hostinfo = {0};
  71. buf = (char *)platformMemoryMalloc(dnsBufferSize);
  72. if (NULL == buf)
  73. {
  74. result = RyanMqttNoRescourceError;
  75. goto __exit;
  76. }
  77. if (0 != gethostbyname_r(host, &hostinfo, buf, dnsBufferSize, &phost, &h_errnop))
  78. {
  79. RyanMqttLog_w("平台可能不支持 gethostbyname_r 函数, 再次尝试使用 gethostbyname 获取域名信息");
  80. // 非线程安全版本,请根据实际情况选择使用
  81. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  82. struct hostent *phostinfo = gethostbyname(host);
  83. if (NULL == phostinfo)
  84. {
  85. result = RyanMqttNoRescourceError;
  86. goto __exit;
  87. }
  88. hostinfo = *phostinfo;
  89. }
  90. else
  91. {
  92. // 成功时也需要校验返回内容有效
  93. if (NULL == hostinfo.h_addr_list || NULL == hostinfo.h_addr_list[0])
  94. {
  95. result = RyanMqttNoRescourceError;
  96. goto __exit;
  97. }
  98. }
  99. server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
  100. }
  101. platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  102. if (platformNetwork->socket < 0)
  103. {
  104. result = RyanSocketFailedError;
  105. goto __exit;
  106. }
  107. // 绑定套接字到主机地址和端口号
  108. if (0 != connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)))
  109. {
  110. platformNetworkClose(userData, platformNetwork);
  111. result = RyanMqttSocketConnectFailError;
  112. goto __exit;
  113. }
  114. __exit:
  115. if (NULL != buf)
  116. {
  117. platformMemoryFree(buf);
  118. }
  119. if (RyanMqttSuccessError != result)
  120. {
  121. RyanMqttLog_e("socket连接失败: %d", result);
  122. }
  123. return result;
  124. }
  125. /**
  126. * @brief 非阻塞接收数据
  127. *
  128. * @param userData
  129. * @param platformNetwork
  130. * @param recvBuf
  131. * @param recvLen
  132. * @param timeout
  133. * @return int32_t 成功返回接收字节数,错误返回 -1
  134. */
  135. int32_t platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, size_t recvLen,
  136. int32_t timeout)
  137. {
  138. ssize_t recvResult = 0;
  139. struct timeval tv = {
  140. .tv_sec = timeout / 1000,
  141. .tv_usec = (uint32_t)((timeout % 1000) * 1000),
  142. };
  143. if (platformNetwork->socket < 0)
  144. {
  145. RyanMqttLog_e("对端关闭socket连接");
  146. return -1;
  147. }
  148. // 设置操作模式为非阻塞
  149. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
  150. recvResult = recv(platformNetwork->socket, recvBuf, recvLen, 0);
  151. if (0 == recvResult)
  152. {
  153. RyanMqttLog_e("对端关闭socket连接");
  154. return -1;
  155. }
  156. if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  157. {
  158. int32_t rt_errno = errno; // 似乎RT 5.0.0以上版本需要使用 rt_get_errno
  159. if (0 == rt_errno)
  160. {
  161. rt_errno = rt_get_errno();
  162. }
  163. // 下列表示没问题,但需要退出接收
  164. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  165. #if EAGAIN != EWOULDBLOCK
  166. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  167. #endif
  168. EINTR == rt_errno || // 操作被信号中断
  169. ETIME == rt_errno || // 计时器过期(部分平台)
  170. ETIMEDOUT == rt_errno) // 超时(通用)
  171. {
  172. return 0;
  173. }
  174. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  175. RyanMqttLog_e("recvResult: %d, errno: %d str: %s", recvResult, rt_errno, strerror(rt_errno));
  176. return -1;
  177. }
  178. return (int32_t)recvResult;
  179. }
  180. /**
  181. * @brief 非阻塞发送数据
  182. *
  183. * @param userData
  184. * @param platformNetwork
  185. * @param sendBuf
  186. * @param sendLen
  187. * @param timeout
  188. * @return int32_t 成功返回发送字节数,错误返回 -1
  189. */
  190. int32_t platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, size_t sendLen,
  191. int32_t timeout)
  192. {
  193. ssize_t sendResult = 0;
  194. struct timeval tv = {
  195. .tv_sec = timeout / 1000,
  196. .tv_usec = (uint32_t)((timeout % 1000) * 1000),
  197. };
  198. if (platformNetwork->socket < 0)
  199. {
  200. RyanMqttLog_e("对端关闭socket连接");
  201. return -1;
  202. }
  203. // 设置操作模式为非阻塞
  204. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval));
  205. sendResult = send(platformNetwork->socket, sendBuf, sendLen, 0);
  206. if (0 == sendResult)
  207. {
  208. RyanMqttLog_e("对端关闭socket连接");
  209. return -1;
  210. }
  211. if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  212. {
  213. int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
  214. if (0 == rt_errno)
  215. {
  216. rt_errno = rt_get_errno();
  217. }
  218. // 下列表示没问题,但需要退出发送
  219. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  220. #if EAGAIN != EWOULDBLOCK
  221. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  222. #endif
  223. EINTR == rt_errno || // 操作被信号中断
  224. ETIME == rt_errno || // 计时器过期(部分平台)
  225. ETIMEDOUT == rt_errno) // 超时(通用)
  226. {
  227. return 0;
  228. }
  229. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  230. RyanMqttLog_e("sendResult: %d, errno: %d str: %s", sendResult, rt_errno, strerror(rt_errno));
  231. return -1;
  232. }
  233. return (int32_t)sendResult;
  234. }
  235. /**
  236. * @brief 断开mqtt服务器连接
  237. *
  238. * @param userData
  239. * @param platformNetwork
  240. * @return RyanMqttError_e
  241. */
  242. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  243. {
  244. if (platformNetwork->socket >= 0)
  245. {
  246. RyanMqttLog_w("platformNetworkClose socket close %d", platformNetwork->socket);
  247. closesocket(platformNetwork->socket);
  248. platformNetwork->socket = -1;
  249. }
  250. return RyanMqttSuccessError;
  251. }