pytest_udp_server.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import pytest
  5. from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
  6. from pytest_embedded import Dut
  7. try:
  8. from run_udp_client import udp_client
  9. except ImportError:
  10. import os
  11. import sys
  12. sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
  13. from run_udp_client import udp_client
  14. PORT = 3333
  15. MESSAGE = 'Data to ESP'
  16. MAX_RETRIES = 3
  17. @pytest.mark.esp32
  18. @pytest.mark.esp32c3
  19. @pytest.mark.esp32s3
  20. @pytest.mark.wifi_router
  21. def test_examples_udp_server_ipv4(dut: Dut) -> None:
  22. # Parse IP address of STA
  23. logging.info('Waiting to connect with AP')
  24. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  25. dut.expect('Please input ssid password:')
  26. env_name = 'wifi_router'
  27. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  28. ap_password = get_env_config_variable(env_name, 'ap_password')
  29. dut.write(f'{ap_ssid} {ap_password}')
  30. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  31. print(f'Connected with IPv4={ipv4}')
  32. # test IPv4
  33. for _ in range(MAX_RETRIES):
  34. print('Testing UDP on IPv4...')
  35. received = udp_client(ipv4, PORT, MESSAGE)
  36. if received == MESSAGE:
  37. print('OK')
  38. break
  39. else:
  40. raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
  41. dut.expect(MESSAGE)
  42. @pytest.mark.esp32
  43. @pytest.mark.esp32c3
  44. @pytest.mark.esp32s3
  45. @pytest.mark.wifi_router
  46. def test_examples_udp_server_ipv6(dut: Dut) -> None:
  47. # Parse IP address of STA
  48. logging.info('Waiting to connect with AP')
  49. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  50. dut.expect('Please input ssid password:')
  51. env_name = 'wifi_router'
  52. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  53. ap_password = get_env_config_variable(env_name, 'ap_password')
  54. dut.write(f'{ap_ssid} {ap_password}')
  55. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  56. # expect all 8 octets from IPv6 (assumes it's printed in the long form)
  57. ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
  58. ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
  59. print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
  60. interface = get_my_interface_by_dest_ip(ipv4)
  61. # test IPv6
  62. for _ in range(MAX_RETRIES):
  63. print('Testing UDP on IPv6...')
  64. received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
  65. if received == MESSAGE:
  66. print('OK')
  67. break
  68. else:
  69. raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
  70. dut.expect(MESSAGE)