Sfoglia il codice sorgente

Merge branch 'feature/iperf_pytest_migration' into 'master'

iperf pytest migration

Closes IDFCI-1143

See merge request espressif/esp-idf!21726
Ondrej Kosta 3 anni fa
parent
commit
5da702bab3

+ 15 - 22
.gitlab/ci/target-test.yml

@@ -244,6 +244,13 @@ example_test_pytest_esp32s3_wifi_router:
   needs:
   needs:
     - build_pytest_examples_esp32s3
     - build_pytest_examples_esp32s3
   tags: [ esp32s3, wifi_router ]
   tags: [ esp32s3, wifi_router ]
+example_test_pytest_esp32_wifi_iperf:
+  extends:
+    - .pytest_examples_dir_template
+    - .rules:test:example_test-esp32-wifi
+  needs:
+    - build_pytest_examples_esp32
+  tags: [ esp32, Example_ShieldBox_Basic ]
 
 
 example_test_pytest_esp32_wifi_wlan:
 example_test_pytest_esp32_wifi_wlan:
   extends:
   extends:
@@ -253,6 +260,14 @@ example_test_pytest_esp32_wifi_wlan:
     - build_pytest_examples_esp32
     - build_pytest_examples_esp32
   tags: [ esp32, wifi_wlan ]
   tags: [ esp32, wifi_wlan ]
 
 
+example_test_pytest_esp32_ethernet_router:
+  extends:
+    - .pytest_examples_dir_template
+    - .rules:test:example_test-esp32-ethernet
+  needs:
+    - build_pytest_examples_esp32
+  tags: [ esp32, ethernet_router ]
+
 example_test_pytest_esp32_ethernet_ip101:
 example_test_pytest_esp32_ethernet_ip101:
   extends:
   extends:
     - .pytest_examples_dir_template
     - .pytest_examples_dir_template
@@ -988,22 +1003,6 @@ example_test_001C:
     - ESP32
     - ESP32
     - Example_GENERIC
     - Example_GENERIC
 
 
-example_test_002:
-  extends:
-    - .example_test_esp32_template
-    - .rules:test:example_test-esp32-wifi
-  tags:
-    - ESP32
-    - Example_ShieldBox_Basic
-
-example_test_ethernet_router:
-  extends:
-    - .example_test_esp32_template
-    - .rules:test:example_test-esp32-ethernet
-  tags:
-    - ESP32
-    - ethernet_router
-
 .example_test_003:
 .example_test_003:
   extends: .example_test_esp32_template
   extends: .example_test_esp32_template
   tags:
   tags:
@@ -1107,12 +1106,6 @@ example_test_C6_GENERIC:
     - .test_app_template
     - .test_app_template
     - .rules:test:custom_test-esp32s3
     - .rules:test:custom_test-esp32s3
 
 
-test_app_test_eth:
-  extends: .test_app_esp32_template
-  tags:
-    - ESP32
-    - ethernet_router
-
 .unit_test_template:
 .unit_test_template:
   extends: .target_test_job_template
   extends: .target_test_job_template
   needs: # the assign already needs all the build jobs
   needs: # the assign already needs all the build jobs

+ 2 - 0
conftest.py

@@ -103,6 +103,8 @@ ENV_MARKERS = {
     'wifi_router': 'both the runner and dut connect to the same wifi router',
     'wifi_router': 'both the runner and dut connect to the same wifi router',
     'wifi_high_traffic': 'wifi high traffic runners',
     'wifi_high_traffic': 'wifi high traffic runners',
     'wifi_wlan': 'wifi runner with a wireless NIC',
     'wifi_wlan': 'wifi runner with a wireless NIC',
+    'Example_ShieldBox_Basic': 'basic configuration of the AP and ESP DUT placed in shielded box',
+    'Example_ShieldBox': 'multiple shielded APs connected to shielded ESP DUT via RF cable with programmable attenuator',
     'xtal_26mhz': 'runner with 26MHz xtal on board',
     'xtal_26mhz': 'runner with 26MHz xtal on board',
     'xtal_40mhz': 'runner with 40MHz xtal on board',
     'xtal_40mhz': 'runner with 40MHz xtal on board',
     'external_flash': 'external flash memory connected via VSPI (FSPI)',
     'external_flash': 'external flash memory connected via VSPI (FSPI)',

+ 27 - 39
examples/ethernet/iperf/iperf_test.py → examples/ethernet/iperf/pytest_eth_iperf.py

@@ -1,25 +1,25 @@
+# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Unlicense OR CC0-1.0
 """
 """
 Test case for iperf example.
 Test case for iperf example.
 
 
-This test case might have problem running on windows:
+This test case might have problem running on Windows:
 
 
-1. direct use of `make`
-2. use `sudo killall iperf` to force kill iperf, didn't implement windows version
+- use `sudo killall iperf` to force kill iperf, didn't implement windows version
 
 
 """
 """
 from __future__ import division, unicode_literals
 from __future__ import division, unicode_literals
 
 
 import os
 import os
-import re
 import subprocess
 import subprocess
 
 
-import ttfw_idf
+import pytest
 from common_test_methods import get_host_ip4_by_dest_ip
 from common_test_methods import get_host_ip4_by_dest_ip
 from idf_iperf_test_util import IperfUtility
 from idf_iperf_test_util import IperfUtility
-from tiny_test_fw import TinyFW
+from pytest_embedded import Dut
 
 
 try:
 try:
-    from typing import Any, Tuple
+    from typing import Any, Callable, Tuple
 except ImportError:
 except ImportError:
     # Only used for type annotations
     # Only used for type annotations
     pass
     pass
@@ -29,10 +29,10 @@ NO_BANDWIDTH_LIMIT = -1  # iperf send bandwidth is not limited
 
 
 class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
 class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
     """ iperf test implementation """
     """ iperf test implementation """
-    def __init__(self, dut, config_name, pc_nic_ip, pc_iperf_log_file, test_result=None):   # type: (str, str, str,str, Any) -> None
+    def __init__(self, dut: str, config_name: str, pc_nic_ip: str, pc_iperf_log_file: str, test_result:Any=None) -> None:
         IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'None', 'None', pc_nic_ip, pc_iperf_log_file, test_result)
         IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'None', 'None', pc_nic_ip, pc_iperf_log_file, test_result)
 
 
-    def setup(self):  # type: () -> Tuple[str,int]
+    def setup(self) -> Tuple[str,int]:
         """
         """
         setup iperf test:
         setup iperf test:
 
 
@@ -45,65 +45,53 @@ class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
             pass
             pass
         self.dut.write('restart')
         self.dut.write('restart')
         self.dut.expect("Type 'help' to get the list of commands.")
         self.dut.expect("Type 'help' to get the list of commands.")
