platformNetwork.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. // #define rlogEnable // 是否使能日志
  2. #define rlogColorEnable // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlDebug) // 日志打印等级
  4. #define rlogTag "RyanMqttNet" // 日志tag
  5. #include "platformNetwork.h"
  6. #include "RyanMqttLog.h"
  7. /**
  8. * @brief 连接mqtt服务器
  9. *
  10. * @param userData
  11. * @param platformNetwork
  12. * @param host
  13. * @param port
  14. * @return RyanMqttError_e
  15. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  16. */
  17. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, uint16_t port)
  18. {
  19. RyanMqttError_e result = RyanMqttSuccessError;
  20. // ?线程安全版本,有些设备没有实现,默认不启用。如果涉及多个客户端解析域名请使用线程安全版本
  21. // char buf[256];
  22. // int ret;
  23. // struct hostent hostinfo, *phost;
  24. // if (0 != gethostbyname_r(host, &hostinfo, buf, sizeof(buf), &phost, &ret))
  25. // {
  26. // result = RyanSocketFailedError;
  27. // goto exit;
  28. // }
  29. // platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  30. // if (platformNetwork->socket < 0)
  31. // {
  32. // result = RyanSocketFailedError;
  33. // goto exit;
  34. // }
  35. // struct sockaddr_in server_addr;
  36. // memset(&server_addr, 0, sizeof(server_addr));
  37. // server_addr.sin_family = AF_INET;
  38. // server_addr.sin_port = htons(port); // 指定端口号,这里使用HTTP默认端口80
  39. // server_addr.sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]);
  40. // // 绑定套接字到主机地址和端口号
  41. // if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
  42. // {
  43. // platformNetworkClose(userData, platformNetwork);
  44. // result = RyanMqttSocketConnectFailError;
  45. // goto exit;
  46. // }
  47. // 非线程安全版本,请根据实际情况选择使用
  48. struct hostent *hostinfo;
  49. hostinfo = gethostbyname(host);
  50. if (NULL == hostinfo)
  51. {
  52. result = RyanSocketFailedError;
  53. goto exit;
  54. }
  55. platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  56. if (platformNetwork->socket < 0)
  57. {
  58. result = RyanSocketFailedError;
  59. goto exit;
  60. }
  61. struct sockaddr_in server_addr;
  62. memset(&server_addr, 0, sizeof(server_addr));
  63. server_addr.sin_family = AF_INET;
  64. server_addr.sin_port = htons(port); // 指定端口号,这里使用HTTP默认端口80
  65. server_addr.sin_addr = *((struct in_addr *)hostinfo->h_addr_list[0]);
  66. // 绑定套接字到主机地址和端口号
  67. if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
  68. {
  69. platformNetworkClose(userData, platformNetwork);
  70. result = RyanMqttSocketConnectFailError;
  71. goto exit;
  72. }
  73. exit:
  74. return result;
  75. }
  76. /**
  77. * @brief 非阻塞接收数据
  78. *
  79. * @param userData
  80. * @param platformNetwork
  81. * @param recvBuf
  82. * @param recvLen
  83. * @param timeout
  84. * @return RyanMqttError_e
  85. * socket错误返回 RyanSocketFailedError
  86. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  87. * 接收成功 RyanMqttSuccessError
  88. */
  89. RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout)
  90. {
  91. int32_t recvResult = 0;
  92. int32_t offset = 0;
  93. int32_t timeOut2 = timeout;
  94. struct timeval tv = {0};
  95. platformTimer_t timer = {0};
  96. if (-1 == platformNetwork->socket)
  97. return RyanSocketFailedError;
  98. platformTimerCutdown(&timer, timeout);
  99. while ((offset < recvLen) && (0 != timeOut2))
  100. {
  101. tv.tv_sec = timeOut2 / 1000;
  102. tv.tv_usec = timeOut2 % 1000 * 1000;
  103. if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
  104. {
  105. tv.tv_sec = 0;
  106. tv.tv_usec = 100;
  107. }
  108. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  109. recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
  110. if (0 == recvResult)
  111. {
  112. rlog_d("对端关闭socket连接");
  113. return RyanSocketFailedError;
  114. }
  115. else if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  116. {
  117. int32_t rt_errno = rt_get_errno();
  118. // 下列3种表示没问题,但需要退出接收
  119. if (rt_errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  120. rt_errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  121. rt_errno == EINTR) // 操作被信号中断
  122. {
  123. rlog_w("recvResult2: %d, errno: %d", recvResult, rt_errno);
  124. rlog_w("recvLen2: %d, timeout: %d", recvLen, timeout);
  125. break;
  126. }
  127. rlog_w("recvResult: %d, errno: %d", recvResult, rt_errno);
  128. rlog_w("recvLen: %d, timeout: %d", recvLen, timeout);
  129. return RyanSocketFailedError;
  130. }
  131. offset += recvResult;
  132. timeOut2 = platformTimerRemain(&timer);
  133. }
  134. if (offset != recvLen)
  135. return RyanMqttRecvPacketTimeOutError;
  136. return RyanMqttSuccessError;
  137. // int32_t recvResult = 0;
  138. // int32_t offset = 0;
  139. // int32_t timeOut2 = timeout;
  140. // struct timeval tv = {0};
  141. // platformTimer_t timer = {0};
  142. // if (-1 == platformNetwork->socket)
  143. // return RyanSocketFailedError;
  144. // platformTimerCutdown(&timer, timeout);
  145. // while ((offset < recvLen) && (0 != timeOut2))
  146. // {
  147. // tv.tv_sec = timeOut2 / 1000;
  148. // tv.tv_usec = timeOut2 % 1000 * 1000;
  149. // if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
  150. // {
  151. // tv.tv_sec = 0;
  152. // tv.tv_usec = 100;
  153. // }
  154. // fd_set readset;
  155. // fd_set exceptset;
  156. // int i, maxfdp1;
  157. // /* 清空可读事件描述符列表 */
  158. // FD_ZERO(&readset);
  159. // FD_ZERO(&exceptset);
  160. // FD_SET(platformNetwork->socket, &readset); // 监听可读事件
  161. // FD_SET(platformNetwork->socket, &exceptset); // 监听异常事件
  162. // /* 等待设定的网络描述符有事件发生 */
  163. // i = select(platformNetwork->socket + 1, &readset, RT_NULL, &exceptset, &tv);
  164. // if (i < 0)
  165. // {
  166. // int32_t rt_errno = rt_get_errno();
  167. // // 下列3种表示没问题,但需要退出接收
  168. // if (rt_errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  169. // rt_errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  170. // rt_errno == EINTR) // 操作被信号中断
  171. // break;
  172. // return RyanSocketFailedError;
  173. // }
  174. // /* 查看 sock 描述符上有没有发生可读事件 */
  175. // else if (i > 0)
  176. // {
  177. // if (FD_ISSET(platformNetwork->socket, &readset))
  178. // {
  179. // recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
  180. // if (recvResult <= 0) // 小于零,表示错误,个别错误不代表socket错误
  181. // {
  182. // int32_t rt_errno = rt_get_errno();
  183. // // 下列3种表示没问题,但需要退出接收
  184. // if (rt_errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  185. // rt_errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  186. // rt_errno == EINTR) // 操作被信号中断
  187. // break;
  188. // return RyanSocketFailedError;
  189. // }
  190. // offset += recvResult;
  191. // }
  192. // if (FD_ISSET(platformNetwork->socket, &exceptset))
  193. // {
  194. // return RyanSocketFailedError;
  195. // }
  196. // }
  197. // timeOut2 = platformTimerRemain(&timer);
  198. // }
  199. // if (offset != recvLen)
  200. // return RyanMqttRecvPacketTimeOutError;
  201. // return RyanMqttSuccessError;
  202. }
  203. /**
  204. * @brief 非阻塞发送数据
  205. *
  206. * @param userData
  207. * @param platformNetwork
  208. * @param sendBuf
  209. * @param sendLen
  210. * @param timeout
  211. * @return RyanMqttError_e
  212. * socket错误返回 RyanSocketFailedError
  213. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  214. * 接收成功 RyanMqttSuccessError
  215. */
  216. RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout)
  217. {
  218. int32_t sendResult = 0;
  219. int32_t offset = 0;
  220. int32_t timeOut2 = timeout;
  221. struct timeval tv = {0};
  222. platformTimer_t timer = {0};
  223. if (-1 == platformNetwork->socket)
  224. return RyanSocketFailedError;
  225. platformTimerCutdown(&timer, timeout);
  226. while ((offset < sendLen) && (0 != timeOut2))
  227. {
  228. tv.tv_sec = timeOut2 / 1000;
  229. tv.tv_usec = timeOut2 % 1000 * 1000;
  230. if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
  231. {
  232. tv.tv_sec = 0;
  233. tv.tv_usec = 100;
  234. }
  235. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  236. sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0);
  237. if (0 == sendResult)
  238. {
  239. rlog_d("对端关闭socket连接");
  240. return RyanSocketFailedError;
  241. }
  242. else if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  243. {
  244. int32_t rt_errno = rt_get_errno();
  245. rlog_d("sendResult: %d, errno: %d", sendResult, rt_errno);
  246. rlog_d("sendLen: %d, timeout: %d", sendLen, timeout);
  247. // 下列3种表示没问题,但需要退出发送
  248. if (rt_errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  249. rt_errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  250. rt_errno == EINTR) // 操作被信号中断
  251. {
  252. break;
  253. }
  254. return RyanSocketFailedError;
  255. }
  256. offset += sendResult;
  257. timeOut2 = platformTimerRemain(&timer);
  258. }
  259. if (offset != sendLen)
  260. return RyanMqttSendPacketTimeOutError;
  261. return RyanMqttSuccessError;
  262. }
  263. /**
  264. * @brief 断开mqtt服务器连接
  265. *
  266. * @param userData
  267. * @param platformNetwork
  268. * @return RyanMqttError_e
  269. */
  270. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  271. {
  272. if (platformNetwork->socket >= 0)
  273. {
  274. closesocket(platformNetwork->socket);
  275. platformNetwork->socket = -1;
  276. }
  277. return RyanMqttSuccessError;
  278. }