Bläddra i källkod

ws_client: Added support for close frame, closing connection gracefully

David Cermak 5 år sedan
förälder
incheckning
b213f2c6d3

+ 80 - 12
components/esp_websocket_client/esp_websocket_client.c

@@ -52,6 +52,8 @@ static const char *TAG = "WEBSOCKET_CLIENT";
         }
 
 const static int STOPPED_BIT = BIT0;
+const static int CLOSING_BIT = BIT1;    // Indicates that a close frame received from server
+                                        // and we are waiting for the "Reset by Peer" from the server
 
 ESP_EVENT_DEFINE_BASE(WEBSOCKET_EVENTS);
 
@@ -477,6 +479,11 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
     do {
         rlen = esp_transport_read(client->transport, client->rx_buffer, client->buffer_size, client->config->network_timeout_ms);
         if (rlen < 0) {
+            if (CLOSING_BIT & xEventGroupGetBits(client->status_bits)) {
+                client->run = false;
+                client->state = WEBSOCKET_STATE_UNKNOW;
+                return ESP_OK;
+            }
             ESP_LOGE(TAG, "Error read data");
             return ESP_FAIL;
         }
@@ -493,9 +500,10 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
         const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer;
         esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
                                   client->config->network_timeout_ms);
-    }
-    else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
+    } else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
         client->wait_for_pong_resp = false;
+    } else if (client->last_opcode == WS_TRANSPORT_OPCODES_CLOSE) {
+        xEventGroupSetBits(client->status_bits, CLOSING_BIT);
     }
 
     return ESP_OK;
@@ -520,7 +528,7 @@ static void esp_websocket_client_task(void *pv)
     }
 
     client->state = WEBSOCKET_STATE_INIT;
-    xEventGroupClearBits(client->status_bits, STOPPED_BIT);
+    xEventGroupClearBits(client->status_bits, STOPPED_BIT | CLOSING_BIT);
     int read_select = 0;
     while (client->run) {
         if (xSemaphoreTakeRecursive(client->lock, lock_timeout) != pdPASS) {
@@ -598,6 +606,11 @@ static void esp_websocket_client_task(void *pv)
         if (WEBSOCKET_STATE_CONNECTED == client->state) {
             read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
             if (read_select < 0) {
+                if (CLOSING_BIT & xEventGroupGetBits(client->status_bits)) {
+                    client->run = false;
+                    client->state = WEBSOCKET_STATE_UNKNOW;
+                    break;
+                }
                 ESP_LOGE(TAG, "Network error: esp_transport_poll_read() returned %d, errno=%d", read_select, errno);
                 esp_websocket_client_abort_connection(client);
             }
@@ -626,7 +639,7 @@ esp_err_t esp_websocket_client_start(esp_websocket_client_handle_t client)
         ESP_LOGE(TAG, "Error create websocket task");
         return ESP_FAIL;
     }
-    xEventGroupClearBits(client->status_bits, STOPPED_BIT);
+    xEventGroupClearBits(client->status_bits, STOPPED_BIT | CLOSING_BIT);
     return ESP_OK;
 }
 
@@ -645,30 +658,85 @@ esp_err_t esp_websocket_client_stop(esp_websocket_client_handle_t client)
     return ESP_OK;
 }
 
