platformNetwork.c 8.6 KB


  1. #define rlogEnable // 是否使能日志
  2. #define rlogColorEnable // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlDebug) // 日志打印等级
  4. #define rlogTag "RyanMqttNet" // 日志tag
  5. #include "platformNetwork.h"
  6. #include "RyanMqttLog.h"
  7. /**
  8. * @brief 初始化网络接口层
  9. *
  10. * @param userData
  11. * @param platformNetwork
  12. * @return RyanMqttError_e
  13. */
  14. RyanMqttError_e platformNetworkInit(void *userData, platformNetwork_t *platformNetwork)
  15. {
  16. platformNetwork->socket = -1;
  17. return RyanMqttSuccessError;
  18. }
  19. /**
  20. * @brief 销毁网络接口层
  21. *
  22. * @param userData
  23. * @param platformNetwork
  24. * @return RyanMqttError_e
  25. */
  26. RyanMqttError_e platformNetworkDestroy(void *userData, platformNetwork_t *platformNetwork)
  27. {
  28. platformNetwork->socket = -1;
  29. return RyanMqttSuccessError;
  30. }
  31. /**
  32. * @brief 连接mqtt服务器
  33. *
  34. * @param userData
  35. * @param platformNetwork
  36. * @param host
  37. * @param port
  38. * @return RyanMqttError_e
  39. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  40. */
  41. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, uint16_t port)
  42. {
  43. RyanMqttError_e result = RyanMqttSuccessError;
  44. char *buf = NULL;
  45. struct sockaddr_in server_addr = {
  46. .sin_family = AF_INET,
  47. .sin_port = htons(port), // 指定端口号
  48. };
  49. // 传递的是ip地址,不用进行dns解析,某些情况下调用dns解析反而会错误
  50. if (INADDR_NONE != inet_addr(host))
  51. {
  52. rlog_d("host: %s, 不用dns解析", host);
  53. server_addr.sin_addr.s_addr = inet_addr(host);
  54. }
  55. // 解析域名信息
  56. else
  57. {
  58. rlog_d("host: %s, 需要dns解析", host);
  59. int h_errnop;
  60. struct hostent *phost;
  61. struct hostent hostinfo = {0};
  62. buf = (char *)platformMemoryMalloc(384);
  63. if (NULL == buf)
  64. {
  65. result = RyanMqttNoRescourceError;
  66. goto __exit;
  67. }
  68. if (0 != gethostbyname_r(host, &hostinfo, buf, 384, &phost, &h_errnop))
  69. {
  70. rlog_w("平台可能不支持 gethostbyname_r 函数, 再次尝试使用 gethostbyname 获取域名信息");
  71. // 非线程安全版本,请根据实际情况选择使用
  72. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  73. struct hostent *phostinfo = gethostbyname(host);
  74. if (NULL == phostinfo)
  75. {
  76. result = RyanMqttNoRescourceError;
  77. goto __exit;
  78. }
  79. hostinfo = *phostinfo;
  80. }
  81. server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
  82. }
  83. platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  84. if (platformNetwork->socket < 0)
  85. {
  86. result = RyanSocketFailedError;
  87. goto __exit;
  88. }
  89. // 绑定套接字到主机地址和端口号
  90. if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
  91. {
  92. platformNetworkClose(userData, platformNetwork);
  93. result = RyanMqttSocketConnectFailError;
  94. goto __exit;
  95. }
  96. __exit:
  97. if (NULL != buf)
  98. platformMemoryFree(buf);
  99. if (RyanMqttSuccessError != result)
  100. rlog_e("socket连接失败: %d", result);
  101. return result;
  102. }
  103. /**
  104. * @brief 非阻塞接收数据
  105. *
  106. * @param userData
  107. * @param platformNetwork
  108. * @param recvBuf
  109. * @param recvLen
  110. * @param timeout
  111. * @return RyanMqttError_e
  112. * socket错误返回 RyanSocketFailedError
  113. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  114. * 接收成功 RyanMqttSuccessError
  115. */
  116. RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout)
  117. {
  118. int32_t recvResult = 0;
  119. int32_t offset = 0;
  120. int32_t timeOut2 = timeout;
  121. struct timeval tv = {0};
  122. platformTimer_t timer = {0};
  123. if (-1 == platformNetwork->socket)
  124. {
  125. rlog_e("对端关闭socket连接");
  126. return RyanMqttNoRescourceError;
  127. }
  128. platformTimerCutdown(&timer, timeout);
  129. while ((offset < recvLen) && (0 != timeOut2))
  130. {
  131. tv.tv_sec = timeOut2 / 1000;
  132. tv.tv_usec = timeOut2 % 1000 * 1000;
  133. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  134. recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
  135. if (0 == recvResult)
  136. {
  137. rlog_e("对端关闭socket连接");
  138. return RyanMqttNoRescourceError;
  139. }
  140. else if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  141. {
  142. int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
  143. if (0 == rt_errno)
  144. rt_errno = rt_get_errno();
  145. // 下列表示没问题,但需要退出接收
  146. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  147. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  148. EINTR == rt_errno || // 操作被信号中断
  149. ETIME == rt_errno) // 计时器过期
  150. {
  151. break;
  152. }
  153. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  154. rlog_e("recvResult: %d, errno: %d str: %s", recvResult, rt_errno, strerror(rt_errno));
  155. return RyanSocketFailedError;
  156. }
  157. offset += recvResult;
  158. timeOut2 = platformTimerRemain(&timer);
  159. }
  160. if (offset != recvLen)
  161. return RyanMqttRecvPacketTimeOutError;
  162. return RyanMqttSuccessError;
  163. }
  164. /**
  165. * @brief 非阻塞发送数据
  166. *
  167. * @param userData
  168. * @param platformNetwork
  169. * @param sendBuf
  170. * @param sendLen
  171. * @param timeout
  172. * @return RyanMqttError_e
  173. * socket错误返回 RyanSocketFailedError
  174. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  175. * 接收成功 RyanMqttSuccessError
  176. */
  177. RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout)
  178. {
  179. int32_t sendResult = 0;
  180. int32_t offset = 0;
  181. int32_t timeOut2 = timeout;
  182. struct timeval tv = {0};
  183. platformTimer_t timer = {0};
  184. if (-1 == platformNetwork->socket)
  185. {
  186. rlog_e("对端关闭socket连接");
  187. return RyanMqttNoRescourceError;
  188. }
  189. platformTimerCutdown(&timer, timeout);
  190. while ((offset < sendLen) && (0 != timeOut2))
  191. {
  192. tv.tv_sec = timeOut2 / 1000;
  193. tv.tv_usec = timeOut2 % 1000 * 1000;
  194. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  195. sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0);
  196. if (0 == sendResult)
  197. {
  198. rlog_e("对端关闭socket连接");
  199. return RyanMqttNoRescourceError;
  200. }
  201. else if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  202. {
  203. int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
  204. if (0 == rt_errno)
  205. rt_errno = rt_get_errno();
  206. // 下列表示没问题,但需要退出发送
  207. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  208. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  209. EINTR == rt_errno || // 操作被信号中断
  210. ETIME == rt_errno) // 计时器过期
  211. {
  212. break;
  213. }
  214. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  215. rlog_e("sendResult: %d, errno: %d str: %s", sendResult, rt_errno, strerror(rt_errno));
  216. return RyanSocketFailedError;
  217. }
  218. offset += sendResult;
  219. timeOut2 = platformTimerRemain(&timer);
  220. }
  221. if (offset != sendLen)
  222. return RyanMqttSendPacketTimeOutError;
  223. return RyanMqttSuccessError;
  224. }
  225. /**
  226. * @brief 断开mqtt服务器连接
  227. *
  228. * @param userData
  229. * @param platformNetwork
  230. * @return RyanMqttError_e
  231. */
  232. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  233. {
  234. if (platformNetwork->socket >= 0)
  235. {
  236. closesocket(platformNetwork->socket);
  237. rlog_w("platformNetworkClose socket close %d", platformNetwork->socket);
  238. platformNetwork->socket = -1;
  239. }
  240. return RyanMqttSuccessError;
  241. }