-        self.dut.expect_any('iperf>', 'esp32>')
-        self.dut.write('ethernet start')
-        dut_ip = self.dut.expect(re.compile(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),'))[0]
+        self.dut.expect('iperf>')
+        dut_ip = self.dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1)
         rssi = 0
         rssi = 0
         return dut_ip, rssi
         return dut_ip, rssi
 
 
 
 
-@ttfw_idf.idf_example_test(env_tag='ethernet_router')
-def test_ethernet_throughput_basic(env, _):  # type: (Any, Any) -> None
+@pytest.mark.esp32
+@pytest.mark.ethernet_router
+def test_esp_eth_iperf(
+    dut: Dut,
+    log_performance: Callable[[str, object], None],
+    check_performance: Callable[[str, float, str], None],
+) -> None:
     """
     """
     steps: |
     steps: |
       1. test TCP tx rx and UDP tx rx throughput
       1. test TCP tx rx and UDP tx rx throughput
       2. compare with the pre-defined pass standard
       2. compare with the pre-defined pass standard
     """
     """
-    pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
 
 
-    # 1. get DUT
-    dut = env.get_dut('iperf', 'examples/ethernet/iperf', dut_class=ttfw_idf.ESP32DUT)
-    dut.start_app()
-    dut.expect_any('iperf>', 'esp32>')
+    # 1. wait for DUT
+    dut.expect_exact('iperf>')
 
 
     # 2. preparing
     # 2. preparing
-    dut.write('ethernet start')
-    dut_ip = dut.expect(re.compile(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),'))[0]
+    pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
+    dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1)
     pc_nic_ip = get_host_ip4_by_dest_ip(dut_ip)
     pc_nic_ip = get_host_ip4_by_dest_ip(dut_ip)
-
     test_result = {
     test_result = {
         'tcp_tx': IperfUtility.TestResult('tcp', 'tx', 'ethernet'),
         'tcp_tx': IperfUtility.TestResult('tcp', 'tx', 'ethernet'),
         'tcp_rx': IperfUtility.TestResult('tcp', 'rx', 'ethernet'),
         'tcp_rx': IperfUtility.TestResult('tcp', 'rx', 'ethernet'),
         'udp_tx': IperfUtility.TestResult('udp', 'tx', 'ethernet'),
         'udp_tx': IperfUtility.TestResult('udp', 'tx', 'ethernet'),
         'udp_rx': IperfUtility.TestResult('udp', 'rx', 'ethernet'),
         'udp_rx': IperfUtility.TestResult('udp', 'rx', 'ethernet'),
     }
     }
-
     test_utility = IperfTestUtilityEth(dut, 'ethernet', pc_nic_ip, pc_iperf_log_file, test_result)
     test_utility = IperfTestUtilityEth(dut, 'ethernet', pc_nic_ip, pc_iperf_log_file, test_result)
 
 
     # 3. run test for TCP Tx, Rx and UDP Tx, Rx
     # 3. run test for TCP Tx, Rx and UDP Tx, Rx
-
     test_utility.run_test('tcp', 'tx', 0, NO_BANDWIDTH_LIMIT)
     test_utility.run_test('tcp', 'tx', 0, NO_BANDWIDTH_LIMIT)
     test_utility.run_test('tcp', 'rx', 0, NO_BANDWIDTH_LIMIT)
     test_utility.run_test('tcp', 'rx', 0, NO_BANDWIDTH_LIMIT)
     test_utility.run_test('udp', 'tx', 0, 80)
     test_utility.run_test('udp', 'tx', 0, 80)
     test_utility.run_test('udp', 'rx', 0, NO_BANDWIDTH_LIMIT)
     test_utility.run_test('udp', 'rx', 0, NO_BANDWIDTH_LIMIT)
 
 
     # 4. log performance and compare with pass standard
     # 4. log performance and compare with pass standard
-    performance_items = []
     for throughput_type in test_result:
     for throughput_type in test_result:
