Forráskód Böngészése

CI: Migrate socket example tests to pytest

Chen Yudong 3 éve
szülő
commit
2d006d488c

+ 0 - 8
.gitlab/ci/target-test.yml

@@ -869,14 +869,6 @@ example_test_001C:
     - ESP32
     - Example_GENERIC
 
-example_test_protocols:
-  extends:
-    - .example_test_esp32_template
-    - .rules:test:example_test-esp32-wifi
-  tags:
-    - ESP32
-    - wifi_router
-
 example_test_002:
   extends:
     - .example_test_esp32_template

+ 0 - 26
examples/protocols/sockets/non_blocking/example_test.py

@@ -1,26 +0,0 @@
-# This example code is in the Public Domain (or CC0 licensed, at your option.)
-
-# Unless required by applicable law or agreed to in writing, this
-# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-# CONDITIONS OF ANY KIND, either express or implied.
-
-# -*- coding: utf-8 -*-
-
-from __future__ import print_function, unicode_literals
-
-import re
-
-import ttfw_idf
-
-
-@ttfw_idf.idf_example_test(env_tag='Example_GENERIC')
-def test_examples_protocol_socket_non_block(env, _):
-    dut = env.get_dut('non_blocking_socket', 'examples/protocols/sockets/non_blocking', dut_class=ttfw_idf.ESP32DUT)
-
-    # start the test and expect the client to receive back it's original data
-    dut.start_app()
-    dut.expect(re.compile(r'nonblocking-socket-client: Received: GET / HTTP/1.1'), timeout=30)
-
-
-if __name__ == '__main__':
-    test_examples_protocol_socket_non_block()

+ 11 - 0
examples/protocols/sockets/non_blocking/pytet_socket_non_blocking.py

@@ -0,0 +1,11 @@
+# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+from pytest_embedded import Dut
+
+
+@pytest.mark.supported_targets
+@pytest.mark.generic
+def test_examples_non_block_socket_localhost(dut: Dut) -> None:
+    dut.expect(r'nonblocking-socket-client: Received: GET / HTTP/1.1', timeout=30)

+ 44 - 57
examples/protocols/sockets/tcp_client/example_test.py → examples/protocols/sockets/tcp_client/pytest_tcp_client.py

@@ -1,28 +1,20 @@
-# This example code is in the Public Domain (or CC0 licensed, at your option.)
+# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Apache-2.0
 
-# Unless required by applicable law or agreed to in writing, this
-# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-# CONDITIONS OF ANY KIND, either express or implied.
-
-# -*- coding: utf-8 -*-
-import os
-import re
+import logging
 import socket
-import sys
 from threading import Event, Thread
 
-import ttfw_idf
+import pytest
 from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
                                  get_my_interface_by_dest_ip)
+from pytest_embedded import Dut
 
-# -----------  Config  ----------
 PORT = 3333
-# -------------------------------
-
 
-class TcpServer:
 
-    def __init__(self, port, family_addr, persist=False):
+class TcpServer(object):
+    def __init__(self, port, family_addr, persist=False):  # type: ignore
         self.port = port
         self.socket = socket.socket(family_addr, socket.SOCK_STREAM)
         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -31,7 +23,7 @@ class TcpServer:
         self.persist = persist
         self.family_addr = family_addr
 
-    def __enter__(self):
+    def __enter__(self):  # type: ignore
         try:
             self.socket.bind(('', self.port))
         except socket.error as e:
@@ -44,7 +36,7 @@ class TcpServer:
         self.server_thread.start()
         return self
 
-    def __exit__(self, exc_type, exc_value, traceback):
+    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
         if self.persist:
             sock = socket.socket(self.family_addr, socket.SOCK_STREAM)
             sock.connect(('localhost', self.port))
@@ -55,7 +47,7 @@ class TcpServer:
         self.server_thread.join()
         self.socket.close()
 
-    def run_server(self):
+    def run_server(self) -> None:
         while not self.shutdown.is_set():
             try:
                 conn, address = self.socket.accept()  # accept new connection
@@ -76,54 +68,49 @@ class TcpServer:
                 break
 
 
