| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
- # SPDX-License-Identifier: Apache-2.0
- import logging
- import os
- import time
- from multiprocessing import Manager, Process, Semaphore
- from multiprocessing.managers import SyncManager
- from typing import List, Union
- from pexpect.exceptions import TIMEOUT
- from pytest_embedded import Dut, unity, utils
- from pytest_embedded_idf.dut import UnittestMenuCase
- class BaseTester:
- """
- The base class that providing shared methods
- Attributes:
- dut (Dut): Object of the Device under test
- test_menu (List[UnittestMenuCase]): The list of the cases
- retry_times (int): The retry times when failed to start a case
- args (Any): Not used
- """
- # The patterns that indicate the runner is ready come from 'unity_runner.c'
- ready_pattern_list = ['Press ENTER to see the list of tests',
- 'Enter test for running',
- 'Enter next test, or \'enter\' to see menu']
- def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
- self.retry_times = 30
- if isinstance(dut, List):
- for item in dut:
- if isinstance(item, Dut):
- self.dut = item
- break
- else:
- self.dut = dut
- for k, v in kwargs.items():
- setattr(self, k, v)
- if 'test_menu' not in kwargs:
- self.get_test_menu()
- def get_test_menu(self) -> None:
- """
- Get the test menu of this test app
- Notes:
- It will do a hard reset after getting the test menu to ensure
- the patterns that indicate the case is ready not taken by the parser.
- Please use this function to get the test menu while using this script
- """
- self.dut.write('')
- self.test_menu = self.dut.parse_test_menu()
- self.dut.serial.hard_reset()
- class NormalCaseTester(BaseTester):
- """
- Tester of normal type case
- Attributes:
- dut (Dut): Object of the Device under test
- test_menu (List[UnittestMenuCase]): The list of the cases
- retry_times (int): The retry times when failed to start a case
- args (Any): Not used
- """
- def run_all_normal_cases(self, reset: bool = False, timeout: int = 90) -> None:
- """
- Run all normal cases
- Args:
- reset: whether do a hardware reset before running the case
- timeout: timeout in second
- """
- for case in self.test_menu:
- self.run_normal_case(case, reset, timeout=timeout)
- def run_normal_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
- """
- Run a specific normal case
- Notes:
- Will skip if the case type is not normal
- Args:
- case: the specific case that parsed in test menu
- reset: whether do a hardware reset before running the case
- timeout: timeout in second
- """
- if case.type == 'normal':
- if reset:
- self.dut.serial.hard_reset()
- self.dut.expect(self.ready_pattern_list, timeout=timeout)
- # Retry if write not success
- for retry in range(self.retry_times):
- self.dut.write(str(case.index))
- try:
- self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
- break
- except TIMEOUT as e:
- if retry >= self.retry_times - 1:
- raise e
- self.dut.expect_unity_test_output(timeout=timeout)
- class MultiStageCaseTester(BaseTester):
- """
- Tester of multiple stage type case
- Attributes:
- dut (Dut): Object of the Device under test
- test_menu (List[UnittestMenuCase]): The list of the cases
- retry_times (int): The retry times when failed to start a case
- args (Any): Not used
- """
- def run_all_multi_stage_cases(self, reset: bool = False, timeout: int = 90) -> None:
- """
- Run all multi_stage cases
- Args:
- reset: whether do a hardware reset before running the case
- timeout: timeout in second
- """
- for case in self.test_menu:
- self.run_multi_stage_case(case, reset, timeout=timeout)
- def run_multi_stage_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
- """
- Run a specific multi_stage case
- Notes:
- Will skip if the case type is not multi_stage
- Args:
- case: the specific case that parsed in test menu
- reset: whether do a hardware reset before running the case
- timeout: timeout in second
- """
- if case.type == 'multi_stage':
- if reset:
- self.dut.serial.hard_reset()
- for sub_case in case.subcases:
- self.dut.expect(self.ready_pattern_list, timeout=timeout)
- # Retry if write not success
- for retry in range(self.retry_times):
- self.dut.write(str(case.index))
- try:
- self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
- break
- except TIMEOUT as e:
- if retry >= self.retry_times - 1:
- raise e
- self.dut.write(str(sub_case['index']))
- self.dut.expect_unity_test_output(timeout=timeout)
- class MultiDevResource:
- """
- Resources of multi_dev dut
- Attributes:
- dut (Dut): Object of the Device under test
- sem (Semaphore): Semaphore of monitoring whether the case finished
- recv_sig (List[str]): The list of received signals from other dut
- thread (Process): The thread of monitoring the signals
- """
- def __init__(self, dut: Dut, manager: SyncManager) -> None:
- self.dut = dut
- self.sem = Semaphore()
- self.recv_sig = manager.list() # type: list[str]
- self.process: Process = None # type: ignore
- class MultiDevCaseTester(BaseTester):
- """
- Tester of multi_device case
- Attributes:
- group (List[MultiDevResource]): The group of the devices' resources
- dut (Dut): The first dut, mainly used to get the test menu only
- test_menu (List[UnittestMenuCase]): The list of the cases
- retry_times (int): The retry times when failed to start a case
- """
- # The signal pattens come from 'test_utils.c'
- SEND_SIGNAL_PREFIX = 'Send signal: '
- WAIT_SIGNAL_PREFIX = 'Waiting for signal: '
- UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!'
- UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!'
- def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
- """
- Create the object for every dut and put them into the group
- """
- super().__init__(dut, **kwargs)
- self._manager = Manager()
- self.group: List[MultiDevResource] = []
- if isinstance(dut, List):
- for item in dut:
- if isinstance(item, Dut):
- dev_res = MultiDevResource(item, self._manager)
- self.group.append(dev_res)
- else:
- dev_res = MultiDevResource(dut, self._manager)
- self.group.append(dev_res)
- def _wait_multi_dev_case_finish(self, timeout: int = 60) -> None:
- """
- Wait until all the sub-cases of this multi_device case finished
- """
- for d in self.group:
- if d.sem.acquire(timeout=timeout):
- d.sem.release()
- else:
- raise TimeoutError('Wait case to finish timeout')
- def _start_sub_case_process(self, dev_res: MultiDevResource, case: UnittestMenuCase, sub_case_index: int, timeout: int = 60) -> None:
- """
- Start the thread monitoring on the corresponding dut of the sub-case
- """
- # Allocate the kwargs that pass to '_run'
- _kwargs = {}
- _kwargs['dut'] = dev_res.dut
- _kwargs['dev_res'] = dev_res
- _kwargs['case'] = case
- _kwargs['sub_case_index'] = sub_case_index
- _kwargs['timeout'] = timeout
- # Create the thread of the sub-case
- dev_res.process = Process(target=self._run, kwargs=_kwargs, daemon=True)
- dev_res.process.start()
- # Process starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
- dev_res.sem.acquire()
- def _run(self, **kwargs) -> None: # type: ignore
- """
- The thread target function
- Will run for each case on each dut
- Call the wrapped function to trigger the case
- Then keep listening on the dut for the signal
- - If the dut send a signal, it will be put into others' recv_sig
- - If the dut waits for a signal, it block and keep polling for the recv_sig until get the signal it requires
- - If the dut finished running the case, it will quite the loop and terminate the thread
- """
- signal_pattern_list = [
- self.UNITY_SEND_SIGNAL_REGEX, # The dut send a signal
- self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal
- unity.UNITY_SUMMARY_LINE_REGEX, # Means the case finished
- ]
- dut = kwargs['dut']
- dev_res = kwargs['dev_res']
- case = kwargs['case']
- sub_case_index = kwargs['sub_case_index']
- timeout = kwargs['timeout']
- # Start the case
- dut.expect(self.ready_pattern_list)
- # Retry at most 30 times if not write successfully
- for retry in range(self.retry_times):
- dut.write(str(case.index))
- try:
- dut.expect_exact('Running {}...'.format(case.name), timeout=10)
- break
- except TIMEOUT as e:
- if retry >= self.retry_times - 1:
- dev_res.sem.release()
- raise e
- dut.write(str(sub_case_index))
- # Wait for the specific patterns, only exist when the sub-case finished
- while True:
- pat = dut.expect(signal_pattern_list, timeout=timeout)
- if pat is not None:
- match_str = pat.group().decode('utf-8')
- # Send a signal
- if match_str.find(self.SEND_SIGNAL_PREFIX) >= 0:
- send_sig = pat.group(1).decode('utf-8')
- for d in self.group:
- d.recv_sig.append(send_sig)
- # Waiting for a signal
- elif match_str.find(self.WAIT_SIGNAL_PREFIX) >= 0:
- wait_sig = pat.group(1).decode('utf-8')
- while True:
- if wait_sig in dev_res.recv_sig:
- dev_res.recv_sig.remove(wait_sig)
- dut.write('')
- break
- # Keep waiting the signal
- else:
- time.sleep(0.1)
- # Case finished
- elif match_str.find('Tests') >= 0:
- log = utils.remove_asci_color_code(dut.pexpect_proc.before)
- dut.testsuite.add_unity_test_cases(log)
- break
- # The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish'
- #
- # Manually to create the real test case junit report
- # The child process attributes won't be reflected to the parent one.
- junit_report = os.path.splitext(dut.logfile)[0] + f'_{case.index}_{sub_case_index}.xml'
- dut.testsuite.dump(junit_report)
- logging.info(f'Created unity output junit report: {junit_report}')
- dev_res.sem.release()
- def run_all_multi_dev_cases(self, reset: bool = False, timeout: int = 60) -> None:
- """
- Run only multi_device cases
- Args:
- reset: whether do a hardware reset before running the case
- timeout: timeout in second
- """
- for case in self.test_menu:
- # Run multi_device case on every device
- self.run_multi_dev_case(case, reset, timeout)
- def run_multi_dev_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
- """
- Run a specific multi_device case
- Notes:
- Will skip if the case type is not multi_device
- Args:
- case: the specific case that parsed in test menu
- reset: whether do a hardware reset before running the case
- timeout: timeout in second
- """
- if case.type == 'multi_device' and len(self.group) > 1:
- if reset:
- for dev_res in self.group:
- dev_res.dut.serial.hard_reset()
- # delay a few seconds to make sure the duts are ready.
- time.sleep(5)
- for sub_case in case.subcases:
- if isinstance(sub_case['index'], str):
- index = int(sub_case['index'], 10)
- else:
- index = sub_case['index']
- self._start_sub_case_process(dev_res=self.group[index - 1], case=case,
- sub_case_index=index, timeout=timeout)
- # Waiting all the devices to finish their test cases
- self._wait_multi_dev_case_finish(timeout=timeout)
- class CaseTester(NormalCaseTester, MultiStageCaseTester, MultiDevCaseTester):
- """
- The Generic tester of all the types
- Attributes:
- group (List[MultiDevResource]): The group of the devices' resources
- dut (Dut): The first dut if there is more than one
- test_menu (List[UnittestMenuCase]): The list of the cases
- """
- def run_all_cases(self, reset: bool = False, timeout: int = 60) -> None:
- """
- Run all cases
- Args:
- reset: whether do a hardware reset before running the case
- timeout: timeout in second
- """
- for case in self.test_menu:
- self.run_case(case, reset, timeout=timeout)
- def run_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
- """
- Run a specific case
- Args:
- case: the specific case that parsed in test menu
- reset: whether do a hardware reset before running the case
- timeout: timeout in second, the case's timeout attribute has a higher priority than this param.
- """
- _timeout = int(case.attributes.get('timeout', timeout))
- if case.type == 'normal':
- self.run_normal_case(case, reset, timeout=_timeout)
- elif case.type == 'multi_stage':
- self.run_multi_stage_case(case, reset, timeout=_timeout)
- elif case.type == 'multi_device':
- # here we always do a hard reset between test cases
- # since the buffer can't be kept between test cases (which run in different processes)
- self.run_multi_dev_case(case, reset=True, timeout=_timeout)
|