-        ttfw_idf.log_performance('{}_throughput'.format(throughput_type),
-                                 '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
-        performance_items.append(['{}_throughput'.format(throughput_type),
-                                  '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())])
+        log_performance('{}_throughput'.format(throughput_type),
+                        '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
 
 
-    # 5. save to report
-    TinyFW.JunitReport.update_performance(performance_items)
     # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
     # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
     for throughput_type in test_result:
     for throughput_type in test_result:
-        ttfw_idf.check_performance('{}_throughput'.format(throughput_type + '_eth'),
-                                   test_result[throughput_type].get_best_throughput(), dut.TARGET)
-
-    env.close_dut('iperf')
-
-
-if __name__ == '__main__':
-    test_ethernet_throughput_basic(env_config_file='EnvConfig.yml')
+        check_performance('{}_throughput'.format(throughput_type + '_eth'),
+                          test_result[throughput_type].get_best_throughput(),
+                          dut.target)

+ 0 - 353
examples/wifi/iperf/iperf_test.py

@@ -1,353 +0,0 @@
-"""
-Test case for iperf example.
-
-This test case might have problem running on windows:
-
-1. direct use of `make`
-2. use `sudo killall iperf` to force kill iperf, didn't implement windows version
-
-The test env Example_ShieldBox do need the following config::
-
-  Example_ShieldBox:
-    ap_list:
-      - ssid: "ssid"
-        password: "password"
-        outlet: 1
-    apc_ip: "192.168.1.88"
-    attenuator_port: "/dev/ttyUSB0"
-    iperf: "/dev/ttyUSB1"
-    apc_ip: "192.168.1.88"
-    pc_nic: "eth0"
-"""
-import os
-import re
-import subprocess
-import time
-
-import ttfw_idf
-from idf_iperf_test_util import Attenuator, IperfUtility, PowerControl, TestReport
-from idf_iperf_test_util.IperfUtility import SCAN_RETRY_COUNT, SCAN_TIMEOUT, TEST_TIME
-from tiny_test_fw import DUT, TinyFW, Utility
-
-# configurations
-RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
-ATTEN_VALUE_LIST = range(0, 60, 2)
-NO_BANDWIDTH_LIMIT = -1  # iperf send bandwith is not limited
-
-CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)')
-
-# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports.
-# Using numbers for config will make this easy.
-# Use default value `99` for config with best performance.
-BEST_PERFORMANCE_CONFIG = '99'
-
-
-class IperfTestUtilitySoftap(IperfUtility.IperfTestUtility):
-    """ iperf test implementation """
-    def __init__(self, dut, softap_dut, config_name, test_result=None):
-        IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'softap', '1234567890', None, None, test_result)
-        self.softap_dut = softap_dut
-        self.softap_ip = '192.168.4.1'
-
-    def setup(self):
-        """
-        setup iperf test:
-
-        1. kill current iperf process
-        2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
-        3. scan to get AP RSSI
-        4. connect to AP
-        """
-        self.softap_dut.write('restart')
-        self.softap_dut.expect_any('iperf>', 'esp32>', timeout=30)
-        self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password))
-        self.dut.write('restart')
-        self.dut.expect_any('iperf>', 'esp32>', timeout=30)
-        self.dut.write('scan {}'.format(self.ap_ssid))
-        for _ in range(SCAN_RETRY_COUNT):
-            try:
-                rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
-                                           timeout=SCAN_TIMEOUT)[0])
-                break
-            except DUT.ExpectTimeout:
-                continue
-        else:
-            raise AssertionError('Failed to scan AP')
-        self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
-        dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
-        return dut_ip, rssi
-
-    def _test_once(self, proto, direction):
-        """ do measure once for one type """
-        # connect and scan to get RSSI
-        dut_ip, rssi = self.setup()
-
-        assert direction in ['rx', 'tx']
-        assert proto in ['tcp', 'udp']
-
-        # run iperf test
-        if direction == 'tx':
-            if proto == 'tcp':
-                self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
-                # wait until DUT TCP server created
-                try:
-                    self.softap_dut.expect('iperf tcp server create successfully', timeout=1)
-                except DUT.ExpectTimeout:
-                    # compatible with old iperf example binary
-                    pass
-                self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
-            else:
-                self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
-                self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
-        else:
-            if proto == 'tcp':
-                self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
-                # wait until DUT TCP server created
-                try:
-                    self.dut.expect('iperf tcp server create successfully', timeout=1)
-                except DUT.ExpectTimeout:
-                    # compatible with old iperf example binary
-                    pass
-                self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME))
-            else:
-                self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
-                self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME))
-        time.sleep(60)
-
-        if direction == 'tx':
-            server_raw_data = self.dut.read()
-        else:
-            server_raw_data = self.softap_dut.read()
-        self.dut.write('iperf -a')
-        self.softap_dut.write('iperf -a')
-        self.dut.write('heap')
-        heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
-
-        # return server raw data (for parsing test results) and RSSI
-        return server_raw_data, rssi, heap_size
-
-
-@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
-def test_wifi_throughput_with_different_configs(env, extra_data):
-    """
-    steps: |
-      1. build iperf with specified configs
-      2. test throughput for all routers
-    """
-    pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
-    pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
-    ap_info = {
-        'ssid': env.get_variable('ap_ssid'),
-        'password': env.get_variable('ap_password'),
-    }
-
-    config_names_raw = subprocess.check_output(['ls', os.path.dirname(os.path.abspath(__file__))])
-    config_names = CONFIG_NAME_PATTERN.findall(config_names_raw)
-    if not config_names:
-        raise ValueError('no configs found in {}'.format(os.path.dirname(__file__)))
-
-    test_result = dict()
-    sdkconfig_files = dict()
-
-    for config_name in config_names:
-        # 1. get the config
-        sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__),
-                                                    'sdkconfig.ci.{}'.format(config_name))
-
-        # 2. get DUT and download
-        dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=config_name)
-        dut.start_app()
-        dut.expect_any('iperf>', 'esp32>')
-
-        # 3. run test for each required att value
-        test_result[config_name] = {
-            'tcp_tx': IperfUtility.TestResult('tcp', 'tx', config_name),
-            'tcp_rx': IperfUtility.TestResult('tcp', 'rx', config_name),
-            'udp_tx': IperfUtility.TestResult('udp', 'tx', config_name),
-            'udp_rx': IperfUtility.TestResult('udp', 'rx', config_name),
-        }
-
-        test_utility = IperfUtility.IperfTestUtility(dut, config_name, ap_info['ssid'], ap_info['password'], pc_nic_ip,
-                                                     pc_iperf_log_file, test_result[config_name])
-
-        for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
-            test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
-
-        for result_type in test_result[config_name]:
-            summary = str(test_result[config_name][result_type])
-            if summary:
-                Utility.console_log(summary, color='orange')
-
-        # 4. check test results
-        env.close_dut('iperf')
-
-    # 5. generate report
-    report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, 'Performance',
-                                                                'ThroughputForConfigsReport'),
-                                                   ap_info['ssid'], test_result, sdkconfig_files)
-    report.generate_report()
-
-
-@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
-def test_wifi_throughput_vs_rssi(env, extra_data):
-    """
-    steps: |
-      1. build with best performance config
-      2. switch on one router
-      3. set attenuator value from 0-60 for each router
-      4. test TCP tx rx and UDP tx rx throughput
-    """
-    att_port = env.get_variable('attenuator_port')
-    ap_list = env.get_variable('ap_list')
-    pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
-    apc_ip = env.get_variable('apc_ip')
-    pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
-
-    test_result = {
-        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
-        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
-    }
-
-    # 1. get DUT and download
-    dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
-    dut.start_app()
-    dut.expect_any('iperf>', 'esp32>')
-
-    # 2. run test for each required att value
-    for ap_info in ap_list:
-        test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
-                                                     ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)
-
-        PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF')
-        PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'})
-        Attenuator.set_att(att_port, 0)
-
-        if not test_utility.wait_ap_power_on():
-            Utility.console_log('[{}] failed to power on, skip testing this AP'
-                                .format(ap_info['ssid']), color='red')
-            continue
-
-        for atten_val in ATTEN_VALUE_LIST:
-            assert Attenuator.set_att(att_port, atten_val) is True
-            try:
-                test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
-            except AssertionError:
-                break
-
-    # 3. check test results
-    env.close_dut('iperf')
-
-    # 4. generate report
-    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance', 'STAThroughputVsRssiReport'),
-                                               test_result)
-    report.generate_report()
-
-
-@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic',
-                           target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], ci_target=['ESP32'])
-def test_wifi_throughput_basic(env, extra_data):
-    """
-    steps: |
-      1. test TCP tx rx and UDP tx rx throughput
-      2. compare with the pre-defined pass standard
-    """
-    pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
-    pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
-    ap_info = {
-        'ssid': env.get_variable('ap_ssid'),
-        'password': env.get_variable('ap_password'),
-    }
-
-    # 1. get DUT
-    dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
-    dut.start_app()
-    dut.expect_any('iperf>', 'esp32>')
-
-    # 2. preparing
-    test_result = {
-        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
-        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
-    }
-
-    test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
-                                                 pc_nic_ip, pc_iperf_log_file, test_result)
-
-    # 3. run test for TCP Tx, Rx and UDP Tx, Rx
-    for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
-        test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
-
-    # 4. log performance and compare with pass standard
-    performance_items = []
-    for throughput_type in test_result:
-        ttfw_idf.log_performance('{}_throughput'.format(throughput_type),
-                                 '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
-        performance_items.append(['{}_throughput'.format(throughput_type),
-                                  '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())])
-
-    # 5. save to report
-    TinyFW.JunitReport.update_performance(performance_items)
-    # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
-    for throughput_type in test_result:
-        ttfw_idf.check_performance('{}_throughput'.format(throughput_type),
-                                   test_result[throughput_type].get_best_throughput(), dut.TARGET)
-
-    env.close_dut('iperf')
-
-
-@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox2', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
-def test_softap_throughput_vs_rssi(env, extra_data):
-    """
-    steps: |
-      1. build with best performance config
-      2. switch on one router
-      3. set attenuator value from 0-60 for each router
-      4. test TCP tx rx and UDP tx rx throughput
-    """
-    att_port = env.get_variable('attenuator_port')
-
-    test_result = {
-        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
-        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
-    }
-
-    # 1. get DUT and download
-    softap_dut = env.get_dut('softap_iperf', 'examples/wifi/iperf')
-    softap_dut.start_app()
-    softap_dut.expect_any('iperf>', 'esp32>')
-
-    sta_dut = env.get_dut('sta_iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
-    sta_dut.start_app()
-    sta_dut.expect_any('iperf>', 'esp32>')
-
-    # 2. run test for each required att value
-    test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result)
-
-    Attenuator.set_att(att_port, 0)
-
-    for atten_val in ATTEN_VALUE_LIST:
-        assert Attenuator.set_att(att_port, atten_val) is True
-        try:
-            test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
-        except AssertionError:
-            break
-
-    env.close_dut('softap_iperf')
-    env.close_dut('sta_iperf')
-
-    # 3. generate report
-    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance',
-                                                            'SoftAPThroughputVsRssiReport'),test_result)
-    report.generate_report()
-
-
-if __name__ == '__main__':
-    # test_wifi_throughput_basic(env_config_file='EnvConfig.yml')
-    # test_wifi_throughput_with_different_configs(env_config_file='EnvConfig.yml')
-    test_wifi_throughput_vs_rssi(env_config_file='EnvConfig.yml', target='ESP32C3')
-    test_softap_throughput_vs_rssi(env_config_file='EnvConfig.yml')

+ 418 - 0
examples/wifi/iperf/pytest_iperf.py

@@ -0,0 +1,418 @@
+# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
+# SPDX-License-Identifier: Unlicense OR CC0-1.0
+"""
+Test case for iperf example.
+
+This test case might have problem running on windows:
+
+- use `sudo killall iperf` to force kill iperf, didn't implement windows version
+
+The test env Example_ShieldBox do need the following config::
+
+  Example_ShieldBox:
+    ap_list:
+      - ssid: "ssid"
+        password: "password"
+        outlet: 1
+    apc_ip: "192.168.1.88"
+    attenuator_port: "/dev/ttyUSB0"
+    iperf: "/dev/ttyUSB1"
+    apc_ip: "192.168.1.88"
+    pc_nic: "eth0"
+"""
+
+import logging
+import os
+import re
+import time
+from typing import Any, Callable, Dict, Generator, Tuple
+
+import pexpect
+import pytest
+from common_test_methods import get_env_config_variable, get_host_ip_by_interface
+from idf_iperf_test_util import Attenuator, IperfUtility, PowerControl, TestReport
+from idf_iperf_test_util.IperfUtility import SCAN_RETRY_COUNT, SCAN_TIMEOUT, TEST_TIME
+from pytest_embedded import Dut
+from pytest_embedded_idf.dut import IdfDut
+
+# configurations
+RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
+ATTEN_VALUE_LIST = range(0, 60, 2)
+NO_BANDWIDTH_LIMIT = -1  # iperf send bandwith is not limited
+
+# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports.
+# Using numbers for config will make this easy.
+# Use default value `99` for config with best performance.
+BEST_PERFORMANCE_CONFIG = '99'
+
+
+class IperfTestUtilitySoftap(IperfUtility.IperfTestUtility):
+    """ iperf test implementation """
+    def __init__(self, dut:IdfDut, softap_dut:IdfDut, config_name:str, test_result:Any=None) -> None:
+        IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'softap', '1234567890', None, None, test_result)
+        self.softap_dut = softap_dut
+        self.softap_ip = '192.168.4.1'
+
+    def setup(self) -> Tuple[str,int]:
+        """
+        setup iperf test:
+
+        1. kill current iperf process
+        2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
+        3. scan to get AP RSSI
+        4. connect to AP
+        """
+        self.softap_dut.write('restart')
+        self.softap_dut.expect_exact("Type 'help' to get the list of commands.")
+        self.softap_dut.expect('iperf>', timeout=30)
+        self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password))
+        self.dut.write('restart')
+        self.dut.expect_exact("Type 'help' to get the list of commands.")
+        self.dut.expect('iperf>', timeout=30)
+        self.dut.write('scan {}'.format(self.ap_ssid))
+        for _ in range(SCAN_RETRY_COUNT):
+            try:
+                rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid),
+                                           timeout=SCAN_TIMEOUT).group(1))
+                break
+            except pexpect.TIMEOUT:
+                continue
+        else:
+            raise AssertionError('Failed to scan AP')
+        self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
+        dut_ip = self.dut.expect(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)').group(1).decode('utf-8')
+        return dut_ip, rssi
+
+    def _test_once(self, proto:str, direction:str, bw_limit:int) -> Tuple[str, int, int]:
+        """ do measure once for one type """
+        # connect and scan to get RSSI
+        dut_ip, rssi = self.setup()
+
+        assert direction in ['rx', 'tx']
+        assert proto in ['tcp', 'udp']
+
+        # run iperf test
+        if direction == 'tx':
+            if proto == 'tcp':
+                self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
+                # wait until DUT TCP server created
+                try:
+                    self.softap_dut.expect('iperf tcp server create successfully', timeout=1)
+                except pexpect.TIMEOUT:
+                    # compatible with old iperf example binary
+                    pass
+                if bw_limit > 0:
+                    self.dut.write('iperf -c {} -i 1 -t {} -b {}'.format(self.softap_ip, TEST_TIME, bw_limit))
+                else:
+                    self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
+            else:
+                self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
+                if bw_limit > 0:
+                    self.dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(self.softap_ip, TEST_TIME, bw_limit))
+                else:
+                    self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
+        else:
+            if proto == 'tcp':
+                self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
+                # wait until DUT TCP server created
+                try:
+                    self.dut.expect('iperf tcp server create successfully', timeout=1)
+                except pexpect.TIMEOUT:
+                    # compatible with old iperf example binary
+                    pass
+                if bw_limit > 0:
+                    self.softap_dut.write('iperf -c {} -i 1 -t {} -b {}'.format(dut_ip, TEST_TIME, bw_limit))
+                else:
+                    self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME))
+            else:
+                self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
+                if bw_limit > 0:
+                    self.softap_dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(dut_ip, TEST_TIME, bw_limit))
+                else:
+                    self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME))
+        time.sleep(TEST_TIME + 5)
+
+        if direction == 'tx':
+            server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8')
+        else:
+            server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8')
+        self.dut.write('iperf -a')
+        self.softap_dut.write('iperf -a')
+        self.dut.write('heap')
+        heap_size = self.dut.expect(r'min heap size: (\d+)\D').group(1)
+
+        # return server raw data (for parsing test results) and RSSI
+        return server_raw_data, rssi, heap_size
+
+
+@pytest.fixture(name='generate_report_different_configs', scope='session')
+def fixture_generate_report_different_configs(
+    session_tempdir:str
+) -> Generator[Callable[[Dict[str, Any], Dict[str, Any], str], None], None, None]:
+    _test_result_dict = dict()
+    _sdkconfig_files_dict = dict()
+    _ap_info = dict()
+
+    def add_config(ap_info:Dict[str, Any], test_result:Dict[str, Any], config_name:str) -> None:
+        """
+        Collects results for each config and stores it to a dictionary
+            Args:
+                ap_info: AP info
+                test_result: test results for a specific config
+                config_name: config name
+        """
+        # need to store the SSID to generate the report in the teardown period
+        # note that the info passed along with the last call of the fixture is used in the teardown period
+        _ap_info['ssid'] = ap_info['ssid']
+
+        _test_result_dict[config_name] = test_result
+        _sdkconfig_files_dict[config_name] = 'sdkconfig.ci.' + config_name
+
+    yield add_config
+
+    # the final report for all config results is generated during fixture's teardown period
+    report = TestReport.ThroughputForConfigsReport(os.path.join(session_tempdir, 'Performance',
+                                                   'ThroughputForConfigsReport'), _ap_info['ssid'],
+                                                   _test_result_dict, _sdkconfig_files_dict)
+    report.generate_report()
+
+
+@pytest.mark.esp32
+@pytest.mark.esp32s2
+@pytest.mark.esp32c3
+@pytest.mark.esp32s3
+@pytest.mark.temp_skip_ci(targets=['esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners (run only for ESP32)')
+@pytest.mark.timeout(1200)
+@pytest.mark.Example_ShieldBox_Basic
+@pytest.mark.parametrize('config', [
+    BEST_PERFORMANCE_CONFIG
+], indirect=True)
+def test_wifi_throughput_basic(
+    dut: Dut,
+    log_performance: Callable[[str, str], None],
+    check_performance: Callable[[str, float, str], None],
+) -> None:
+    """
+    steps: |
+      1. test TCP tx rx and UDP tx rx throughput
+      2. compare with the pre-defined pass standard
+    """
+    # 1. wait for DUT
+    dut.expect('iperf>')
+
+    # 2. preparing
+    env_name = 'Example_ShieldBox_Basic'
+    pc_nic = get_env_config_variable(env_name, 'pc_nic')
+    pc_nic_ip = get_host_ip_by_interface(pc_nic)
+    pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
+    ap_info = {
+        'ssid': get_env_config_variable(env_name, 'ap_ssid'),
+        'password': get_env_config_variable(env_name, 'ap_password'),
+    }
+
+    test_result = {
+        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
+    }
+
+    test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
+                                                 pc_nic_ip, pc_iperf_log_file, test_result)
+
+    # 3. run test for TCP Tx, Rx and UDP Tx, Rx
+    for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
+        test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
+
+    # 4. log performance and compare with pass standard
+    for throughput_type in test_result:
+        log_performance('{}_throughput'.format(throughput_type),
+                        '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
+
+    # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
+    for throughput_type in test_result:
+        check_performance('{}_throughput'.format(throughput_type),
+                          test_result[throughput_type].get_best_throughput(), dut.target)
+
+
+@pytest.mark.esp32
+@pytest.mark.esp32s2
+@pytest.mark.esp32c3
+@pytest.mark.esp32s3
+@pytest.mark.temp_skip_ci(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='local stress test')
+@pytest.mark.timeout(1200)
+@pytest.mark.Example_ShieldBox_Basic
+@pytest.mark.parametrize('config', [
+    '00',
+    '01',
+    '02',
+    '03',
+    '04',
+    '05',
+    '06',
+    '07',
+    '99'
+], indirect=True)
+def test_wifi_throughput_with_different_configs(
+    dut: Dut,
+    generate_report_different_configs: Callable[[Dict[str, Any], Dict[str, Any], str], None],
+) -> None:
+    """
+    steps: |
+      1. build iperf with specified configs
+      2. test throughput for all routers
+    """
+    # 1. wait for DUT
+    dut.expect('iperf>')
+
+    # 2. preparing
+    env_name = 'Example_ShieldBox_Basic'
+    pc_nic = get_env_config_variable(env_name, 'pc_nic')
+    pc_nic_ip = get_host_ip_by_interface(pc_nic)
+    pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
+    ap_info = {
+        'ssid': get_env_config_variable(env_name, 'ap_ssid'),
+        'password': get_env_config_variable(env_name, 'ap_password'),
+    }
+
+    found_config = re.search(r'esp32.*\.(\w+)\.', dut.test_case_name)
+    if found_config is not None:
+        config_name = found_config.group(1)
+    else:
+        raise Exception('config name not found')
+
+    # 3. run test for each required att value
+    test_result = {
+        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', config_name),
+        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', config_name),
+        'udp_tx': IperfUtility.TestResult('udp', 'tx', config_name),
+        'udp_rx': IperfUtility.TestResult('udp', 'rx', config_name),
+    }
+    test_utility = IperfUtility.IperfTestUtility(dut, config_name, ap_info['ssid'], ap_info['password'], pc_nic_ip,
+                                                 pc_iperf_log_file, test_result)
+    for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
+        test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
+
+    for result_type in test_result:
+        summary = str(test_result[result_type])
+        if summary:
+            logging.info(summary)
+
+    generate_report_different_configs(ap_info, test_result, config_name)
+
+
+@pytest.mark.esp32
+@pytest.mark.esp32s2
+@pytest.mark.esp32c3
+@pytest.mark.esp32s3
+@pytest.mark.temp_skip(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners')
+@pytest.mark.timeout(3600)
+@pytest.mark.Example_ShieldBox
+@pytest.mark.parametrize('config', [
+    BEST_PERFORMANCE_CONFIG
+], indirect=True)
+def test_wifi_throughput_vs_rssi(
+    dut: Dut,
+    session_tempdir:str,
+) -> None:
+    """
+    steps: |
+      1. build with best performance config
+      2. switch on one router
+      3. set attenuator value from 0-60 for each router
+      4. test TCP tx rx and UDP tx rx throughput
+    """
+    # 1. wait for DUT
+    dut.expect('iperf>')
+
+    # 2. preparing
+    env_name = 'Example_ShieldBox'
+    att_port = get_env_config_variable(env_name, 'attenuator_port')
+    ap_list = get_env_config_variable(env_name, 'ap_list')
+    pc_nic = get_env_config_variable(env_name, 'pc_nic')
+    pc_nic_ip = get_host_ip_by_interface(pc_nic)
+    apc_ip = get_env_config_variable(env_name, 'apc_ip')
+    pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
+
+    test_result = {
+        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
+    }
+
+    # 3. run test for each required att value
+    for ap_info in ap_list:
+        test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
+                                                     ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)
+        PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF')
+        PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'})
+        Attenuator.set_att(att_port, 0)
+        if not test_utility.wait_ap_power_on():
+            logging.error('[{}] failed to power on, skip testing this AP'.format(ap_info['ssid']))
+            continue
+        for atten_val in ATTEN_VALUE_LIST:
+            assert Attenuator.set_att(att_port, atten_val) is True
+            try:
+                test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
+            except AssertionError:
+                break
+
+    # 4. generate report
+    report = TestReport.ThroughputVsRssiReport(os.path.join(session_tempdir, 'Performance', 'STAThroughputVsRssiReport'),
+                                               test_result)
+    report.generate_report()
+
+
+@pytest.mark.esp32
+@pytest.mark.esp32s2
+@pytest.mark.esp32c3
+@pytest.mark.esp32s3
+@pytest.mark.temp_skip(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners')
+@pytest.mark.parametrize('count, config', [
+    (2, BEST_PERFORMANCE_CONFIG),
+], indirect=True)
+def test_softap_throughput_vs_rssi(
+    dut: Tuple[IdfDut, IdfDut],
+    session_tempdir:str,
+) -> None:
+    """
+    steps: |
+      1. build with best performance config
+      2. switch on one router
+      3. set attenuator value from 0-60 for each router
+      4. test TCP tx rx and UDP tx rx throughput
+    """
+    # 1. wait for DUTs
+    softap_dut = dut[0]
+    sta_dut = dut[1]
+    softap_dut.expect('iperf>')
+    sta_dut.expect('iperf>')
+
+    # 2. preparing
+    env_name = 'Example_ShieldBox2'
+    att_port = get_env_config_variable(env_name, 'attenuator_port')
+
+    test_result = {
+        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
+    }
+
+    # 3. run test for each required att value
+    test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result)
+
+    Attenuator.set_att(att_port, 0)
+
+    for atten_val in ATTEN_VALUE_LIST:
+        assert Attenuator.set_att(att_port, atten_val) is True
+        try:
+            test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
+        except AssertionError:
+            break
+
+    # 4. generate report
+    report = TestReport.ThroughputVsRssiReport(os.path.join(session_tempdir, 'Performance',
+                                                            'SoftAPThroughputVsRssiReport'),test_result)
+    report.generate_report()

+ 47 - 39
tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py

@@ -1,12 +1,14 @@
-# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-License-Identifier: Apache-2.0
+import logging
 import os
 import os
 import re
 import re
 import subprocess
 import subprocess
 import time
 import time
 
 
+import pexpect
 from idf_iperf_test_util import LineChart
 from idf_iperf_test_util import LineChart
-from tiny_test_fw import DUT, Utility
+from pytest_embedded import Dut
 
 
 try:
 try:
     from typing import Any, Tuple
     from typing import Any, Tuple
@@ -45,7 +47,7 @@ class TestResult(object):
     RSSI_RANGE = [-x for x in range(10, 100)]
     RSSI_RANGE = [-x for x in range(10, 100)]
     ATT_RANGE = [x for x in range(0, 64)]
     ATT_RANGE = [x for x in range(0, 64)]
 
 
-    def __init__(self, proto, direction, config_name):  # type: (str, str, str) -> None
+    def __init__(self, proto:str, direction:str, config_name:str) -> None:
         self.proto = proto
         self.proto = proto
         self.direction = direction
         self.direction = direction
         self.config_name = config_name
         self.config_name = config_name
@@ -55,7 +57,7 @@ class TestResult(object):
         self.heap_size = INVALID_HEAP_SIZE
         self.heap_size = INVALID_HEAP_SIZE
         self.error_list = []  # type: list[str]
         self.error_list = []  # type: list[str]
 
 
-    def _save_result(self, throughput, ap_ssid, att, rssi, heap_size):  # type: (float, str, int, int, str) -> None
+    def _save_result(self, throughput:float, ap_ssid:str, att:int, rssi:int, heap_size:str) -> None:
         """
         """
         save the test results:
         save the test results:
 
 
@@ -70,7 +72,7 @@ class TestResult(object):
 
 
         self.att_rssi_map[ap_ssid][att] = rssi
         self.att_rssi_map[ap_ssid][att] = rssi
 
 
-        def record_throughput(database, key_value):  # type: (dict, int) -> None
+        def record_throughput(database:dict, key_value:int) -> None:
             try:
             try:
                 # we save the larger value for same att
                 # we save the larger value for same att
                 if throughput > database[ap_ssid][key_value]:
                 if throughput > database[ap_ssid][key_value]:
@@ -84,7 +86,7 @@ class TestResult(object):
         if int(heap_size) < self.heap_size:
         if int(heap_size) < self.heap_size:
             self.heap_size = int(heap_size)
             self.heap_size = int(heap_size)
 
 
-    def add_result(self, raw_data, ap_ssid, att, rssi, heap_size):  # type: (str, str, int, int, str) -> float
+    def add_result(self, raw_data:str, ap_ssid:str, att:int, rssi:int, heap_size:str) -> float:
         """
         """
         add result for one test
         add result for one test
 
 
@@ -132,14 +134,14 @@ class TestResult(object):
 
 
         return max_throughput
         return max_throughput
 
 
-    def post_analysis(self):  # type: () -> None
+    def post_analysis(self) -> None:
         """
         """
         some rules need to be checked after we collected all test raw data:
         some rules need to be checked after we collected all test raw data:
 
 
         1. throughput value 30% worse than the next point with lower RSSI
         1. throughput value 30% worse than the next point with lower RSSI
         2. throughput value 30% worse than the next point with larger attenuate
         2. throughput value 30% worse than the next point with larger attenuate
         """
         """
-        def analysis_bad_point(data, index_type):  # type: (dict, str) -> None
+        def analysis_bad_point(data:dict, index_type:str) -> None:
             for ap_ssid in data:
             for ap_ssid in data:
                 result_dict = data[ap_ssid]
                 result_dict = data[ap_ssid]
                 index_list = list(result_dict.keys())
                 index_list = list(result_dict.keys())
@@ -160,7 +162,7 @@ class TestResult(object):
         analysis_bad_point(self.throughput_by_rssi, 'rssi')
         analysis_bad_point(self.throughput_by_rssi, 'rssi')
         analysis_bad_point(self.throughput_by_att, 'att')
         analysis_bad_point(self.throughput_by_att, 'att')
 
 
-    def draw_throughput_figure(self, path, ap_ssid, draw_type):  # type: (str, str, str) -> str
+    def draw_throughput_figure(self, path:str, ap_ssid:str, draw_type:str) -> str:
         """
         """
         :param path: folder to save figure. make sure the folder is already created.
         :param path: folder to save figure. make sure the folder is already created.
         :param ap_ssid: ap ssid string or a list of ap ssid string
         :param ap_ssid: ap ssid string or a list of ap ssid string
@@ -189,7 +191,7 @@ class TestResult(object):
                                   data, range_list)
                                   data, range_list)
         return file_name
         return file_name
 
 