-@ttfw_idf.idf_example_test(env_tag='wifi_router')
-def test_examples_protocol_socket_tcpclient(env, extra_data):
-    """
-    steps:
-      1. join AP
-      2. have the board connect to the server
-      3. send and receive data
-    """
-    dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_client', dut_class=ttfw_idf.ESP32DUT)
-    # check and log bin size
-    binary_file = os.path.join(dut1.app.binary_path, 'tcp_client.bin')
-    bin_size = os.path.getsize(binary_file)
-    ttfw_idf.log_performance('tcp_client_bin_size', '{}KB'.format(bin_size // 1024))
-
-    # start test
-    dut1.start_app()
-    if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
-        dut1.expect('Please input ssid password:')
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_tcp_client_ipv4(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
         env_name = 'wifi_router'
         ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
         ap_password = get_env_config_variable(env_name, 'ap_password')
-        dut1.write(f'{ap_ssid} {ap_password}')
-
-    ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
-    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
-    ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
-    print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    print(f'Connected with IPv4={ipv4}')
 
-    my_interface = get_my_interface_by_dest_ip(ipv4)
     # test IPv4
     with TcpServer(PORT, socket.AF_INET):
         server_ip = get_host_ip4_by_dest_ip(ipv4)
         print('Connect tcp client to server IP={}'.format(server_ip))
-        dut1.write(server_ip)
-        dut1.expect(re.compile(r'OK: Message from ESP32'))
+        dut.write(server_ip)
+        dut.expect('OK: Message from ESP32')
+
+
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_tcp_client_ipv6(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
+        env_name = 'wifi_router'
+        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
+        ap_password = get_env_config_variable(env_name, 'ap_password')
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
+    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
+    ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
+    print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
+
     # test IPv6
+    my_interface = get_my_interface_by_dest_ip(ipv4)
     with TcpServer(PORT, socket.AF_INET6):
         server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
         print('Connect tcp client to server IP={}'.format(server_ip))
-        dut1.write(server_ip)
-        dut1.expect(re.compile(r'OK: Message from ESP32'))
-
-
-if __name__ == '__main__':
-    if sys.argv[1:] and sys.argv[1].startswith('IPv'):     # if additional arguments provided:
-        # Usage: example_test.py <IPv4|IPv6>
-        family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET
-        with TcpServer(PORT, family_addr, persist=True) as s:
-            print(input('Press Enter stop the server...'))
-    else:
-        test_examples_protocol_socket_tcpclient()
+        dut.write(server_ip)
+        dut.expect('OK: Message from ESP32')

+ 0 - 96
examples/protocols/sockets/tcp_server/example_test.py

@@ -1,96 +0,0 @@
-# This example code is in the Public Domain (or CC0 licensed, at your option.)
-
-# Unless required by applicable law or agreed to in writing, this
-# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-# CONDITIONS OF ANY KIND, either express or implied.
-
-# -*- coding: utf-8 -*-
-
-from __future__ import print_function, unicode_literals
-
-import os
-import re
-import socket
-import sys
-
-import ttfw_idf
-from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
-
-# -----------  Config  ----------
-PORT = 3333
-# -------------------------------
-
-
-def tcp_client(address, payload):
-    for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC,
-                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
-        family_addr, socktype, proto, canonname, addr = res
-    try:
-        sock = socket.socket(family_addr, socket.SOCK_STREAM)
-        sock.settimeout(60.0)
-    except socket.error as msg:
-        print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1])
-        raise
-    try:
-        sock.connect(addr)
-    except socket.error as msg:
-        print('Could not open socket: ', msg)
-        sock.close()
-        raise
-    sock.sendall(payload.encode())
-    data = sock.recv(1024)
-    if not data:
-        return
-    print('Reply : ' + data.decode())
-    sock.close()
-    return data.decode()
-
-
-@ttfw_idf.idf_example_test(env_tag='wifi_router')
-def test_examples_protocol_socket_tcpserver(env, extra_data):
-    MESSAGE = 'Data to ESP'
-    """
-    steps:
-      1. join AP
-      2. have the board connect to the server
-      3. send and receive data
-    """
-    dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_server', dut_class=ttfw_idf.ESP32DUT)
-    # check and log bin size
-    binary_file = os.path.join(dut1.app.binary_path, 'tcp_server.bin')
-    bin_size = os.path.getsize(binary_file)
-    ttfw_idf.log_performance('tcp_server_bin_size', '{}KB'.format(bin_size // 1024))
-
-    # start test
-    dut1.start_app()
-    if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
-        dut1.expect('Please input ssid password:')
-        env_name = 'wifi_router'
-        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
-        ap_password = get_env_config_variable(env_name, 'ap_password')
-        dut1.write(f'{ap_ssid} {ap_password}')
-
-    ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
-    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
-    ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
-    print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
-
-    interface = get_my_interface_by_dest_ip(ipv4)
-    # test IPv4
-    received = tcp_client(ipv4, MESSAGE)
-    if not received == MESSAGE:
-        raise
-    dut1.expect(MESSAGE)
-    # test IPv6
-    received = tcp_client('{}%{}'.format(ipv6, interface), MESSAGE)
-    if not received == MESSAGE:
-        raise
-    dut1.expect(MESSAGE)
-
-
-if __name__ == '__main__':
-    if sys.argv[2:]:    # if two arguments provided:
-        # Usage: example_test.py <server_address> <message_to_send_to_server>
-        tcp_client(sys.argv[1], sys.argv[2])
-    else:               # otherwise run standard example test as in the CI
-        test_examples_protocol_socket_tcpserver()

+ 88 - 0
examples/protocols/sockets/tcp_server/pytest_tcp_server.py

@@ -0,0 +1,88 @@
+# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+import socket
+import time
+
+import pytest
+from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
+from pytest_embedded import Dut
+
+PORT = 3333
+MESSAGE = 'Data to ESP'
+
+
+def tcp_client(address: str, payload: str) -> str:
+    for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC,
+                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
+        family_addr, socktype, proto, canonname, addr = res
+    try:
+        sock = socket.socket(family_addr, socket.SOCK_STREAM)
+        sock.settimeout(60.0)
+    except socket.error as msg:
+        print('Could not create socket')
+        print(os.strerror(msg.errno))
+        raise
+    try:
+        sock.connect(addr)
+    except socket.error as e:
+        print('Could not open socket: ' + str(e))
+        sock.close()
+        raise
+    sock.sendall(payload.encode())
+    data = sock.recv(1024)
+    if not data:
+        return ''
+    print('Reply : ' + data.decode())
+    sock.close()
+    return data.decode()
+
+
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_tcp_server_ipv4(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
+        env_name = 'wifi_router'
+        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
+        ap_password = get_env_config_variable(env_name, 'ap_password')
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    print(f'Connected with IPv4={ipv4}')
+    time.sleep(1)
+
+    # test IPv4
+    received = tcp_client(ipv4, MESSAGE)
+    if not received == MESSAGE:
+        raise
+    dut.expect(MESSAGE)
+
+
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_tcp_server_ipv6(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
+        env_name = 'wifi_router'
+        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
+        ap_password = get_env_config_variable(env_name, 'ap_password')
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
+    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
+    ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
+    print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
+    time.sleep(1)
+
+    interface = get_my_interface_by_dest_ip(ipv4)
+    # test IPv6
+    received = tcp_client('{}%{}'.format(ipv6, interface), MESSAGE)
+    if not received == MESSAGE:
+        raise
+    dut.expect(MESSAGE)

+ 0 - 138
examples/protocols/sockets/udp_client/example_test.py

@@ -1,138 +0,0 @@
-# This example code is in the Public Domain (or CC0 licensed, at your option.)
-
-# Unless required by applicable law or agreed to in writing, this
-# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-# CONDITIONS OF ANY KIND, either express or implied.
-
-# -*- coding: utf-8 -*-
-import os
-import re
-import socket
-import sys
-from threading import Event, Thread
-
-import ttfw_idf
-from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
-                                 get_my_interface_by_dest_ip)
-from tiny_test_fw.DUT import ExpectTimeout
-
-# -----------  Config  ----------
-PORT = 3333
-# -------------------------------
-
-
-class UdpServer:
-
-    def __init__(self, port, family_addr, persist=False):
-        self.port = port
-        self.family_addr = family_addr
-        self.socket = socket.socket(family_addr, socket.SOCK_DGRAM)
-        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.socket.settimeout(60.0)
-        self.shutdown = Event()
-        self.persist = persist
-
-    def __enter__(self):
-        try:
-            self.socket.bind(('', self.port))
-        except socket.error as e:
-            print('Bind failed:{}'.format(e))
-            raise
-
-        print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr))
-        self.server_thread = Thread(target=self.run_server)
-        self.server_thread.start()
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        if self.persist:
-            sock = socket.socket(self.family_addr, socket.SOCK_DGRAM)
-            sock.sendto(b'Stop', ('localhost', self.port))
-            sock.close()
-            self.shutdown.set()
-        self.server_thread.join()
-        self.socket.close()
-
-    def run_server(self):
-        while not self.shutdown.is_set():
-            try:
-                data, addr = self.socket.recvfrom(1024)
-                print(addr)
-                if not data:
-                    return
-                data = data.decode()
-                print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data)
-                reply = 'OK: ' + data
-                self.socket.sendto(reply.encode(), addr)
-            except socket.error as e:
-                print('Running server failed:{}'.format(e))
-                raise
-            if not self.persist:
-                break
-
-
-@ttfw_idf.idf_example_test(env_tag='wifi_router')
-def test_examples_protocol_socket_udpclient(env, extra_data):
-    """
-    steps:
-      1. join AP
-      2. have the board connect to the server
-      3. send and receive data
-    """
-    dut1 = env.get_dut('udp_client', 'examples/protocols/sockets/udp_client', dut_class=ttfw_idf.ESP32DUT)
-    # check and log bin size
-    binary_file = os.path.join(dut1.app.binary_path, 'udp_client.bin')
-    bin_size = os.path.getsize(binary_file)
-    ttfw_idf.log_performance('udp_client_bin_size', '{}KB'.format(bin_size // 1024))
-
-    # start test
-    dut1.start_app()
-    if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
-        dut1.expect('Please input ssid password:')
-        env_name = 'wifi_router'
-        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
-        ap_password = get_env_config_variable(env_name, 'ap_password')
-        dut1.write(f'{ap_ssid} {ap_password}')
-
-    ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
-    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
-    ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
-    print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
-
-    my_interface = get_my_interface_by_dest_ip(ipv4)
-    # test IPv4
-    with UdpServer(PORT, socket.AF_INET):
-        server_ip = get_host_ip4_by_dest_ip(ipv4)
-        print('Connect udp client to server IP={}'.format(server_ip))
-        for _ in range(3):
-            try:
-                dut1.write(server_ip)
-                dut1.expect(re.compile(r'OK: Message from ESP32'))
-                break
-            except ExpectTimeout:
-                pass
-        else:
-            raise ValueError('Failed to send/recv udp packets.')
-    # test IPv6
-    with UdpServer(PORT, socket.AF_INET6):
-        server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
-        print('Connect udp client to server IP={}'.format(server_ip))
-        for _ in range(3):
-            try:
-                dut1.write(server_ip)
-                dut1.expect(re.compile(r'OK: Message from ESP32'))
-                break
-            except ExpectTimeout:
-                pass
-        else:
-            raise ValueError('Failed to send/recv udp packets.')
-
-
-if __name__ == '__main__':
-    if sys.argv[1:] and sys.argv[1].startswith('IPv'):     # if additional arguments provided:
-        # Usage: example_test.py <IPv4|IPv6>
-        family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET
-        with UdpServer(PORT, family_addr, persist=True) as s:
-            print(input('Press Enter stop the server...'))
-    else:
-        test_examples_protocol_socket_udpclient()

+ 127 - 0
examples/protocols/sockets/udp_client/pytest_udp_client.py

@@ -0,0 +1,127 @@
+# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import socket
+from threading import Event, Thread
+
+import pytest
+from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
+                                 get_my_interface_by_dest_ip)
+from pexpect.exceptions import TIMEOUT
+from pytest_embedded import Dut
+
+PORT = 3333
+MAX_RETRIES = 3
+
+
+class UdpServer:
+
+    def __init__(self, port, family_addr, persist=False):  # type: ignore
+        self.port = port
+        self.family_addr = family_addr
+        self.socket = socket.socket(family_addr, socket.SOCK_DGRAM)
+        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.socket.settimeout(60.0)
+        self.shutdown = Event()
+        self.persist = persist
+
+    def __enter__(self):  # type: ignore
+        try:
+            self.socket.bind(('', self.port))
+        except socket.error as e:
+            print('Bind failed:{}'.format(e))
+            raise
+
+        print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr))
+        self.server_thread = Thread(target=self.run_server)
+        self.server_thread.start()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):  # type: ignore
+        if self.persist:
+            sock = socket.socket(self.family_addr, socket.SOCK_DGRAM)
+            sock.sendto(b'Stop', ('localhost', self.port))
+            sock.close()
+            self.shutdown.set()
+        self.server_thread.join()
+        self.socket.close()
+
+    def run_server(self) -> None:
+        while not self.shutdown.is_set():
+            try:
+                data, addr = self.socket.recvfrom(1024)
+                print(addr)
+                if not data:
+                    return
+                data = data.decode()
+                print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data)
+                reply = 'OK: ' + data
+                self.socket.sendto(reply.encode(), addr)
+            except socket.error as e:
+                print('Running server failed:{}'.format(e))
+                raise
+            if not self.persist:
+                break
+
+
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_udp_client_ipv4(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
+        env_name = 'wifi_router'
+        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
+        ap_password = get_env_config_variable(env_name, 'ap_password')
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    print(f'Connected with IPv4={ipv4}')
+
+    # test IPv4
+    with UdpServer(PORT, socket.AF_INET):
+        server_ip = get_host_ip4_by_dest_ip(ipv4)
+        print('Connect udp client to server IP={}'.format(server_ip))
+        for _ in range(MAX_RETRIES):
+            try:
+                dut.write(server_ip)
+                dut.expect('OK: Message from ESP32')
+                break
+            except TIMEOUT:
+                pass
+        else:
+            raise ValueError('Failed to send/recv udp packets.')
+
+
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_udp_client_ipv6(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
+        env_name = 'wifi_router'
+        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
+        ap_password = get_env_config_variable(env_name, 'ap_password')
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
+    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
+    ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
+    print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
+
+    interface = get_my_interface_by_dest_ip(ipv4)
+    # test IPv6
+    with UdpServer(PORT, socket.AF_INET6):
+        server_ip = get_host_ip6_by_dest_ip(ipv6, interface)
+        print('Connect udp client to server IP={}'.format(server_ip))
+        for _ in range(MAX_RETRIES):
+            try:
+                dut.write(server_ip)
+                dut.expect('OK: Message from ESP32')
+                break
+            except TIMEOUT:
+                pass
+        else:
+            raise ValueError('Failed to send/recv udpv6 packets.')

+ 0 - 112
examples/protocols/sockets/udp_server/example_test.py

@@ -1,112 +0,0 @@
-# This example code is in the Public Domain (or CC0 licensed, at your option.)
-
-# Unless required by applicable law or agreed to in writing, this
-# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-# CONDITIONS OF ANY KIND, either express or implied.
-
-# -*- coding: utf-8 -*-
-
-from __future__ import print_function, unicode_literals
-
-import os
-import re
-import socket
-import sys
-
-import ttfw_idf
-from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
-
-# -----------  Config  ----------
-PORT = 3333
-# -------------------------------
-
-
-def udp_client(address, payload):
-    for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC,
-                                  socket.SOCK_DGRAM, 0, socket.AI_PASSIVE):
-        family_addr, socktype, proto, canonname, addr = res
-    try:
-        sock = socket.socket(family_addr, socket.SOCK_DGRAM)
-        sock.settimeout(20.0)
-    except socket.error as msg:
-        print('Could not create socket')
-        print(os.strerror(msg.errno))
-        raise
-    try:
-        sock.sendto(payload.encode(), addr)
-        reply, addr = sock.recvfrom(128)
-        if not reply:
-            return
-        print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply))
-    except socket.timeout:
-        print('Socket operation timeout')
-        return str(None)
-    except socket.error as msg:
-        print('Error while sending or receiving data from the socket')
-        print(os.strerror(msg.errno))
-        sock.close()
-        raise
-    return reply.decode()
-
-
-@ttfw_idf.idf_example_test(env_tag='wifi_router')
-def test_examples_protocol_socket_udpserver(env, extra_data):
-    MESSAGE = 'Data to ESP'
-    MAX_RETRIES = 3
-    """
-    steps:
-      1. join AP
-      2. have the board connect to the server
-      3. send and receive data
-    """
-    dut1 = env.get_dut('udp_server', 'examples/protocols/sockets/udp_server', dut_class=ttfw_idf.ESP32DUT)
-    # check and log bin size
-    binary_file = os.path.join(dut1.app.binary_path, 'udp_server.bin')
-    bin_size = os.path.getsize(binary_file)
-    ttfw_idf.log_performance('udp_server_bin_size', '{}KB'.format(bin_size // 1024))
-
-    # start test
-    dut1.start_app()
-    if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
-        dut1.expect('Please input ssid password:')
-        env_name = 'wifi_router'
-        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
-        ap_password = get_env_config_variable(env_name, 'ap_password')
-        dut1.write(f'{ap_ssid} {ap_password}')
-
-    ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
-    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
-    ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
-    print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
-    dut1.expect(re.compile(r'Waiting for data'), timeout=10)
-
-    interface = get_my_interface_by_dest_ip(ipv4)
-    # test IPv4
-    for _ in range(MAX_RETRIES):
-        print('Testing UDP on IPv4...')
-        received = udp_client(ipv4, MESSAGE)
-        if received == MESSAGE:
-            print('OK')
-            break
-    else:
-        raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
-    dut1.expect(MESSAGE)
-
-    # test IPv6
-    for _ in range(MAX_RETRIES):
-        print('Testing UDP on IPv6...')
-        received = udp_client('{}%{}'.format(ipv6, interface), MESSAGE)
-        if received == MESSAGE:
-            print('OK')
-            break
-    else:
-        raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
-    dut1.expect(MESSAGE)
-
-
-if __name__ == '__main__':
-    if sys.argv[2:]:    # if two arguments provided:
-        # Usage: example_test.py <server_address> <message_to_send_to_server>
-        udp_client(sys.argv[1], sys.argv[2])
-    else:               # otherwise run standard example test as in the CI
-        test_examples_protocol_socket_udpserver()

+ 98 - 0
examples/protocols/sockets/udp_server/pytest_udp_server.py

@@ -0,0 +1,98 @@
+# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+import socket
+
+import pytest
+from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
+from pytest_embedded import Dut
+
+PORT = 3333
+MESSAGE = 'Data to ESP'
+MAX_RETRIES = 3
+
+
+def udp_client(address: str, payload: str) -> str:
+    for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC,
+                                  socket.SOCK_DGRAM, 0, socket.AI_PASSIVE):
+        family_addr, socktype, proto, canonname, addr = res
+    try:
+        sock = socket.socket(family_addr, socket.SOCK_DGRAM)
+        sock.settimeout(20.0)
+    except socket.error as msg:
+        print('Could not create socket')
+        print(os.strerror(msg.errno))
+        raise
+    try:
+        sock.sendto(payload.encode(), addr)
+        reply, addr = sock.recvfrom(128)
+        if not reply:
+            return ''
+        print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply))
+    except socket.timeout:
+        print('Socket operation timeout')
+        return ''
+    except socket.error as msg:
+        print('Error while sending or receiving data from the socket')
+        print(os.strerror(msg.errno))
+        sock.close()
+        raise
+    return reply.decode()
+
+
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_udp_server_ipv4(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
+        env_name = 'wifi_router'
+        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
+        ap_password = get_env_config_variable(env_name, 'ap_password')
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    print(f'Connected with IPv4={ipv4}')
+
+    # test IPv4
+    for _ in range(MAX_RETRIES):
+        print('Testing UDP on IPv4...')
+        received = udp_client(ipv4, MESSAGE)
+        if received == MESSAGE:
+            print('OK')
+            break
+    else:
+        raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
+    dut.expect(MESSAGE)
+
+
+@pytest.mark.esp32
+@pytest.mark.wifi_router
+def test_examples_udp_server_ipv6(dut: Dut) -> None:
+    # Parse IP address of STA
+    logging.info('Waiting to connect with AP')
+    if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
+        dut.expect('Please input ssid password:')
+        env_name = 'wifi_router'
+        ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
+        ap_password = get_env_config_variable(env_name, 'ap_password')
+        dut.write(f'{ap_ssid} {ap_password}')
+    ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
+    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
+    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
+    ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
+    print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
+
+    interface = get_my_interface_by_dest_ip(ipv4)
+    # test IPv6
+    for _ in range(MAX_RETRIES):
+        print('Testing UDP on IPv6...')
+        received = udp_client('{}%{}'.format(ipv6, interface), MESSAGE)
+        if received == MESSAGE:
+            print('OK')
+            break
+    else:
+        raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
+    dut.expect(MESSAGE)