-static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, const char *data, int len, TickType_t timeout);
+static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, uint8_t *data, int len, TickType_t timeout);
+
+int esp_websocket_client_send_close(esp_websocket_client_handle_t client, int code, const char *additional_data, int total_len, TickType_t timeout)
+{
+    uint8_t *close_status_data = NULL;
+    // RFC6455#section-5.5.1: The Close frame MAY contain a body (indicated by total_len >= 2)
+    if (total_len >= 2) {
+        close_status_data = calloc(1, total_len);
+        // RFC6455#section-5.5.1: The first two bytes of the body MUST be a 2-byte representing a status
+        uint16_t *code_network_order = (uint16_t *) close_status_data;
+        *code_network_order = htons(code);
+        memcpy(close_status_data + 2, additional_data, total_len - 2);
+    }
+    int ret = esp_websocket_client_send_with_opcode(client, WS_TRANSPORT_OPCODES_CLOSE, close_status_data, total_len, timeout);
+    free(close_status_data);
+    return ret;
+}
+
+
+static esp_err_t esp_websocket_client_close_with_optional_body(esp_websocket_client_handle_t client, bool send_body, int code, const char *data, int len, TickType_t timeout)
+{
+    if (client == NULL) {
+        return ESP_ERR_INVALID_ARG;
+    }
+    if (!client->run) {
+        ESP_LOGW(TAG, "Client was not started");
+        return ESP_FAIL;
+    }
+
+    if (send_body) {
+        esp_websocket_client_send_close(client, code, data, len + 2, portMAX_DELAY); // len + 2 -> always sending the code
+    } else {
+        esp_websocket_client_send_close(client, 0, NULL, 0, portMAX_DELAY); // only opcode frame
+    }
+
+    if (STOPPED_BIT & xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, timeout)) {
+        return ESP_OK;
+    }
+
+    // If could not close gracefully within timeout, stop the client and disconnect
+    client->run = false;
+    xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
+    client->state = WEBSOCKET_STATE_UNKNOW;
+    return ESP_OK;
+}
+
+esp_err_t esp_websocket_client_close_with_code(esp_websocket_client_handle_t client, int code, const char *data, int len, TickType_t timeout)
+{
+    return esp_websocket_client_close_with_optional_body(client, true, code, data, len, timeout);
+}
+
+esp_err_t esp_websocket_client_close(esp_websocket_client_handle_t client, TickType_t timeout)
+{
+    return esp_websocket_client_close_with_optional_body(client, false, 0, NULL, 0, timeout);
+}
 
 int esp_websocket_client_send_text(esp_websocket_client_handle_t client, const char *data, int len, TickType_t timeout)
 {
-    return esp_websocket_client_send_with_opcode(client, WS_TRANSPORT_OPCODES_TEXT, data, len, timeout);
+    return esp_websocket_client_send_with_opcode(client, WS_TRANSPORT_OPCODES_TEXT, (uint8_t *)data, len, timeout);
 }
 
 int esp_websocket_client_send(esp_websocket_client_handle_t client, const char *data, int len, TickType_t timeout)
 {
-    return esp_websocket_client_send_with_opcode(client, WS_TRANSPORT_OPCODES_BINARY, data, len, timeout);
+    return esp_websocket_client_send_with_opcode(client, WS_TRANSPORT_OPCODES_BINARY, (uint8_t *)data, len, timeout);
 }
 
 int esp_websocket_client_send_bin(esp_websocket_client_handle_t client, const char *data, int len, TickType_t timeout)
 {
-    return esp_websocket_client_send_with_opcode(client, WS_TRANSPORT_OPCODES_BINARY, data, len, timeout);
+    return esp_websocket_client_send_with_opcode(client, WS_TRANSPORT_OPCODES_BINARY, (uint8_t *)data, len, timeout);
 }
 
