platformNetwork.c 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  /**
   * Note: this port only illustrates one possible approach; adapt it to your actual platform.
   *
   */
  /*
   * Log configuration for this file.
   * NOTE(review): presumably read by RyanMqttLog.h, so keep these ahead of that include - confirm.
   */
  // Logging on/off switch (currently left undefined):
  // #define rlogEnable
  // Colored log output on/off.
  #define rlogColorEnable
  // Log print level.
  #define rlogLevel (rlogLvlWarning)
  // Log tag.
  #define rlogTag "RyanMqttNet"
  9. #include "platformNetwork.h"
  10. #include "RyanMqttLog.h"
  11. #define tcpConnect (RyanMqttBit1)
  12. #define tcpSend (RyanMqttBit2)
  13. #define tcpClose (RyanMqttBit3)
  14. #define tcpRecv (RyanMqttBit4)
  15. static char g_resolveIp[64] = {0};
  16. static osMutexId_t mutex = NULL;
  17. static osSemaphoreId_t sem = NULL;
  18. static void callback_socket_GetIPByHostName(u8 contexId, s32 errCode, u32 ipAddrCnt, u8 *ipAddr)
  19. {
  20. if (errCode == SOC_SUCCESS_OK)
  21. {
  22. memset(g_resolveIp, 0, sizeof(g_resolveIp));
  23. for (int i = 0; i < ipAddrCnt; i++)
  24. {
  25. strcpy((char *)g_resolveIp, (char *)ipAddr);
  26. rlog_i("socket 获取ip成功: num_entry=%d, resolve_ip:[%s]", i, (u8 *)g_resolveIp);
  27. }
  28. osSemaphoreRelease(sem);
  29. }
  30. else
  31. {
  32. rlog_e("socket 获取ip失败: %d", errCode);
  33. }
  34. }
  35. static void callback_socket_connect(s32 socketId, s32 errCode, void *customParam)
  36. {
  37. platformNetwork_t *platformNetwork = (platformNetwork_t *)customParam;
  38. if (errCode == SOC_SUCCESS_OK && platformNetwork->socket == socketId)
  39. {
  40. rlog_i("socket 连接成功: %d", socketId);
  41. osEventFlagsSet(platformNetwork.mqttNetEventHandle, tcpConnect);
  42. }
  43. }
  44. static void callback_socket_close(s32 socketId, s32 errCode, void *customParam)
  45. {
  46. if (errCode == SOC_SUCCESS_OK)
  47. {
  48. rlog_w("关闭socket成功: %d", socketId);
  49. }
  50. else
  51. {
  52. rlog_e("关闭socket失败 socketId=%d,error_cause=%d", socketId, errCode);
  53. }
  54. }
  55. static void callback_socket_read(s32 socketId, s32 errCode, void *customParam)
  56. {
  57. platformNetwork_t *platformNetwork = (platformNetwork_t *)customParam;
  58. if (SOC_SUCCESS_OK == errCode && platformNetwork->socket == socketId)
  59. {
  60. rlog_w("socket接收到数据: %d", socketId);
  61. osEventFlagsSet(platformNetwork.mqttNetEventHandle, tcpRecv);
  62. }
  63. }
  64. static void callback_socket_accept(s32 listenSocketId, s32 errCode, void *customParam)
  65. {
  66. }
  67. static ST_SOC_Callback callback_soc_func =
  68. {
  69. callback_socket_connect,
  70. callback_socket_close,
  71. callback_socket_accept,
  72. callback_socket_read,
  73. };
  74. /**
  75. * @brief 连接mqtt服务器
  76. *
  77. * @param userData
  78. * @param platformNetwork
  79. * @param host
  80. * @param port
  81. * @return RyanMqttError_e
  82. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  83. */
  84. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, uint16_t port)
  85. {
  86. RyanMqttError_e result = RyanMqttSuccessError;
  87. u8 nw_state = 0;
  88. int32_t eventId;
  89. char resolveIp[64] = {0};
  90. // 如果第一次connect就创建事件标志组,否则情况事件标志组标志位
  91. if (NULL == mutex)
  92. {
  93. const osMutexAttr_t myMutex01_attributes = {
  94. .attr_bits = osMutexRecursive | osMutexPrioInherit | osMutexRobust};
  95. mutex = osMutexNew(&myMutex01_attributes);
  96. }
  97. if (NULL == sem)
  98. {
  99. sem = osSemaphoreNew(1, 0, NULL);
  100. }
  101. // 如果第一次connect就创建事件标志组,否则情况事件标志组标志位
  102. if (NULL == platformNetwork.mqttNetEventHandle)
  103. platformNetwork.mqttNetEventHandle = osEventFlagsNew(NULL);
  104. Ql_SOC_Register(callback_soc_func, platformNetwork); // 注册socket回调函数
  105. // 获取网络连接状态
  106. Ql_GetCeregState(&nw_state);
  107. if ((1 != nw_state) && (5 != nw_state))
  108. {
  109. result = RyanMqttSocketConnectFailError;
  110. goto __exit;
  111. }
  112. osMutexAcquire(mutex, osWaitForever);
  113. // 清除接收标志位
  114. osSemaphoreAcquire(sem, 0);
  115. // 解析域名
  116. s32 getHostIpResult = Ql_IpHelper_GetIPByHostName(0,
  117. (u8 *)host,
  118. callback_socket_GetIPByHostName);
  119. if (SOC_SUCCESS_OK != getHostIpResult && SOC_NONBLOCK != getHostIpResult)
  120. {
  121. result = RyanMqttSocketConnectFailError;
  122. osMutexRelease(mutex);
  123. goto __exit;
  124. }
  125. if (osOK != osSemaphoreAcquire(sem, 10 * 1000))
  126. {
  127. result = RyanMqttSocketConnectFailError;
  128. osMutexRelease(mutex);
  129. goto __exit;
  130. }
  131. memcpy(resolveIp, g_resolveIp, sizeof(resolveIp));
  132. osMutexRelease(mutex);
  133. // 创建socket
  134. platformNetwork->socket = Ql_SOC_Create(0, SOC_TYPE_TCP);
  135. if (platformNetwork->socket < 0)
  136. {
  137. result = RyanSocketFailedError;
  138. goto __exit;
  139. }
  140. // 等待连接成功
  141. s32 connectResult = Ql_SOC_Connect(platformNetwork->socket, (u8 *)resolveIp, port);
  142. if (SOC_SUCCESS_OK != connectResult && SOC_NONBLOCK != connectResult)
  143. {
  144. platformNetworkClose(userData, platformNetwork);
  145. result = RyanMqttSocketConnectFailError;
  146. goto __exit;
  147. }
  148. eventId = osEventFlagsWait(mqttNetEventHandle, tcpConnect, osFlagsWaitAny, 10000);
  149. if (tcpConnect != eventId)
  150. {
  151. platformNetworkClose(userData, platformNetwork);
  152. result = RyanMqttSocketConnectFailError;
  153. goto __exit;
  154. }
  155. __exit:
  156. return result;
  157. }
  158. /**
  159. * @brief 非阻塞接收数据
  160. *
  161. * @param userData
  162. * @param platformNetwork
  163. * @param recvBuf
  164. * @param recvLen
  165. * @param timeout
  166. * @return RyanMqttError_e
  167. * socket错误返回 RyanSocketFailedError
  168. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  169. * 接收成功 RyanMqttSuccessError
  170. */
  171. RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout)
  172. {
  173. int32_t recvResult = 0;
  174. int32_t offset = 0;
  175. int32_t timeOut2 = timeout;
  176. int32_t eventId;
  177. platformTimer_t timer = {0};
  178. if (-1 == platformNetwork->socket)
  179. return RyanSocketFailedError;
  180. platformTimerCutdown(&timer, timeout);
  181. while ((offset < recvLen) && (0 != timeOut2))
  182. {
  183. recvResult = Ql_SOC_Recv(platformNetwork->socket, (u8 *)(recvBuf + offset), recvLen - offset);
  184. if (recvResult > 0)
  185. {
  186. offset += recvResult;
  187. }
  188. else
  189. {
  190. eventId = osEventFlagsWait(mqttNetEventHandle, tcpRecv, osFlagsWaitAny, timeOut2);
  191. if (tcpRecv == eventId)
  192. {
  193. recvResult = Ql_SOC_Recv(platformNetwork->socket, (u8 *)(recvBuf + offset), recvLen - offset);
  194. if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  195. {
  196. if (recvResult != SOC_NONBLOCK &&
  197. recvResult != SOC_ERROR_TIMEOUT)
  198. {
  199. rlog_e("recv失败 result: %d, recvLen: %d, eventId: %d", recvResult, recvLen, eventId);
  200. return RyanSocketFailedError;
  201. }
  202. break;
  203. }
  204. offset += recvResult;
  205. }
  206. }
  207. timeOut2 = platformTimerRemain(&timer);
  208. }
  209. if (offset != recvLen)
  210. return RyanMqttRecvPacketTimeOutError;
  211. return RyanMqttSuccessError;
  212. }
  213. /**
  214. * @brief 非阻塞发送数据
  215. *
  216. * @param userData
  217. * @param platformNetwork
  218. * @param sendBuf
  219. * @param sendLen
  220. * @param timeout
  221. * @return RyanMqttError_e
  222. * socket错误返回 RyanSocketFailedError
  223. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  224. * 接收成功 RyanMqttSuccessError
  225. */
  226. RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout)
  227. {
  228. int32_t sendResult = 0;
  229. int32_t offset = 0;
  230. int32_t timeOut2 = timeout;
  231. int32_t eventId;
  232. platformTimer_t timer = {0};
  233. if (-1 == platformNetwork->socket)
  234. return RyanSocketFailedError;
  235. platformTimerCutdown(&timer, timeout);
  236. while ((offset < sendLen) && (0 != timeOut2))
  237. {
  238. sendResult = Ql_SOC_Send(platformNetwork->socket, (u8 *)(sendBuf + offset), sendLen - offset);
  239. if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  240. {
  241. if (sendResult != SOC_NONBLOCK &&
  242. sendResult != SOC_ERROR_TIMEOUT)
  243. return RyanSocketFailedError;
  244. }
  245. offset += sendResult;
  246. timeOut2 = platformTimerRemain(&timer);
  247. }
  248. // osDelay(1000);
  249. if (offset != sendLen)
  250. return RyanMqttSendPacketTimeOutError;
  251. return RyanMqttSuccessError;
  252. }
  253. /**
  254. * @brief 断开mqtt服务器连接
  255. *
  256. * @param userData
  257. * @param platformNetwork
  258. * @return RyanMqttError_e
  259. */
  260. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  261. {
  262. if (platformNetwork->socket >= 0)
  263. {
  264. Ql_SOC_Close(platformNetwork->socket);
  265. platformNetwork->socket = -1;
  266. // todo 这里还是推荐在close的时候把时间标志组删除,否则没有别的地方可以调用删除函数。
  267. if (platformNetwork.mqttNetEventHandle)
  268. {
  269. osEventFlagsDelete(platformNetwork.mqttNetEventHandle);
  270. platformNetwork.mqttNetEventHandle = NULL;
  271. }
  272. }
  273. return RyanMqttSuccessError;
  274. }