pytest_udp_server.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import pytest
  5. from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
  6. from pytest_embedded import Dut
  7. try:
  8. from run_udp_client import udp_client
  9. except ImportError:
  10. import os
  11. import sys
  12. sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
  13. from run_udp_client import udp_client
  14. PORT = 3333
  15. MESSAGE = 'Data to ESP'
  16. MAX_RETRIES = 3
  17. @pytest.mark.esp32
  18. @pytest.mark.esp32s2
  19. @pytest.mark.esp32c2
  20. @pytest.mark.esp32c3
  21. @pytest.mark.esp32s3
  22. @pytest.mark.esp32c6
  23. @pytest.mark.wifi_router
  24. def test_examples_udp_server_ipv4(dut: Dut) -> None:
  25. # Parse IP address of STA
  26. logging.info('Waiting to connect with AP')
  27. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  28. dut.expect('Please input ssid password:')
  29. env_name = 'wifi_router'
  30. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  31. ap_password = get_env_config_variable(env_name, 'ap_password')
  32. dut.write(f'{ap_ssid} {ap_password}')
  33. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  34. print(f'Connected with IPv4={ipv4}')
  35. # test IPv4
  36. for _ in range(MAX_RETRIES):
  37. print('Testing UDP on IPv4...')
  38. received = udp_client(ipv4, PORT, MESSAGE)
  39. if received == MESSAGE:
  40. print('OK')
  41. break
  42. else:
  43. raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
  44. dut.expect(MESSAGE)
  45. @pytest.mark.esp32
  46. @pytest.mark.esp32s2
  47. @pytest.mark.esp32c2
  48. @pytest.mark.esp32c3
  49. @pytest.mark.esp32s3
  50. @pytest.mark.esp32c6
  51. @pytest.mark.wifi_router
  52. def test_examples_udp_server_ipv6(dut: Dut) -> None:
  53. # Parse IP address of STA
  54. logging.info('Waiting to connect with AP')
  55. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  56. dut.expect('Please input ssid password:')
  57. env_name = 'wifi_router'
  58. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  59. ap_password = get_env_config_variable(env_name, 'ap_password')
  60. dut.write(f'{ap_ssid} {ap_password}')
  61. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  62. # expect all 8 octets from IPv6 (assumes it's printed in the long form)
  63. ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
  64. ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
  65. print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
  66. interface = get_my_interface_by_dest_ip(ipv4)
  67. # test IPv6
  68. for _ in range(MAX_RETRIES):
  69. print('Testing UDP on IPv6...')
  70. received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
  71. if received == MESSAGE:
  72. print('OK')
  73. break
  74. else:
  75. raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
  76. dut.expect(MESSAGE)