pytest_udp_client.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import socket
  5. import pytest
  6. from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
  7. get_my_interface_by_dest_ip)
  8. from pexpect.exceptions import TIMEOUT
  9. from pytest_embedded import Dut
  10. try:
  11. from run_udp_server import UdpServer
  12. except ImportError:
  13. import os
  14. import sys
  15. sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
  16. from run_udp_server import UdpServer
  17. PORT = 3333
  18. MAX_RETRIES = 3
  19. @pytest.mark.esp32
  20. @pytest.mark.esp32s2
  21. @pytest.mark.esp32c2
  22. @pytest.mark.esp32c3
  23. @pytest.mark.esp32s3
  24. @pytest.mark.esp32c6
  25. @pytest.mark.wifi_router
  26. def test_examples_udp_client_ipv4(dut: Dut) -> None:
  27. # Parse IP address of STA
  28. logging.info('Waiting to connect with AP')
  29. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  30. dut.expect('Please input ssid password:')
  31. env_name = 'wifi_router'
  32. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  33. ap_password = get_env_config_variable(env_name, 'ap_password')
  34. dut.write(f'{ap_ssid} {ap_password}')
  35. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  36. print(f'Connected with IPv4={ipv4}')
  37. # test IPv4
  38. with UdpServer(PORT, socket.AF_INET):
  39. server_ip = get_host_ip4_by_dest_ip(ipv4)
  40. print('Connect udp client to server IP={}'.format(server_ip))
  41. for _ in range(MAX_RETRIES):
  42. try:
  43. dut.write(server_ip)
  44. dut.expect('OK: Message from ESP32')
  45. break
  46. except TIMEOUT:
  47. pass
  48. else:
  49. raise ValueError('Failed to send/recv udp packets.')
  50. @pytest.mark.esp32
  51. @pytest.mark.esp32s2
  52. @pytest.mark.esp32c2
  53. @pytest.mark.esp32c3
  54. @pytest.mark.esp32s3
  55. @pytest.mark.esp32c6
  56. @pytest.mark.wifi_router
  57. def test_examples_udp_client_ipv6(dut: Dut) -> None:
  58. # Parse IP address of STA
  59. logging.info('Waiting to connect with AP')
  60. if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
  61. dut.expect('Please input ssid password:')
  62. env_name = 'wifi_router'
  63. ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
  64. ap_password = get_env_config_variable(env_name, 'ap_password')
  65. dut.write(f'{ap_ssid} {ap_password}')
  66. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  67. # expect all 8 octets from IPv6 (assumes it's printed in the long form)
  68. ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
  69. ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
  70. print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
  71. interface = get_my_interface_by_dest_ip(ipv4)
  72. # test IPv6
  73. with UdpServer(PORT, socket.AF_INET6):
  74. server_ip = get_host_ip6_by_dest_ip(ipv6, interface)
  75. print('Connect udp client to server IP={}'.format(server_ip))
  76. for _ in range(MAX_RETRIES):
  77. try:
  78. dut.write(server_ip)
  79. dut.expect('OK: Message from ESP32')
  80. break
  81. except TIMEOUT:
  82. pass
  83. else:
  84. raise ValueError('Failed to send/recv udpv6 packets.')