platformNetwork.c 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. #define rlogEnable // 是否使能日志
  2. #define rlogColorEnable // 是否使能日志颜色
  3. #define rlogLevel (rlogLvlDebug) // 日志打印等级
  4. #define rlogTag "RyanMqttNet" // 日志tag
  5. #include "platformNetwork.h"
  6. #include "RyanMqttLog.h"
  7. /**
  8. * @brief 初始化网络接口层
  9. *
  10. * @param userData
  11. * @param platformNetwork
  12. * @return RyanMqttError_e
  13. */
  14. RyanMqttError_e platformNetworkInit(void *userData, platformNetwork_t *platformNetwork)
  15. {
  16. platformNetwork->socket = -1;
  17. return RyanMqttSuccessError;
  18. }
  19. /**
  20. * @brief 销毁网络接口层
  21. *
  22. * @param userData
  23. * @param platformNetwork
  24. * @return RyanMqttError_e
  25. */
  26. RyanMqttError_e platformNetworkDestroy(void *userData, platformNetwork_t *platformNetwork)
  27. {
  28. platformNetwork->socket = -1;
  29. return RyanMqttSuccessError;
  30. }
  31. /**
  32. * @brief 连接mqtt服务器
  33. *
  34. * @param userData
  35. * @param platformNetwork
  36. * @param host
  37. * @param port
  38. * @return RyanMqttError_e
  39. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  40. */
  41. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, uint16_t port)
  42. {
  43. RyanMqttError_e result = RyanMqttSuccessError;
  44. struct hostent hostinfo = {0};
  45. // 解析域名信息
  46. {
  47. char buf[512];
  48. int h_errnop;
  49. struct hostent *phost;
  50. if (0 != gethostbyname_r(host, &hostinfo, buf, sizeof(buf), &phost, &h_errnop))
  51. {
  52. rlog_w("平台可能不支持 gethostbyname_r 函数, 再次尝试使用 gethostbyname 获取域名信息");
  53. // 非线程安全版本,请根据实际情况选择使用
  54. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  55. struct hostent *phostinfo = gethostbyname(host);
  56. if (NULL == phostinfo)
  57. {
  58. result = RyanMqttNoRescourceError;
  59. goto __exit;
  60. }
  61. hostinfo = *phostinfo;
  62. }
  63. }
  64. platformNetwork->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  65. if (platformNetwork->socket < 0)
  66. {
  67. result = RyanSocketFailedError;
  68. goto __exit;
  69. }
  70. struct sockaddr_in server_addr = {
  71. .sin_family = AF_INET,
  72. .sin_port = htons(port), // 指定端口号
  73. .sin_addr = *((struct in_addr *)hostinfo.h_addr_list[0]),
  74. };
  75. // 绑定套接字到主机地址和端口号
  76. if (connect(platformNetwork->socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
  77. {
  78. platformNetworkClose(userData, platformNetwork);
  79. result = RyanMqttSocketConnectFailError;
  80. goto __exit;
  81. }
  82. __exit:
  83. if (RyanMqttSuccessError != result)
  84. rlog_e("socket连接失败: %d", result);
  85. return result;
  86. }
  87. /**
  88. * @brief 非阻塞接收数据
  89. *
  90. * @param userData
  91. * @param platformNetwork
  92. * @param recvBuf
  93. * @param recvLen
  94. * @param timeout
  95. * @return RyanMqttError_e
  96. * socket错误返回 RyanSocketFailedError
  97. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  98. * 接收成功 RyanMqttSuccessError
  99. */
  100. RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout)
  101. {
  102. int32_t recvResult = 0;
  103. int32_t offset = 0;
  104. int32_t timeOut2 = timeout;
  105. struct timeval tv = {0};
  106. platformTimer_t timer = {0};
  107. if (-1 == platformNetwork->socket)
  108. {
  109. rlog_e("对端关闭socket连接");
  110. return RyanMqttNoRescourceError;
  111. }
  112. platformTimerCutdown(&timer, timeout);
  113. while ((offset < recvLen) && (0 != timeOut2))
  114. {
  115. tv.tv_sec = timeOut2 / 1000;
  116. tv.tv_usec = timeOut2 % 1000 * 1000;
  117. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  118. recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
  119. if (0 == recvResult)
  120. {
  121. rlog_e("对端关闭socket连接");
  122. return RyanMqttNoRescourceError;
  123. }
  124. else if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  125. {
  126. int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
  127. // 下列表示没问题,但需要退出接收
  128. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  129. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  130. EINTR == rt_errno || // 操作被信号中断
  131. ETIME == rt_errno) // 计时器过期
  132. {
  133. break;
  134. }
  135. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  136. rlog_e("recvResult: %d, errno: %d str: %s", recvResult, rt_errno, strerror(rt_errno));
  137. return RyanSocketFailedError;
  138. }
  139. offset += recvResult;
  140. timeOut2 = platformTimerRemain(&timer);
  141. }
  142. if (offset != recvLen)
  143. return RyanMqttRecvPacketTimeOutError;
  144. return RyanMqttSuccessError;
  145. }
  146. /**
  147. * @brief 非阻塞发送数据
  148. *
  149. * @param userData
  150. * @param platformNetwork
  151. * @param sendBuf
  152. * @param sendLen
  153. * @param timeout
  154. * @return RyanMqttError_e
  155. * socket错误返回 RyanSocketFailedError
  156. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  157. * 接收成功 RyanMqttSuccessError
  158. */
  159. RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout)
  160. {
  161. int32_t sendResult = 0;
  162. int32_t offset = 0;
  163. int32_t timeOut2 = timeout;
  164. struct timeval tv = {0};
  165. platformTimer_t timer = {0};
  166. if (-1 == platformNetwork->socket)
  167. {
  168. rlog_e("对端关闭socket连接");
  169. return RyanMqttNoRescourceError;
  170. }
  171. platformTimerCutdown(&timer, timeout);
  172. while ((offset < sendLen) && (0 != timeOut2))
  173. {
  174. tv.tv_sec = timeOut2 / 1000;
  175. tv.tv_usec = timeOut2 % 1000 * 1000;
  176. setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞
  177. sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0);
  178. if (0 == sendResult)
  179. {
  180. rlog_e("对端关闭socket连接");
  181. return RyanMqttNoRescourceError;
  182. }
  183. else if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  184. {
  185. int32_t rt_errno = errno; // 似乎5.0.0以上版本需要使用 rt_get_errno
  186. // 下列表示没问题,但需要退出发送
  187. if (EAGAIN == rt_errno || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  188. EWOULDBLOCK == rt_errno || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空
  189. EINTR == rt_errno || // 操作被信号中断
  190. ETIME == rt_errno) // 计时器过期
  191. {
  192. break;
  193. }
  194. // NOLINTNEXTLINE(concurrency-mt-unsafe)
  195. rlog_e("sendResult: %d, errno: %d str: %s", sendResult, rt_errno, strerror(rt_errno));
  196. return RyanSocketFailedError;
  197. }
  198. offset += sendResult;
  199. timeOut2 = platformTimerRemain(&timer);
  200. }
  201. if (offset != sendLen)
  202. return RyanMqttSendPacketTimeOutError;
  203. return RyanMqttSuccessError;
  204. }
  205. /**
  206. * @brief 断开mqtt服务器连接
  207. *
  208. * @param userData
  209. * @param platformNetwork
  210. * @return RyanMqttError_e
  211. */
  212. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  213. {
  214. if (platformNetwork->socket >= 0)
  215. {
  216. closesocket(platformNetwork->socket);
  217. rlog_w("platformNetworkClose socket close %d", platformNetwork->socket);
  218. platformNetwork->socket = -1;
  219. }
  220. return RyanMqttSuccessError;
  221. }