|
|
@@ -92,7 +92,7 @@ MQTTStatus_t mqttConnect(NetworkContext_t *networkContext)
|
|
|
return status;
|
|
|
}
|
|
|
|
|
|
- rt_kprintf("MQTT broker connected\n");
|
|
|
+ rt_kprintf("[%d] MQTT broker connected\n", getCurrentTime());
|
|
|
return MQTTSuccess;
|
|
|
}
|
|
|
|
|
|
@@ -128,21 +128,6 @@ MQTTStatus_t mqttPublish(MQTTPublishInfo_t *publishInfo)
|
|
|
return MQTTSuccess;
|
|
|
}
|
|
|
|
|
|
-static bool isSocketReadable(int socket, int timeout_ms)
|
|
|
-{
|
|
|
- fd_set readfds;
|
|
|
- struct timeval timeout;
|
|
|
-
|
|
|
- FD_ZERO(&readfds);
|
|
|
- FD_SET(socket, &readfds);
|
|
|
-
|
|
|
- timeout.tv_sec = timeout_ms / 1000;
|
|
|
- timeout.tv_usec = (timeout_ms % 1000) * 1000;
|
|
|
-
|
|
|
- int result = select(socket + 1, &readfds, NULL, NULL, &timeout);
|
|
|
- return (result > 0 && FD_ISSET(socket, &readfds));
|
|
|
-}
|
|
|
-
|
|
|
const char *mqttStatus(MQTTStatus_t status)
|
|
|
{
|
|
|
const char *const statusStrings[] = {
|
|
|
@@ -252,15 +237,14 @@ void mqttClientTask(void *parameter)
|
|
|
|
|
|
while (1)
|
|
|
{
|
|
|
- if (isSocketReadable(networkContext.socket, 100))
|
|
|
+ status = MQTT_ProcessLoop(&mqttContext);
|
|
|
+ if (status != MQTTSuccess && status != MQTTNeedMoreBytes)
|
|
|
{
|
|
|
- status = MQTT_ProcessLoop(&mqttContext);
|
|
|
- if (status != MQTTSuccess)
|
|
|
- {
|
|
|
- MQTT_PRINT("MQTT_ProcessLoop failed: %d (%s)\n", status, mqttStatus(status));
|
|
|
- break;
|
|
|
- }
|
|
|
+ MQTT_PRINT("MQTT_ProcessLoop failed: %d (%s)\n", status, mqttStatus(status));
|
|
|
+ status = MQTT_Disconnect(&mqttContext);
|
|
|
+ break;
|
|
|
}
|
|
|
+
|
|
|
rt_thread_mdelay(MQTT_LOOP_CNT);
|
|
|
}
|
|
|
|