platformNetwork.c 11 KB


  1. // #define rlogEnable // 是否使能日志
  2. #define rlogColorEnable // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlWarning) // 日志打印等级
  4. #define rlogTag "RyanMqttNet" // 日志tag
  5. #include "platformNetwork.h"
  6. #include "RyanMqttLog.h"
  7. /**
  8. * @brief 初始化网络接口层
  9. *
  10. * @param userData
  11. * @param platformNetwork
  12. * @return RyanMqttError_e
  13. */
  14. RyanMqttError_e platformNetworkInit(void *userData, platformNetwork_t *platformNetwork)
  15. {
  16. platformNetwork->socket = -1;
  17. return RyanMqttSuccessError;
  18. }
  19. /**
  20. * @brief 销毁网络接口层
  21. *
  22. * @param userData
  23. * @param platformNetwork
  24. * @return RyanMqttError_e
  25. */
  26. RyanMqttError_e platformNetworkDestroy(void *userData, platformNetwork_t *platformNetwork)
  27. {
  28. platformNetwork->socket = -1;
  29. return RyanMqttSuccessError;
  30. }
  31. /**
  32. * @brief 连接mqtt服务器
  33. *
  34. * @param userData
  35. * @param platformNetwork
  36. * @param host
  37. * @param port
  38. * @return RyanMqttError_e
  39. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  40. */
  41. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, uint16_t port)
  42. {
  43. RyanMqttError_e result = RyanMqttSuccessError;
  44. // ?线程安全版本,有些设备没有实现,默认不启用。如果涉及多个客户端解析域名请使用线程安全版本
  45. char buf[256];
  46. int ret;
  47. struct hostent hostinfo, *phost;
  48. if (0 != gethostbyname_r(host, &hostinfo, buf, sizeof(buf), &phost, &ret))
  49. {
  50. result = RyanSocketFailedError;
  51. goto exit;
  52. }
  53. platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  54. if (platformNetwork->socket < 0)
  55. {
  56. result = RyanSocketFailedError;
  57. goto exit;
  58. }
  59. struct sockaddr_in server_addr;
  60. memset(&server_addr, 0, sizeof(server_addr));
  61. server_addr.sin_family = AF_INET;
  62. server_addr.sin_port = htons(port); // 指定端口号,这里使用HTTP默认端口80
  63. server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
  64. // 绑定套接字到主机地址和端口号
  65. if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
  66. {
  67. platformNetworkClose(userData, platformNetwork);
  68. result = RyanMqttSocketConnectFailError;
  69. goto exit;
  70. }
  71. // 非线程安全版本,请根据实际情况选择使用
  72. // struct hostent *hostinfo;
  73. // hostinfo = gethostbyname(host);
  74. // if (NULL == hostinfo)
  75. // {
  76. // result = RyanSocketFailedError;
  77. // goto exit;
  78. // }
  79. // platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  80. // if (platformNetwork->socket < 0)
  81. // {
  82. // result = RyanSocketFailedError;
  83. // goto exit;
  84. // }
  85. // struct sockaddr_in server_addr;
  86. // memset(&server_addr, 0, sizeof(server_addr));
  87. // server_addr.sin_family = AF_INET;
  88. // server_addr.sin_port = htons(port); // 指定端口号,这里使用HTTP默认端口80
  89. // server_addr.sin_addr = *((struct in_addr *)hostinfo->h_addr_list[0]);
  90. // // 绑定套接字到主机地址和端口号
  91. // if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
  92. // {
  93. // platformNetworkClose(userData, platformNetwork);
  94. // result = RyanMqttSocketConnectFailError;
  95. // goto exit;
  96. // }
  97. exit:
  98. return result;
  99. }
  100. /**
  101. * @brief 非阻塞接收数据
  102. *
  103. * @param userData
  104. * @param platformNetwork
  105. * @param recvBuf
  106. * @param recvLen
  107. * @param timeout
  108. * @return RyanMqttError_e
  109. * socket错误返回 RyanSocketFailedError
  110. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  111. * 接收成功 RyanMqttSuccessError
  112. */
  113. RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout)
  114. {
  115. // int32_t recvResult = 0;
  116. // int32_t offset = 0;
  117. // int32_t timeOut2 = timeout;
  118. // struct timeval tv = {0};
  119. // platformTimer_t timer = {0};
  120. // if (-1 == platformNetwork->socket)
  121. // return RyanSocketFailedError;
  122. // platformTimerCutdown(&timer, timeout);
  123. // while ((offset < recvLen) && (0 != timeOut2))
  124. // {
  125. // tv.tv_sec = timeOut2 / 1000;
  126. // tv.tv_usec = timeOut2 % 1000 * 1000;
  127. // if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
  128. // {
  129. // tv.tv_sec = 0;
  130. // tv.tv_usec = 100;
  131. // }
  132. // setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  133. // recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
  134. // if (0 == recvResult)
  135. // {
  136. // rlog_d("对端关闭socket连接");
  137. // return RyanSocketFailedError;
  138. // }
  139. // else if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  140. // {
  141. // int32_t err = errno;
  142. // // 下列3种表示没问题,但需要推出发送
  143. // if (err == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  144. // err == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  145. // err == EINTR) // 操作被信号中断
  146. // break;
  147. // return RyanSocketFailedError;
  148. // }
  149. // offset += recvResult;
  150. // timeOut2 = platformTimerRemain(&timer);
  151. // }
  152. // if (offset != recvLen)
  153. // return RyanMqttRecvPacketTimeOutError;
  154. // return RyanMqttSuccessError;
  155. int32_t recvResult = 0;
  156. int32_t offset = 0;
  157. int32_t timeOut2 = timeout;
  158. struct timeval tv = {0};
  159. platformTimer_t timer = {0};
  160. if (-1 == platformNetwork->socket)
  161. return RyanSocketFailedError;
  162. platformTimerCutdown(&timer, timeout);
  163. while ((offset < recvLen) && (0 != timeOut2))
  164. {
  165. tv.tv_sec = timeOut2 / 1000;
  166. tv.tv_usec = timeOut2 % 1000 * 1000;
  167. if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
  168. {
  169. tv.tv_sec = 0;
  170. tv.tv_usec = 100;
  171. }
  172. fd_set readset;
  173. fd_set exceptset;
  174. int i;
  175. /* 清空可读事件描述符列表 */
  176. FD_ZERO(&readset);
  177. FD_ZERO(&exceptset);
  178. FD_SET(platformNetwork->socket, &readset); // 监听可读事件
  179. FD_SET(platformNetwork->socket, &exceptset); // 监听异常事件
  180. /* 等待设定的网络描述符有事件发生 */
  181. i = select(platformNetwork->socket + 1, &readset, NULL, &exceptset, &tv);
  182. if (i < 0)
  183. {
  184. int32_t err = errno;
  185. // 下列3种表示没问题,但需要退出接收
  186. if (err == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  187. err == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  188. err == EINTR) // 操作被信号中断
  189. break;
  190. return RyanSocketFailedError;
  191. }
  192. /* 查看 sock 描述符上有没有发生可读事件 */
  193. else if (i > 0)
  194. {
  195. if (FD_ISSET(platformNetwork->socket, &readset))
  196. {
  197. recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
  198. if (recvResult <= 0) // 小于零,表示错误,个别错误不代表socket错误
  199. {
  200. int32_t err = errno;
  201. // 下列3种表示没问题,但需要退出接收
  202. if (err == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  203. err == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  204. err == EINTR) // 操作被信号中断
  205. break;
  206. return RyanSocketFailedError;
  207. }
  208. offset += recvResult;
  209. }
  210. if (FD_ISSET(platformNetwork->socket, &exceptset))
  211. {
  212. return RyanSocketFailedError;
  213. }
  214. }
  215. timeOut2 = platformTimerRemain(&timer);
  216. }
  217. if (offset != recvLen)
  218. return RyanMqttRecvPacketTimeOutError;
  219. return RyanMqttSuccessError;
  220. }
  221. /**
  222. * @brief 非阻塞发送数据
  223. *
  224. * @param userData
  225. * @param platformNetwork
  226. * @param sendBuf
  227. * @param sendLen
  228. * @param timeout
  229. * @return RyanMqttError_e
  230. * socket错误返回 RyanSocketFailedError
  231. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  232. * 接收成功 RyanMqttSuccessError
  233. */
  234. RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout)
  235. {
  236. int32_t sendResult = 0;
  237. int32_t offset = 0;
  238. int32_t timeOut2 = timeout;
  239. struct timeval tv = {0};
  240. platformTimer_t timer = {0};
  241. if (-1 == platformNetwork->socket)
  242. return RyanSocketFailedError;
  243. platformTimerCutdown(&timer, timeout);
  244. while ((offset < sendLen) && (0 != timeOut2))
  245. {
  246. tv.tv_sec = timeOut2 / 1000;
  247. tv.tv_usec = timeOut2 % 1000 * 1000;
  248. if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
  249. {
  250. tv.tv_sec = 0;
  251. tv.tv_usec = 100;
  252. }
  253. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  254. sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0);
  255. if (0 == sendResult)
  256. {
  257. rlog_d("对端关闭socket连接");
  258. return RyanSocketFailedError;
  259. }
  260. else if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  261. {
  262. int32_t err = errno;
  263. // 下列3种表示没问题,但需要退出发送
  264. if (err == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  265. err == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  266. err == EINTR) // 操作被信号中断
  267. break;
  268. return RyanSocketFailedError;
  269. }
  270. offset += sendResult;
  271. timeOut2 = platformTimerRemain(&timer);
  272. }
  273. if (offset != sendLen)
  274. return RyanMqttSendPacketTimeOutError;
  275. return RyanMqttSuccessError;
  276. }
  277. /**
  278. * @brief 断开mqtt服务器连接
  279. *
  280. * @param userData
  281. * @param platformNetwork
  282. * @return RyanMqttError_e
  283. */
  284. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  285. {
  286. if (platformNetwork->socket >= 0)
  287. {
  288. close(platformNetwork->socket);
  289. platformNetwork->socket = -1;
  290. }
  291. return RyanMqttSuccessError;
  292. }