pytest_udp_server.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import pytest
  5. from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
  6. from pytest_embedded import Dut
  7. try:
  8. from run_udp_client import udp_client
  9. except ImportError:
  10. import os
  11. import sys
  12. sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
  13. from run_udp_client import udp_client
  14. PORT = 3333
  15. MESSAGE = 'Data to ESP'
  16. MAX_RETRIES = 3
  17. @pytest.mark.esp32
  18. @pytest.mark.wifi_router
  19. def test_examples_udp_server_ipv4(dut: Dut) -> None:
  20. # Parse IP address of STA
  21. logging.info('Waiting to connect with AP')
  22. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  23. dut.expect('Please input ssid password:')
  24. env_name = 'wifi_router'
  25. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  26. ap_password = get_env_config_variable(env_name, 'ap_password')
  27. dut.write(f'{ap_ssid} {ap_password}')
  28. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  29. print(f'Connected with IPv4={ipv4}')
  30. # test IPv4
  31. for _ in range(MAX_RETRIES):
  32. print('Testing UDP on IPv4...')
  33. received = udp_client(ipv4, PORT, MESSAGE)
  34. if received == MESSAGE:
  35. print('OK')
  36. break
  37. else:
  38. raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
  39. dut.expect(MESSAGE)
  40. @pytest.mark.esp32
  41. @pytest.mark.wifi_router
  42. def test_examples_udp_server_ipv6(dut: Dut) -> None:
  43. # Parse IP address of STA
  44. logging.info('Waiting to connect with AP')
  45. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  46. dut.expect('Please input ssid password:')
  47. env_name = 'wifi_router'
  48. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  49. ap_password = get_env_config_variable(env_name, 'ap_password')
  50. dut.write(f'{ap_ssid} {ap_password}')
  51. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  52. # expect all 8 octets from IPv6 (assumes it's printed in the long form)
  53. ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
  54. ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
  55. print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
  56. interface = get_my_interface_by_dest_ip(ipv4)
  57. # test IPv6
  58. for _ in range(MAX_RETRIES):
  59. print('Testing UDP on IPv6...')
  60. received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
  61. if received == MESSAGE:
  62. print('OK')
  63. break
  64. else:
  65. raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
  66. dut.expect(MESSAGE)