platformNetwork.c 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. /**
  2. * 注意!此接口仅提供思路示例,具体请根据实际进行修改
  3. *
  4. */
  5. #define rlogLevel (rlogLvlWarning) // 日志打印等级
  6. #include "platformNetwork.h"
  7. #include "RyanMqttLog.h"
  8. #define tcpConnect (RyanMqttBit1)
  9. #define tcpSend (RyanMqttBit2)
  10. #define tcpClose (RyanMqttBit3)
  11. #define tcpRecv (RyanMqttBit4)
  12. static char g_resolveIp[64] = {0};
  13. static osMutexId_t mutex = NULL;
  14. static osSemaphoreId_t sem = NULL;
  15. static void callback_socket_GetIPByHostName(u8 contexId, s32 errCode, u32 ipAddrCnt, u8 *ipAddr)
  16. {
  17. if (errCode == SOC_SUCCESS_OK)
  18. {
  19. memset(g_resolveIp, 0, sizeof(g_resolveIp));
  20. for (int i = 0; i < ipAddrCnt; i++)
  21. {
  22. strcpy((char *)g_resolveIp, (char *)ipAddr);
  23. rlog_i("socket 获取ip成功: num_entry=%d, resolve_ip:[%s]", i, (u8 *)g_resolveIp);
  24. }
  25. osSemaphoreRelease(sem);
  26. }
  27. else
  28. {
  29. rlog_e("socket 获取ip失败: %d", errCode);
  30. }
  31. }
  32. static void callback_socket_connect(s32 socketId, s32 errCode, void *customParam)
  33. {
  34. platformNetwork_t *platformNetwork = (platformNetwork_t *)customParam;
  35. if (errCode == SOC_SUCCESS_OK && platformNetwork->socket == socketId)
  36. {
  37. rlog_i("socket 连接成功: %d", socketId);
  38. osEventFlagsSet(platformNetwork.mqttNetEventHandle, tcpConnect);
  39. }
  40. }
  41. static void callback_socket_close(s32 socketId, s32 errCode, void *customParam)
  42. {
  43. if (errCode == SOC_SUCCESS_OK)
  44. {
  45. rlog_w("关闭socket成功: %d", socketId);
  46. }
  47. else
  48. {
  49. rlog_e("关闭socket失败 socketId=%d,error_cause=%d", socketId, errCode);
  50. }
  51. }
  52. static void callback_socket_read(s32 socketId, s32 errCode, void *customParam)
  53. {
  54. platformNetwork_t *platformNetwork = (platformNetwork_t *)customParam;
  55. if (SOC_SUCCESS_OK == errCode && platformNetwork->socket == socketId)
  56. {
  57. rlog_w("socket接收到数据: %d", socketId);
  58. osEventFlagsSet(platformNetwork.mqttNetEventHandle, tcpRecv);
  59. }
  60. }
  61. static void callback_socket_accept(s32 listenSocketId, s32 errCode, void *customParam)
  62. {
  63. }
  64. static ST_SOC_Callback callback_soc_func =
  65. {
  66. callback_socket_connect,
  67. callback_socket_close,
  68. callback_socket_accept,
  69. callback_socket_read,
  70. };
  71. /**
  72. * @brief 连接mqtt服务器
  73. *
  74. * @param userData
  75. * @param platformNetwork
  76. * @param host
  77. * @param port
  78. * @return RyanMqttError_e
  79. * 成功返回RyanMqttSuccessError, 失败返回错误信息
  80. */
  81. RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, uint16_t port)
  82. {
  83. RyanMqttError_e result = RyanMqttSuccessError;
  84. u8 nw_state = 0;
  85. int32_t eventId;
  86. char resolveIp[64] = {0};
  87. // 如果第一次connect就创建事件标志组,否则情况事件标志组标志位
  88. if (NULL == mutex)
  89. {
  90. const osMutexAttr_t myMutex01_attributes = {
  91. .attr_bits = osMutexRecursive | osMutexPrioInherit | osMutexRobust};
  92. mutex = osMutexNew(&myMutex01_attributes);
  93. }
  94. if (NULL == sem)
  95. {
  96. sem = osSemaphoreNew(1, 0, NULL);
  97. }
  98. // 如果第一次connect就创建事件标志组,否则情况事件标志组标志位
  99. if (NULL == platformNetwork.mqttNetEventHandle)
  100. platformNetwork.mqttNetEventHandle = osEventFlagsNew(NULL);
  101. Ql_SOC_Register(callback_soc_func, platformNetwork); // 注册socket回调函数
  102. // 获取网络连接状态
  103. Ql_GetCeregState(&nw_state);
  104. if ((1 != nw_state) && (5 != nw_state))
  105. {
  106. result = RyanMqttSocketConnectFailError;
  107. goto __exit;
  108. }
  109. osMutexAcquire(mutex, osWaitForever);
  110. // 清除接收标志位
  111. osSemaphoreAcquire(sem, 0);
  112. // 解析域名
  113. s32 getHostIpResult = Ql_IpHelper_GetIPByHostName(0,
  114. (u8 *)host,
  115. callback_socket_GetIPByHostName);
  116. if (SOC_SUCCESS_OK != getHostIpResult && SOC_NONBLOCK != getHostIpResult)
  117. {
  118. result = RyanMqttSocketConnectFailError;
  119. osMutexRelease(mutex);
  120. goto __exit;
  121. }
  122. if (osOK != osSemaphoreAcquire(sem, 10 * 1000))
  123. {
  124. result = RyanMqttSocketConnectFailError;
  125. osMutexRelease(mutex);
  126. goto __exit;
  127. }
  128. memcpy(resolveIp, g_resolveIp, sizeof(resolveIp));
  129. osMutexRelease(mutex);
  130. // 创建socket
  131. platformNetwork->socket = Ql_SOC_Create(0, SOC_TYPE_TCP);
  132. if (platformNetwork->socket < 0)
  133. {
  134. result = RyanSocketFailedError;
  135. goto __exit;
  136. }
  137. // 等待连接成功
  138. s32 connectResult = Ql_SOC_Connect(platformNetwork->socket, (u8 *)resolveIp, port);
  139. if (SOC_SUCCESS_OK != connectResult && SOC_NONBLOCK != connectResult)
  140. {
  141. platformNetworkClose(userData, platformNetwork);
  142. result = RyanMqttSocketConnectFailError;
  143. goto __exit;
  144. }
  145. eventId = osEventFlagsWait(mqttNetEventHandle, tcpConnect, osFlagsWaitAny, 10000);
  146. if (tcpConnect != eventId)
  147. {
  148. platformNetworkClose(userData, platformNetwork);
  149. result = RyanMqttSocketConnectFailError;
  150. goto __exit;
  151. }
  152. __exit:
  153. return result;
  154. }
  155. /**
  156. * @brief 非阻塞接收数据
  157. *
  158. * @param userData
  159. * @param platformNetwork
  160. * @param recvBuf
  161. * @param recvLen
  162. * @param timeout
  163. * @return RyanMqttError_e
  164. * socket错误返回 RyanSocketFailedError
  165. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  166. * 接收成功 RyanMqttSuccessError
  167. */
  168. RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout)
  169. {
  170. int32_t recvResult = 0;
  171. int32_t offset = 0;
  172. int32_t timeOut2 = timeout;
  173. int32_t eventId;
  174. platformTimer_t timer = {0};
  175. if (-1 == platformNetwork->socket)
  176. return RyanSocketFailedError;
  177. platformTimerCutdown(&timer, timeout);
  178. while ((offset < recvLen) && (0 != timeOut2))
  179. {
  180. recvResult = Ql_SOC_Recv(platformNetwork->socket, (u8 *)(recvBuf + offset), recvLen - offset);
  181. if (recvResult > 0)
  182. {
  183. offset += recvResult;
  184. }
  185. else
  186. {
  187. eventId = osEventFlagsWait(mqttNetEventHandle, tcpRecv, osFlagsWaitAny, timeOut2);
  188. if (tcpRecv == eventId)
  189. {
  190. recvResult = Ql_SOC_Recv(platformNetwork->socket, (u8 *)(recvBuf + offset), recvLen - offset);
  191. if (recvResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  192. {
  193. if (recvResult != SOC_NONBLOCK &&
  194. recvResult != SOC_ERROR_TIMEOUT)
  195. {
  196. rlog_e("recv失败 result: %d, recvLen: %d, eventId: %d", recvResult, recvLen, eventId);
  197. return RyanSocketFailedError;
  198. }
  199. break;
  200. }
  201. offset += recvResult;
  202. }
  203. }
  204. timeOut2 = platformTimerRemain(&timer);
  205. }
  206. if (offset != recvLen)
  207. return RyanMqttRecvPacketTimeOutError;
  208. return RyanMqttSuccessError;
  209. }
  210. /**
  211. * @brief 非阻塞发送数据
  212. *
  213. * @param userData
  214. * @param platformNetwork
  215. * @param sendBuf
  216. * @param sendLen
  217. * @param timeout
  218. * @return RyanMqttError_e
  219. * socket错误返回 RyanSocketFailedError
  220. * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError
  221. * 接收成功 RyanMqttSuccessError
  222. */
  223. RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout)
  224. {
  225. int32_t sendResult = 0;
  226. int32_t offset = 0;
  227. int32_t timeOut2 = timeout;
  228. int32_t eventId;
  229. platformTimer_t timer = {0};
  230. if (-1 == platformNetwork->socket)
  231. return RyanSocketFailedError;
  232. platformTimerCutdown(&timer, timeout);
  233. while ((offset < sendLen) && (0 != timeOut2))
  234. {
  235. sendResult = Ql_SOC_Send(platformNetwork->socket, (u8 *)(sendBuf + offset), sendLen - offset);
  236. if (sendResult < 0) // 小于零,表示错误,个别错误不代表socket错误
  237. {
  238. if (sendResult != SOC_NONBLOCK &&
  239. sendResult != SOC_ERROR_TIMEOUT)
  240. return RyanSocketFailedError;
  241. }
  242. offset += sendResult;
  243. timeOut2 = platformTimerRemain(&timer);
  244. }
  245. // osDelay(1000);
  246. if (offset != sendLen)
  247. return RyanMqttSendPacketTimeOutError;
  248. return RyanMqttSuccessError;
  249. }
  250. /**
  251. * @brief 断开mqtt服务器连接
  252. *
  253. * @param userData
  254. * @param platformNetwork
  255. * @return RyanMqttError_e
  256. */
  257. RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
  258. {
  259. if (platformNetwork->socket >= 0)
  260. {
  261. Ql_SOC_Close(platformNetwork->socket);
  262. platformNetwork->socket = -1;
  263. // todo 这里还是推荐在close的时候把时间标志组删除,否则没有别的地方可以调用删除函数。
  264. if (platformNetwork.mqttNetEventHandle)
  265. {
  266. osEventFlagsDelete(platformNetwork.mqttNetEventHandle);
  267. platformNetwork.mqttNetEventHandle = NULL;
  268. }
  269. }
  270. return RyanMqttSuccessError;
  271. }