pytest_https_wss_server.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. from __future__ import division, print_function, unicode_literals
  6. import logging
  7. import os
  8. import threading
  9. import time
  10. from types import TracebackType
  11. from typing import Any, Optional
  12. import pytest
  13. import websocket
  14. from pytest_embedded import Dut
  15. OPCODE_TEXT = 0x1
  16. OPCODE_BIN = 0x2
  17. OPCODE_PING = 0x9
  18. OPCODE_PONG = 0xa
  19. CORRECT_ASYNC_DATA = 'Hello client'
  20. class WsClient:
  21. def __init__(self, ip, port, ca_file): # type: (str, int, str) -> None
  22. self.port = port
  23. self.ip = ip
  24. sslopt = {'ca_certs':ca_file, 'check_hostname': False}
  25. self.ws = websocket.WebSocket(sslopt=sslopt)
  26. # Set timeout to 10 seconds to avoid conection failure at the time of handshake
  27. self.ws.settimeout(10)
  28. def __enter__(self): # type: ignore
  29. self.ws.connect('wss://{}:{}/ws'.format(self.ip, self.port))
  30. return self
  31. def __exit__(self, exc_type, exc_value, traceback): # type: (type, RuntimeError, TracebackType) -> None
  32. self.ws.close()
  33. def read(self): # type: () -> Any
  34. return self.ws.recv_data(control_frame=True)
  35. def write(self, data, opcode=OPCODE_TEXT): # type: (str, int) -> Any
  36. if opcode == OPCODE_PING:
  37. return self.ws.ping(data)
  38. if opcode == OPCODE_PONG:
  39. return self.ws.pong(data)
  40. return self.ws.send(data)
  41. class wss_client_thread(threading.Thread):
  42. def __init__(self, ip, port, ca_file): # type: (str, int, str) -> None
  43. threading.Thread.__init__(self)
  44. self.ip = ip
  45. self.port = port
  46. self.ca_file = ca_file
  47. self.start_time = time.time()
  48. self.exc = None
  49. self.data = 'Espressif'
  50. self.async_response = False
  51. def run(self): # type: () -> None
  52. with WsClient(self.ip, self.port, self.ca_file) as ws:
  53. while True:
  54. try:
  55. opcode, data = ws.read()
  56. data = data.decode('UTF-8')
  57. if opcode == OPCODE_PING:
  58. ws.write(data=self.data, opcode=OPCODE_PONG)
  59. if opcode == OPCODE_TEXT:
  60. if data == CORRECT_ASYNC_DATA:
  61. self.async_response = True
  62. logging.info('Thread {} obtained correct async message'.format(self.name))
  63. # Keep sending pong to update the keepalive in the server
  64. if (time.time() - self.start_time) > 20:
  65. break
  66. except Exception as e:
  67. logging.info('Failed to connect to the client and read async data')
  68. self.exc = e # type: ignore
  69. if self.async_response is not True:
  70. self.exc = RuntimeError('Failed to obtain correct async data') # type: ignore
  71. def join(self, timeout=0): # type:(Optional[float]) -> None
  72. threading.Thread.join(self)
  73. if self.exc:
  74. raise self.exc
  75. def test_multiple_client_keep_alive_and_async_response(ip, port, ca_file): # type: (str, int, str) -> None
  76. threads = []
  77. for _ in range(3):
  78. try:
  79. thread = wss_client_thread(ip, port, ca_file)
  80. thread.start()
  81. threads.append(thread)
  82. except OSError:
  83. logging.info('Error: unable to start thread')
  84. # keep delay of 5 seconds between two connections to avoid handshake timeout
  85. time.sleep(5)
  86. for t in threads:
  87. t.join()
  88. @pytest.mark.esp32
  89. @pytest.mark.esp32c3
  90. @pytest.mark.esp32s2
  91. @pytest.mark.esp32s3
  92. @pytest.mark.wifi
  93. def test_examples_protocol_https_wss_server(dut: Dut) -> None:
  94. # Get binary file
  95. binary_file = os.path.join(dut.app.binary_path, 'wss_server.bin')
  96. bin_size = os.path.getsize(binary_file)
  97. logging.info('https_wss_server_bin_size : {}KB'.format(bin_size // 1024))
  98. logging.info('Starting wss_server test app')
  99. # Parse IP address of STA
  100. got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
  101. logging.info('Waiting to connect with AP')
  102. got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
  103. logging.info('Got IP : {}'.format(got_ip))
  104. logging.info('Got Port : {}'.format(got_port))
  105. ca_file = os.path.join(os.path.dirname(__file__), 'main', 'certs', 'servercert.pem')
  106. # Start ws server test
  107. with WsClient(got_ip, int(got_port), ca_file) as ws:
  108. # Check for echo
  109. DATA = 'Espressif'
  110. dut.expect('performing session handshake')
  111. client_fd = int(dut.expect(r'New client connected (\d+)', timeout=30)[1].decode())
  112. ws.write(data=DATA, opcode=OPCODE_TEXT)
  113. dut.expect(r'Received packet with message: {}'.format(DATA))
  114. opcode, data = ws.read()
  115. data = data.decode('UTF-8')
  116. if data != DATA:
  117. raise RuntimeError('Failed to receive the correct echo response')
  118. logging.info('Correct echo response obtained from the wss server')
  119. # Test for PING
  120. logging.info('Testing for send PING')
  121. ws.write(data=DATA, opcode=OPCODE_PING)
  122. dut.expect('Got a WS PING frame, Replying PONG')
  123. opcode, data = ws.read()
  124. data = data.decode('UTF-8')
  125. if data != DATA or opcode != OPCODE_PONG:
  126. raise RuntimeError('Failed to receive the PONG response')
  127. logging.info('Passed the test for PING')
  128. # Test for keepalive
  129. logging.info('Testing for keep alive (approx time = 20s)')
  130. start_time = time.time()
  131. while True:
  132. try:
  133. opcode, data = ws.read()
  134. if opcode == OPCODE_PING:
  135. ws.write(data='Espressif', opcode=OPCODE_PONG)
  136. logging.info('Received PING, replying PONG (to update the keepalive)')
  137. # Keep sending pong to update the keepalive in the server
  138. if (time.time() - start_time) > 20:
  139. break
  140. except Exception:
  141. logging.info('Failed the test for keep alive,\nthe client got abruptly disconnected')
  142. raise
  143. # keepalive timeout is 10 seconds so do not respond for (10 + 1) senconds
  144. logging.info('Testing if client is disconnected if it does not respond for 10s i.e. keep_alive timeout (approx time = 11s)')
  145. try:
  146. dut.expect('Client not alive, closing fd {}'.format(client_fd), timeout=20)
  147. dut.expect('Client disconnected {}'.format(client_fd))
  148. except Exception:
  149. logging.info('ENV_ERROR:Failed the test for keep alive,\nthe connection was not closed after timeout')
  150. time.sleep(11)
  151. logging.info('Passed the test for keep alive')
  152. # Test keep alive and async response for multiple simultaneous client connections
  153. logging.info('Testing for multiple simultaneous client connections (approx time = 30s)')
  154. test_multiple_client_keep_alive_and_async_response(got_ip, int(got_port), ca_file)
  155. logging.info('Passed the test for multiple simultaneous client connections')