-    def draw_rssi_vs_att_figure(self, path, ap_ssid):  # type: (str, str) -> str
+    def draw_rssi_vs_att_figure(self, path:str, ap_ssid:str) -> str:
         """
         """
         :param path: folder to save figure. make sure the folder is already created.
         :param path: folder to save figure. make sure the folder is already created.
         :param ap_ssid: ap to use
         :param ap_ssid: ap to use
@@ -207,13 +209,13 @@ class TestResult(object):
                                   self.ATT_RANGE)
                                   self.ATT_RANGE)
         return file_name
         return file_name
 
 
-    def get_best_throughput(self):  # type: () -> Any
+    def get_best_throughput(self) -> Any:
         """  get the best throughput during test """
         """  get the best throughput during test """
         best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
         best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
                         for ap_ssid in self.throughput_by_att]
                         for ap_ssid in self.throughput_by_att]
         return max(best_for_aps)
         return max(best_for_aps)
 
 
-    def __str__(self):  # type: () -> str
+    def __str__(self) -> str:
         """
         """
         returns summary for this test:
         returns summary for this test:
 
 
@@ -237,8 +239,8 @@ class TestResult(object):
 class IperfTestUtility(object):
 class IperfTestUtility(object):
     """ iperf test implementation """
     """ iperf test implementation """
 
 