-static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, const char *data, int len, TickType_t timeout)
+static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, uint8_t *data, int len, TickType_t timeout)
 {
     int need_write = len;
     int wlen = 0, widx = 0;
     int ret = ESP_FAIL;
 
-    if (client == NULL || data == NULL || len <= 0) {
+    if (client == NULL || len < 0 ||
+        (opcode != WS_TRANSPORT_OPCODES_CLOSE && (data == NULL || len <= 0))) {
         ESP_LOGE(TAG, "Invalid arguments");
         return ESP_FAIL;
     }
@@ -688,7 +756,7 @@ static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t c
         goto unlock_and_return;
     }
     uint32_t current_opcode = opcode;
-    while (widx < len) {
+    while (widx < len || current_opcode) {  // allow for sending "current_opcode" only massage with len==0
         if (need_write > client->buffer_size) {
             need_write = client->buffer_size;
         } else {
@@ -698,7 +766,7 @@ static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t c
         // send with ws specific way and specific opcode
         wlen = esp_transport_ws_send_raw(client->transport, current_opcode, (char *)client->tx_buffer, need_write,
                                         (timeout==portMAX_DELAY)? -1 : timeout * portTICK_PERIOD_MS);
-        if (wlen <= 0) {
+        if (wlen < 0 || (wlen == 0 && need_write != 0)) {
             ret = wlen;
             ESP_LOGE(TAG, "Network error: esp_transport_write() returned %d, errno=%d", ret, errno);
             esp_websocket_client_abort_connection(client);

+ 30 - 0
components/esp_websocket_client/include/esp_websocket_client.h

@@ -187,6 +187,36 @@ int esp_websocket_client_send_bin(esp_websocket_client_handle_t client, const ch
  */
 int esp_websocket_client_send_text(esp_websocket_client_handle_t client, const char *data, int len, TickType_t timeout);
 
+/**
+ * @brief      Close the WebSocket connection in a clean way
+ *
+ * Sequence of clean close initiated by client:
+ * * Client sends CLOSE frame
+ * * Client waits until server echos the CLOSE frame
+ * * Client waits until server closes the connection
+ * * Client is stopped the same way as by the `esp_websocket_client_stop()`
+ *
+ * @param[in]  client  The client
+ * @param[in]  timeout Timeout in RTOS ticks for waiting
+ *
+ * @return     esp_err_t
+ */
+esp_err_t esp_websocket_client_close(esp_websocket_client_handle_t client, TickType_t timeout);
+
+/**
+ * @brief      Close the WebSocket connection in a clean way with custom code/data
+ *             Closing sequence is the same as for esp_websocket_client_close()
+ *
+ * @param[in]  client  The client
+ * @param[in]  code    Close status code as defined in RFC6455 section-7.4
+ * @param[in]  data    Additional data to closing message
+ * @param[in]  len     The length of the additional data
+ * @param[in]  timeout Timeout in RTOS ticks for waiting
+ *
+ * @return     esp_err_t
+ */
+esp_err_t esp_websocket_client_close_with_code(esp_websocket_client_handle_t client, int code, const char *data, int len, TickType_t timeout);
+
 /**
  * @brief      Check the WebSocket client connection state
  *

+ 6 - 2
examples/protocols/websocket/main/websocket_example.c

@@ -69,7 +69,11 @@ static void websocket_event_handler(void *handler_args, esp_event_base_t base, i
     case WEBSOCKET_EVENT_DATA:
         ESP_LOGI(TAG, "WEBSOCKET_EVENT_DATA");
         ESP_LOGI(TAG, "Received opcode=%d", data->op_code);
-        ESP_LOGW(TAG, "Received=%.*s", data->data_len, (char *)data->data_ptr);
+        if (data->op_code == 0x08 && data->data_len == 2) {
+            ESP_LOGW(TAG, "Received closed message with code=%d", 256*data->data_ptr[0] + data->data_ptr[1]);
+        } else {
+            ESP_LOGW(TAG, "Received=%.*s", data->data_len, (char *)data->data_ptr);
+        }
         ESP_LOGW(TAG, "Total payload length=%d, data_len=%d, current payload offset=%d\r\n", data->payload_len, data->data_len, data->payload_offset);
 
         xTimerReset(shutdown_signal_timer, portMAX_DELAY);
@@ -121,7 +125,7 @@ static void websocket_app_start(void)
     }
 
     xSemaphoreTake(shutdown_sema, portMAX_DELAY);
-    esp_websocket_client_stop(client);
+    esp_websocket_client_close(client, portMAX_DELAY);
     ESP_LOGI(TAG, "Websocket Stopped");
     esp_websocket_client_destroy(client);
 }