pytest_udp_client.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  # SPDX-License-Identifier: Apache-2.0
  import logging
  import socket

  import pytest
  from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
                                   get_my_interface_by_dest_ip)
  from pexpect.exceptions import TIMEOUT
  from pytest_embedded import Dut

  try:
      from run_udp_server import UdpServer
  except ImportError:
      # run_udp_server is not importable as-is: add the sibling ``../scripts``
      # directory (relative to this file) to sys.path and retry the import.
      import os
      import sys

      sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
      from run_udp_server import UdpServer
  # Port number passed to UdpServer by both tests in this file.
  PORT = 3333
  # How many times each test writes the server address to the DUT and waits
  # for the success message before giving up.
  MAX_RETRIES = 3
  @pytest.mark.esp32
  @pytest.mark.esp32c3
  @pytest.mark.esp32s3
  @pytest.mark.wifi_router
  def test_examples_udp_client_ipv4(dut: Dut) -> None:
      """Run the UDP client example against a host-side IPv4 ``UdpServer``.

      Steps:
        1. If the app's sdkconfig has ``EXAMPLE_WIFI_SSID_PWD_FROM_STDIN`` set
           to ``True``, wait for the credentials prompt and write the
           ``ap_ssid``/``ap_password`` values from the ``wifi_router``
           environment config to the DUT.
        2. Read the IPv4 address the DUT prints.
        3. With a ``UdpServer(PORT, socket.AF_INET)`` context active, write the
           host address to the DUT and wait for ``OK: Message from ESP32``,
           retrying up to ``MAX_RETRIES`` times.

      Args:
          dut: Device-under-test fixture used to exchange text with the target.

      Raises:
          ValueError: If every one of the ``MAX_RETRIES`` attempts ends in a
              pexpect ``TIMEOUT``.
      """
      # Parse IP address of STA
      logging.info('Waiting to connect with AP')
      if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
          dut.expect('Please input ssid password:')
          env_name = 'wifi_router'
          ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
          ap_password = get_env_config_variable(env_name, 'ap_password')
          dut.write(f'{ap_ssid} {ap_password}')

      # Group 1 of the match is the dotted-quad address; the trailing [^\d]
      # makes sure the last octet has been printed in full before matching.
      ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
      print(f'Connected with IPv4={ipv4}')

      # test IPv4
      with UdpServer(PORT, socket.AF_INET):
          server_ip = get_host_ip4_by_dest_ip(ipv4)
          print(f'Connect udp client to server IP={server_ip}')
          for attempt in range(1, MAX_RETRIES + 1):
              try:
                  dut.write(server_ip)
                  dut.expect('OK: Message from ESP32')
                  break
              except TIMEOUT:
                  # A single timeout is tolerated; record it so flaky runs can
                  # be diagnosed from the log instead of vanishing silently.
                  logging.warning('No UDP reply from DUT (attempt %d/%d)', attempt, MAX_RETRIES)
          else:
              # for-else: reached only when no attempt hit ``break``.
              raise ValueError('Failed to send/recv udp packets.')
  @pytest.mark.esp32
  @pytest.mark.esp32c3
  @pytest.mark.esp32s3
  @pytest.mark.wifi_router
  def test_examples_udp_client_ipv6(dut: Dut) -> None:
      """Run the UDP client example against a host-side IPv6 ``UdpServer``.

      Steps:
        1. If the app's sdkconfig has ``EXAMPLE_WIFI_SSID_PWD_FROM_STDIN`` set
           to ``True``, wait for the credentials prompt and write the
           ``ap_ssid``/``ap_password`` values from the ``wifi_router``
           environment config to the DUT.
        2. Read the IPv4 address, then a full-form IPv6 address, from the DUT
           output. The IPv4 address is used to look up the host interface.
        3. With a ``UdpServer(PORT, socket.AF_INET6)`` context active, write
           the host IPv6 address to the DUT and wait for
           ``OK: Message from ESP32``, retrying up to ``MAX_RETRIES`` times.

      Args:
          dut: Device-under-test fixture used to exchange text with the target.

      Raises:
          ValueError: If every one of the ``MAX_RETRIES`` attempts ends in a
              pexpect ``TIMEOUT``.
      """
      # Parse IP address of STA
      logging.info('Waiting to connect with AP')
      if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
          dut.expect('Please input ssid password:')
          env_name = 'wifi_router'
          ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
          ap_password = get_env_config_variable(env_name, 'ap_password')
          dut.write(f'{ap_ssid} {ap_password}')

      ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
      # Expect all 8 colon-separated groups of 4 hex digits, i.e. the address
      # is assumed to be printed in its long (uncompressed) form.
      ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
      ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
      print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')

      interface = get_my_interface_by_dest_ip(ipv4)

      # test IPv6
      with UdpServer(PORT, socket.AF_INET6):
          server_ip = get_host_ip6_by_dest_ip(ipv6, interface)
          print(f'Connect udp client to server IP={server_ip}')
          for attempt in range(1, MAX_RETRIES + 1):
              try:
                  dut.write(server_ip)
                  dut.expect('OK: Message from ESP32')
                  break
              except TIMEOUT:
                  # A single timeout is tolerated; record it so flaky runs can
                  # be diagnosed from the log instead of vanishing silently.
                  logging.warning('No UDPv6 reply from DUT (attempt %d/%d)', attempt, MAX_RETRIES)
          else:
              # for-else: reached only when no attempt hit ``break``.
              raise ValueError('Failed to send/recv udpv6 packets.')