-    def __init__(self, dut, config_name, ap_ssid, ap_password,
-                 pc_nic_ip, pc_iperf_log_file, test_result=None):   # type: (str, str, str, str, str, str, Any) -> None
+    def __init__(self, dut:Dut, config_name:str, ap_ssid:str, ap_password:str,
+                 pc_nic_ip:str, pc_iperf_log_file:str, test_result:Any=None) -> None:
         self.config_name = config_name
         self.config_name = config_name
         self.dut = dut
         self.dut = dut
 
 
@@ -259,7 +261,7 @@ class IperfTestUtility(object):
                 'udp_rx': TestResult('udp', 'rx', config_name),
                 'udp_rx': TestResult('udp', 'rx', config_name),
             }
             }
 
 
-    def setup(self):  # type: (Any) -> Tuple[str,int]
+    def setup(self) -> Tuple[str,int]:
         """
         """
         setup iperf test:
         setup iperf test:
 
 
@@ -274,25 +276,26 @@ class IperfTestUtility(object):
             pass
             pass
         time.sleep(5)
         time.sleep(5)
         self.dut.write('restart')
         self.dut.write('restart')
-        self.dut.expect_any('iperf>', 'esp32>')
+        self.dut.expect_exact("Type 'help' to get the list of commands.")
+        self.dut.expect('iperf>')
         self.dut.write('scan {}'.format(self.ap_ssid))
         self.dut.write('scan {}'.format(self.ap_ssid))
         for _ in range(SCAN_RETRY_COUNT):
         for _ in range(SCAN_RETRY_COUNT):
             try:
             try:
