Explorar el Código

Merge branch 'bugfix/ws_client_sending_race_v4.0' into 'release/v4.0'

websocket_client: fix locking in ws client task (v4.0)

See merge request espressif/esp-idf!7712
David Čermák hace 6 años
padre
commit
8a56c31a25
Se han modificado 1 ficheros con 36 adiciones y 22 borrados
  1. 36 22
      components/esp_websocket_client/esp_websocket_client.c

+ 36 - 22
components/esp_websocket_client/esp_websocket_client.c

@@ -243,7 +243,7 @@ esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_clie
         return NULL;
     }
 
-    client->lock = xSemaphoreCreateMutex();
+    client->lock = xSemaphoreCreateRecursiveMutex();
     ESP_WS_CLIENT_MEM_CHECK(TAG, client->lock, goto _websocket_init_fail);
 
     client->config = calloc(1, sizeof(websocket_config_storage_t));
@@ -428,6 +428,7 @@ esp_err_t esp_websocket_client_set_uri(esp_websocket_client_handle_t client, con
 
 static void esp_websocket_client_task(void *pv)
 {
+    const int lock_timeout = portMAX_DELAY;
     int rlen;
     esp_websocket_client_handle_t client = (esp_websocket_client_handle_t) pv;
     client->run = true;
@@ -446,8 +447,12 @@ static void esp_websocket_client_task(void *pv)
 
     client->state = WEBSOCKET_STATE_INIT;
     xEventGroupClearBits(client->status_bits, STOPPED_BIT);
-    int read_select;
+    int read_select = 0;
     while (client->run) {
+        if (xSemaphoreTakeRecursive(client->lock, lock_timeout) != pdPASS) {
+            ESP_LOGE(TAG, "Failed to lock ws-client tasks, exitting the task...");
+            break;
+        }
         switch ((int)client->state) {
             case WEBSOCKET_STATE_INIT:
                 if (client->transport == NULL) {
@@ -455,7 +460,6 @@ static void esp_websocket_client_task(void *pv)
                     client->run = false;
                     break;
                 }
-
                 if (esp_transport_connect(client->transport,
                                           client->config->host,
                                           client->config->port,
@@ -471,20 +475,14 @@ static void esp_websocket_client_task(void *pv)
 
                 break;
             case WEBSOCKET_STATE_CONNECTED:
-                read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
-                if (read_select < 0) {
-                    ESP_LOGE(TAG, "Network error, errorno");
-                    esp_websocket_client_abort_connection(client);
-                    break;
-                }
                 if (_tick_get_ms() - client->ping_tick_ms > WEBSOCKET_PING_TIMEOUT_MS) {
                     client->ping_tick_ms = _tick_get_ms();
-                    // Send PING
+                    ESP_LOGD(TAG, "Sending PING...");
                     esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PING, NULL, 0, client->config->network_timeout_ms);
                 }
                 if (read_select == 0) {
-                    ESP_LOGD(TAG, "Timeout...");
-                    continue;
+                    ESP_LOGV(TAG, "Read poll timeout: skipping esp_transport_read()...");
+                    break;
                 }
                 client->ping_tick_ms = _tick_get_ms();
 
@@ -516,9 +514,19 @@ static void esp_websocket_client_task(void *pv)
                     client->reconnect_tick_ms = _tick_get_ms();
                     ESP_LOGD(TAG, "Reconnecting...");
                 }
-                vTaskDelay(client->wait_timeout_ms / 2 / portTICK_RATE_MS);
                 break;
         }
+        xSemaphoreGiveRecursive(client->lock);
+        if (WEBSOCKET_STATE_CONNECTED == client->state) {
+            read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
+            if (read_select < 0) {
+                ESP_LOGE(TAG, "Network error: esp_transport_poll_read() returned %d, errno=%d", read_select, errno);
+                esp_websocket_client_abort_connection(client);
+            }
+        } else if (WEBSOCKET_STATE_WAIT_TIMEOUT == client->state) {
+            // waiting for reconnecting...
+            vTaskDelay(client->wait_timeout_ms / 2 / portTICK_RATE_MS);
+        }
     }
 
     esp_transport_close(client->transport);
@@ -580,25 +588,28 @@ static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t c
 {
     int need_write = len;
     int wlen = 0, widx = 0;
+    int ret = ESP_FAIL;
 
     if (client == NULL || data == NULL || len <= 0) {
         ESP_LOGE(TAG, "Invalid arguments");
         return ESP_FAIL;
     }
 
+    if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
+        ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", timeout);
+        return ESP_FAIL;
+    }
+
     if (!esp_websocket_client_is_connected(client)) {
         ESP_LOGE(TAG, "Websocket client is not connected");
-        return ESP_FAIL;
+        goto unlock_and_return;
     }
 
     if (client->transport == NULL) {
         ESP_LOGE(TAG, "Invalid transport");
-        return ESP_FAIL;
+        goto unlock_and_return;
     }
 
-    if (xSemaphoreTake(client->lock, timeout) != pdPASS) {
-        return ESP_FAIL;
-    }
 
     while (widx < len) {
         if (need_write > client->buffer_size) {
@@ -608,14 +619,17 @@ static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t c
         // send with ws specific way and specific opcode
         wlen = esp_transport_ws_send_raw(client->transport, opcode, (char *)client->tx_buffer, need_write, timeout);
         if (wlen <= 0) {
-            xSemaphoreGive(client->lock);
-            return wlen;
+            ret = wlen;
+            ESP_LOGE(TAG, "Network error: esp_transport_write() returned %d, errno=%d", ret, errno);
+            goto unlock_and_return;
         }
         widx += wlen;
         need_write = len - widx;
     }
-    xSemaphoreGive(client->lock);
-    return widx;
+    ret = widx;
+unlock_and_return:
+    xSemaphoreGiveRecursive(client->lock);
+    return ret;
 }
 
 bool esp_websocket_client_is_connected(esp_websocket_client_handle_t client)