pytest_https_wss_server.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. from __future__ import division, print_function, unicode_literals
  6. import logging
  7. import os
  8. import threading
  9. import time
  10. from types import TracebackType
  11. from typing import Any, Optional
  12. import pytest
  13. import websocket
  14. from common_test_methods import get_env_config_variable
  15. from pytest_embedded import Dut
  16. OPCODE_TEXT = 0x1
  17. OPCODE_BIN = 0x2
  18. OPCODE_PING = 0x9
  19. OPCODE_PONG = 0xa
  20. CORRECT_ASYNC_DATA = 'Hello client'
  21. class WsClient:
  22. def __init__(self, ip, port, ca_file): # type: (str, int, str) -> None
  23. self.port = port
  24. self.ip = ip
  25. sslopt = {'ca_certs':ca_file, 'check_hostname': False}
  26. self.ws = websocket.WebSocket(sslopt=sslopt)
  27. # Set timeout to 10 seconds to avoid conection failure at the time of handshake
  28. self.ws.settimeout(10)
  29. def __enter__(self): # type: ignore
  30. self.ws.connect('wss://{}:{}/ws'.format(self.ip, self.port))
  31. return self
  32. def __exit__(self, exc_type, exc_value, traceback): # type: (type, RuntimeError, TracebackType) -> None
  33. self.ws.close()
  34. def read(self): # type: () -> Any
  35. return self.ws.recv_data(control_frame=True)
  36. def write(self, data, opcode=OPCODE_TEXT): # type: (str, int) -> Any
  37. if opcode == OPCODE_PING:
  38. return self.ws.ping(data)
  39. if opcode == OPCODE_PONG:
  40. return self.ws.pong(data)
  41. return self.ws.send(data)
  42. class wss_client_thread(threading.Thread):
  43. def __init__(self, ip, port, ca_file): # type: (str, int, str) -> None
  44. threading.Thread.__init__(self)
  45. self.ip = ip
  46. self.port = port
  47. self.ca_file = ca_file
  48. self.start_time = time.time()
  49. self.exc = None
  50. self.data = 'Espressif'
  51. self.async_response = False
  52. def run(self): # type: () -> None
  53. with WsClient(self.ip, self.port, self.ca_file) as ws:
  54. while True:
  55. try:
  56. opcode, data = ws.read()
  57. data = data.decode('UTF-8')
  58. if opcode == OPCODE_PING:
  59. ws.write(data=self.data, opcode=OPCODE_PONG)
  60. if opcode == OPCODE_TEXT:
  61. if data == CORRECT_ASYNC_DATA:
  62. self.async_response = True
  63. logging.info('Thread {} obtained correct async message'.format(self.name))
  64. # Keep sending pong to update the keepalive in the server
  65. if (time.time() - self.start_time) > 20:
  66. break
  67. except Exception as e:
  68. logging.info('Failed to connect to the client and read async data')
  69. self.exc = e # type: ignore
  70. if self.async_response is not True:
  71. self.exc = RuntimeError('Failed to obtain correct async data') # type: ignore
  72. def join(self, timeout=0): # type:(Optional[float]) -> None
  73. threading.Thread.join(self)
  74. if self.exc:
  75. raise self.exc
  76. def test_multiple_client_keep_alive_and_async_response(ip, port, ca_file): # type: (str, int, str) -> None
  77. threads = []
  78. for _ in range(3):
  79. try:
  80. thread = wss_client_thread(ip, port, ca_file)
  81. thread.start()
  82. threads.append(thread)
  83. except OSError:
  84. logging.info('Error: unable to start thread')
  85. # keep delay of 5 seconds between two connections to avoid handshake timeout
  86. time.sleep(5)
  87. for t in threads:
  88. t.join()
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.wifi_router
def test_examples_protocol_https_wss_server(dut: Dut) -> None:
    """Exercise the wss_server example running on the device under test.

    Phases, in order:
      1. log the binary size and, if the app is configured to read Wi-Fi
         credentials from stdin, send them;
      2. read the server port and IPv4 address from the device output;
      3. with one client: check text echo, PING/PONG, keep-alive for about
         20 seconds, then stop answering and wait for the server to drop the
         connection;
      4. run the multi-client keep-alive / async-response helper.

    :param dut: device under test provided by pytest-embedded.
    :raises RuntimeError: on a wrong echo or PONG response.
    """
    # Get binary file
    binary_file = os.path.join(dut.app.binary_path, 'wss_server.bin')
    bin_size = os.path.getsize(binary_file)
    logging.info('https_wss_server_bin_size : {}KB'.format(bin_size // 1024))
    logging.info('Starting wss_server test app')
    logging.info('Waiting to connect with AP')
    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
        # The app prompts for credentials; answer with "<ssid> <password>".
        dut.expect('Please input ssid password:')
        env_name = 'wifi_router'
        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
        ap_password = get_env_config_variable(env_name, 'ap_password')
        dut.write(f'{ap_ssid} {ap_password}')
    # Parse the server port and the IP address of the STA
    got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
    got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
    logging.info('Got IP : {}'.format(got_ip))
    logging.info('Got Port : {}'.format(got_port))
    # The server certificate is looked up next to this script.
    ca_file = os.path.join(os.path.dirname(__file__), 'main', 'certs', 'servercert.pem')
    # Start ws server test
    with WsClient(got_ip, int(got_port), ca_file) as ws:
        # Check for echo
        DATA = 'Espressif'
        dut.expect('performing session handshake')
        # Remember the fd the server reports for this client; it is matched
        # again in the disconnect messages below.
        client_fd = int(dut.expect(r'New client connected (\d+)', timeout=30)[1].decode())
        ws.write(data=DATA, opcode=OPCODE_TEXT)
        dut.expect(r'Received packet with message: {}'.format(DATA))
        opcode, data = ws.read()
        data = data.decode('UTF-8')
        if data != DATA:
            raise RuntimeError(f'Failed to receive the correct echo response.')
        logging.info('Correct echo response obtained from the wss server')
        # Test for PING
        logging.info('Testing for send PING')
        ws.write(data=DATA, opcode=OPCODE_PING)
        dut.expect('Got a WS PING frame, Replying PONG')
        opcode, data = ws.read()
        data = data.decode('UTF-8')
        # The PONG must carry back the payload that was sent in the PING.
        if data != DATA or opcode != OPCODE_PONG:
            raise RuntimeError('Failed to receive the PONG response')
        logging.info('Passed the test for PING')
        # Test for keepalive
        logging.info('Testing for keep alive (approx time = 20s)')
        start_time = time.time()
        while True:
            try:
                opcode, data = ws.read()
                if opcode == OPCODE_PING:
                    ws.write(data='Espressif', opcode=OPCODE_PONG)
                    logging.info('Received PING, replying PONG (to update the keepalive)')
                # Keep sending pong to update the keepalive in the server
                if (time.time() - start_time) > 20:
                    break
            except Exception:
                # Log the context, then let the original error fail the test.
                logging.info('Failed the test for keep alive,\nthe client got abruptly disconnected')
                raise
        # keepalive timeout is 10 seconds so do not respond for (10 + 1) seconds
        logging.info('Testing if client is disconnected if it does not respond for 10s i.e. keep_alive timeout (approx time = 11s)')
        try:
            dut.expect('Client not alive, closing fd {}'.format(client_fd), timeout=20)
            dut.expect('Client disconnected {}'.format(client_fd))
        except Exception:
            # NOTE(review): a missing disconnect message is only logged as
            # ENV_ERROR and the test still goes on to report success below;
            # confirm this is intended.
            logging.info('ENV_ERROR:Failed the test for keep alive,\nthe connection was not closed after timeout')
        time.sleep(11)
        logging.info('Passed the test for keep alive')
    # Test keep alive and async response for multiple simultaneous client connections
    logging.info('Testing for multiple simultaneous client connections (approx time = 30s)')
    test_multiple_client_keep_alive_and_async_response(got_ip, int(got_port), ca_file)
    logging.info('Passed the test for multiple simultaneous client connections')