-                rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
-                                           timeout=SCAN_TIMEOUT)[0])
+                rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid),
+                                           timeout=SCAN_TIMEOUT).group(1))
                 break
                 break
-            except DUT.ExpectTimeout:
+            except pexpect.TIMEOUT:
                 continue
                 continue
         else:
         else:
             raise AssertionError('Failed to scan AP')
             raise AssertionError('Failed to scan AP')
         self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
         self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
-        dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
+        dut_ip = self.dut.expect(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)').group(1)
         return dut_ip, rssi
         return dut_ip, rssi
 
 
-    def _save_test_result(self, test_case, raw_data, att, rssi, heap_size):  # type: (str, str, int, int, int) -> Any
+    def _save_test_result(self, test_case:str, raw_data:str, att:int, rssi:int, heap_size:int) -> Any:
         return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
         return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
 
 
-    def _test_once(self, proto, direction, bw_limit):   # type: (Any, str, str, int) -> Tuple[str, int, int]
+    def _test_once(self, proto:str, direction:str, bw_limit:int) -> Tuple[str, int, int]:
         """ do measure once for one type """
         """ do measure once for one type """
         # connect and scan to get RSSI
         # connect and scan to get RSSI
         dut_ip, rssi = self.setup()
         dut_ip, rssi = self.setup()
@@ -336,9 +339,9 @@ class IperfTestUtility(object):
                     # wait until DUT TCP server created
                     # wait until DUT TCP server created
                     try:
                     try:
                         self.dut.expect('iperf: Socket created', timeout=5)
                         self.dut.expect('iperf: Socket created', timeout=5)
-                    except DUT.ExpectTimeout:
+                    except pexpect.TIMEOUT:
                         # compatible with old iperf example binary
                         # compatible with old iperf example binary
-                        Utility.console_log('create iperf tcp server fail')
+                        logging.info('create iperf tcp server fail')
                     if bw_limit > 0:
                     if bw_limit > 0:
                         process = subprocess.Popen(['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm',
                         process = subprocess.Popen(['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm',
                                                     '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
                                                     '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
@@ -356,9 +359,9 @@ class IperfTestUtility(object):
                     # wait until DUT TCP server created
                     # wait until DUT TCP server created
                     try:
                     try:
                         self.dut.expect('iperf: Socket bound', timeout=5)
                         self.dut.expect('iperf: Socket bound', timeout=5)
-                    except DUT.ExpectTimeout:
+                    except pexpect.TIMEOUT:
                         # compatible with old iperf example binary
                         # compatible with old iperf example binary
-                        Utility.console_log('create iperf udp server fail')
+                        logging.info('create iperf udp server fail')
                     if bw_limit > 0:
                     if bw_limit > 0:
                         process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm',
                         process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm',
                                                     '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
                                                     '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
@@ -379,10 +382,13 @@ class IperfTestUtility(object):
                             else:
                             else:
                                 process.terminate()
                                 process.terminate()
 
 
-            server_raw_data = self.dut.read()
+            server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8')
             with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
             with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
                 pc_raw_data = f.read()
                 pc_raw_data = f.read()
 
 
+        if os.path.exists(PC_IPERF_TEMP_LOG_FILE):
+            os.remove(PC_IPERF_TEMP_LOG_FILE)
+
         # save PC iperf logs to console
         # save PC iperf logs to console
         with open(self.pc_iperf_log_file, 'a+') as f:
         with open(self.pc_iperf_log_file, 'a+') as f:
             f.write('## [{}] `{}`\r\n##### {}'
             f.write('## [{}] `{}`\r\n##### {}'
@@ -391,18 +397,19 @@ class IperfTestUtility(object):
                             time.strftime('%m-%d %H:%M:%S', time.localtime(time.time()))))
                             time.strftime('%m-%d %H:%M:%S', time.localtime(time.time()))))
             f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
             f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
         self.dut.write('heap')
         self.dut.write('heap')
-        heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
+        heap_size = self.dut.expect(r'min heap size: (\d+)\D').group(1)
 
 
         # return server raw data (for parsing test results) and RSSI
         # return server raw data (for parsing test results) and RSSI
         return server_raw_data, rssi, heap_size
         return server_raw_data, rssi, heap_size
 
 
-    def run_test(self, proto, direction, atten_val, bw_limit):   # type: (str, str, int, int) -> None
+    def run_test(self, proto:str, direction:str, atten_val:int, bw_limit:int) -> None:
         """
         """
         run test for one type, with specified atten_value and save the test result
         run test for one type, with specified atten_value and save the test result
 
 
         :param proto: tcp or udp
         :param proto: tcp or udp
         :param direction: tx or rx
         :param direction: tx or rx
         :param atten_val: attenuate value
         :param atten_val: attenuate value
+        :param bw_limit: bandwidth limit
         """
         """
         rssi = FAILED_TO_SCAN_RSSI
         rssi = FAILED_TO_SCAN_RSSI
         heap_size = INVALID_HEAP_SIZE
         heap_size = INVALID_HEAP_SIZE
@@ -411,32 +418,33 @@ class IperfTestUtility(object):
             throughput = self._save_test_result('{}_{}'.format(proto, direction),
             throughput = self._save_test_result('{}_{}'.format(proto, direction),
                                                 server_raw_data, atten_val,
                                                 server_raw_data, atten_val,
                                                 rssi, heap_size)
                                                 rssi, heap_size)
-            Utility.console_log('[{}][{}_{}][{}][{}]: {:.02f}'
-                                .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
+            logging.info('[{}][{}_{}][{}][{}]: {:.02f}'
+                         .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
             self.lowest_rssi_scanned = min(self.lowest_rssi_scanned, rssi)
             self.lowest_rssi_scanned = min(self.lowest_rssi_scanned, rssi)
         except (ValueError, IndexError):
         except (ValueError, IndexError):
             self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size)
             self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size)
-            Utility.console_log('Fail to get throughput results.')
+            logging.info('Fail to get throughput results.')
         except AssertionError:
         except AssertionError:
             self.fail_to_scan += 1
             self.fail_to_scan += 1
-            Utility.console_log('Fail to scan AP.')
+            logging.info('Fail to scan AP.')
 
 
-    def run_all_cases(self, atten_val, bw_limit):   # type: (int, int) -> None
+    def run_all_cases(self, atten_val:int, bw_limit:int) -> None:
         """
         """
         run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
         run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
 
 
         :param atten_val: attenuate value
         :param atten_val: attenuate value
+        :param bw_limit: bandwidth limit
         """
         """
         self.run_test('tcp', 'tx', atten_val, bw_limit)
         self.run_test('tcp', 'tx', atten_val, bw_limit)
         self.run_test('tcp', 'rx', atten_val, bw_limit)
         self.run_test('tcp', 'rx', atten_val, bw_limit)
         self.run_test('udp', 'tx', atten_val, bw_limit)
         self.run_test('udp', 'tx', atten_val, bw_limit)
         self.run_test('udp', 'rx', atten_val, bw_limit)
         self.run_test('udp', 'rx', atten_val, bw_limit)
         if self.fail_to_scan > 10:
         if self.fail_to_scan > 10:
-            Utility.console_log(
+            logging.info(
                 'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned))
                 'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned))
             raise AssertionError
             raise AssertionError
 
 
-    def wait_ap_power_on(self):    # type: (Any) -> bool
+    def wait_ap_power_on(self) -> bool:
         """
         """
         AP need to take sometime to power on. It changes for different APs.
         AP need to take sometime to power on. It changes for different APs.
         This method will scan to check if the AP powers on.
         This method will scan to check if the AP powers on.
@@ -444,15 +452,15 @@ class IperfTestUtility(object):
         :return: True or False
         :return: True or False
         """
         """
         self.dut.write('restart')
         self.dut.write('restart')
-        self.dut.expect_any('iperf>', 'esp32>')
+        self.dut.expect('iperf>')
         for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
         for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
             try:
             try:
                 self.dut.write('scan {}'.format(self.ap_ssid))
                 self.dut.write('scan {}'.format(self.ap_ssid))
-                self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
+                self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid),
                                 timeout=SCAN_TIMEOUT)
                                 timeout=SCAN_TIMEOUT)
                 ret = True
                 ret = True
                 break
                 break
-            except DUT.ExpectTimeout:
+            except pexpect.TIMEOUT:
                 pass
                 pass
         else:
         else:
             ret = False
             ret = False

+ 3 - 0
tools/requirements/requirements.pytest.txt

@@ -20,5 +20,8 @@ dbus-python; sys_platform == 'linux'
 protobuf
 protobuf
 paho-mqtt
 paho-mqtt
 
 
+# iperf_test_util
+pyecharts
+
 # for twai tests, communicate with socket can device (e.g. Canable)
 # for twai tests, communicate with socket can device (e.g. Canable)
 python-can
 python-can

+ 0 - 3
tools/requirements/requirements.ttfw.txt

@@ -20,9 +20,6 @@ future
 dbus-python; sys_platform == 'linux'
 dbus-python; sys_platform == 'linux'
 pygobject; sys_platform != 'win32'
 pygobject; sys_platform != 'win32'
 
 
-# iperf_test_util
-pyecharts
-
 # esp_prov
 # esp_prov
 bleak
 bleak
 # future  # addressed before under ble
 # future  # addressed before under ble