idf_unity_tester.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import time
  4. from multiprocessing import Manager, Process, Semaphore
  5. from multiprocessing.managers import SyncManager
  6. from typing import List, Union
  7. from pexpect.exceptions import TIMEOUT
  8. from pytest_embedded import Dut, unity, utils
  9. from pytest_embedded_idf.dut import UnittestMenuCase
  10. class BaseTester:
  11. """
  12. The base class that providing shared methods
  13. Attributes:
  14. dut (Dut): Object of the Device under test
  15. test_menu (List[UnittestMenuCase]): The list of the cases
  16. retry_times (int): The retry times when failed to start a case
  17. args (Any): Not used
  18. """
  19. # The patterns that indicate the runner is ready come from 'unity_runner.c'
  20. ready_pattern_list = ['Press ENTER to see the list of tests',
  21. 'Enter test for running',
  22. 'Enter next test, or \'enter\' to see menu']
  23. def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
  24. self.retry_times = 30
  25. if isinstance(dut, List):
  26. for item in dut:
  27. if isinstance(item, Dut):
  28. self.dut = item
  29. break
  30. else:
  31. self.dut = dut
  32. for k, v in kwargs.items():
  33. setattr(self, k, v)
  34. if 'test_menu' not in kwargs:
  35. self.get_test_menu()
  36. def get_test_menu(self) -> None:
  37. """
  38. Get the test menu of this test app
  39. Notes:
  40. It will do a hard reset after getting the test menu to ensure
  41. the patterns that indicate the case is ready not taken by the parser.
  42. Please use this function to get the test menu while using this script
  43. """
  44. self.dut.write('')
  45. self.test_menu = self.dut.parse_test_menu()
  46. self.dut.serial.hard_reset()
  47. class NormalCaseTester(BaseTester):
  48. """
  49. Tester of normal type case
  50. Attributes:
  51. dut (Dut): Object of the Device under test
  52. test_menu (List[UnittestMenuCase]): The list of the cases
  53. retry_times (int): The retry times when failed to start a case
  54. args (Any): Not used
  55. """
  56. def run_all_normal_cases(self, reset: bool = False, timeout: int = 90) -> None:
  57. """
  58. Run all normal cases
  59. Args:
  60. reset: whether do a hardware reset before running the case
  61. timeout: timeout in second
  62. """
  63. for case in self.test_menu:
  64. self.run_normal_case(case, reset, timeout=timeout)
  65. def run_normal_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
  66. """
  67. Run a specific normal case
  68. Notes:
  69. Will skip if the case type is not normal
  70. Args:
  71. case: the specific case that parsed in test menu
  72. reset: whether do a hardware reset before running the case
  73. timeout: timeout in second
  74. """
  75. if case.type == 'normal':
  76. if reset:
  77. self.dut.serial.hard_reset()
  78. self.dut.expect(self.ready_pattern_list, timeout=timeout)
  79. # Retry if write not success
  80. for retry in range(self.retry_times):
  81. self.dut.write(str(case.index))
  82. try:
  83. self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
  84. break
  85. except TIMEOUT as e:
  86. if retry >= self.retry_times - 1:
  87. raise e
  88. self.dut.expect_unity_test_output(timeout=timeout)
  89. class MultiStageCaseTester(BaseTester):
  90. """
  91. Tester of multiple stage type case
  92. Attributes:
  93. dut (Dut): Object of the Device under test
  94. test_menu (List[UnittestMenuCase]): The list of the cases
  95. retry_times (int): The retry times when failed to start a case
  96. args (Any): Not used
  97. """
  98. def run_all_multi_stage_cases(self, reset: bool = False, timeout: int = 90) -> None:
  99. """
  100. Run all multi_stage cases
  101. Args:
  102. reset: whether do a hardware reset before running the case
  103. timeout: timeout in second
  104. """
  105. for case in self.test_menu:
  106. self.run_multi_stage_case(case, reset, timeout=timeout)
  107. def run_multi_stage_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
  108. """
  109. Run a specific multi_stage case
  110. Notes:
  111. Will skip if the case type is not multi_stage
  112. Args:
  113. case: the specific case that parsed in test menu
  114. reset: whether do a hardware reset before running the case
  115. timeout: timeout in second
  116. """
  117. if case.type == 'multi_stage':
  118. if reset:
  119. self.dut.serial.hard_reset()
  120. for sub_case in case.subcases:
  121. self.dut.expect(self.ready_pattern_list, timeout=timeout)
  122. # Retry if write not success
  123. for retry in range(self.retry_times):
  124. self.dut.write(str(case.index))
  125. try:
  126. self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
  127. break
  128. except TIMEOUT as e:
  129. if retry >= self.retry_times - 1:
  130. raise e
  131. self.dut.write(str(sub_case['index']))
  132. self.dut.expect_unity_test_output(timeout=timeout)
  133. class MultiDevResource:
  134. """
  135. Resources of multi_dev dut
  136. Attributes:
  137. dut (Dut): Object of the Device under test
  138. sem (Semaphore): Semaphore of monitoring whether the case finished
  139. recv_sig (List[str]): The list of received signals from other dut
  140. thread (Process): The thread of monitoring the signals
  141. """
  142. def __init__(self, dut: Dut, manager: SyncManager) -> None:
  143. self.dut = dut
  144. self.sem = Semaphore()
  145. self.recv_sig = manager.list() # type: list[str]
  146. self.process: Process = None # type: ignore
  147. class MultiDevCaseTester(BaseTester):
  148. """
  149. Tester of multi_device case
  150. Attributes:
  151. group (List[MultiDevResource]): The group of the devices' resources
  152. dut (Dut): The first dut, mainly used to get the test menu only
  153. test_menu (List[UnittestMenuCase]): The list of the cases
  154. retry_times (int): The retry times when failed to start a case
  155. """
  156. # The signal pattens come from 'test_utils.c'
  157. SEND_SIGNAL_PREFIX = 'Send signal: '
  158. WAIT_SIGNAL_PREFIX = 'Waiting for signal: '
  159. UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!'
  160. UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!'
  161. def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
  162. """
  163. Create the object for every dut and put them into the group
  164. """
  165. super().__init__(dut, **kwargs)
  166. self._manager = Manager()
  167. self.group: List[MultiDevResource] = []
  168. if isinstance(dut, List):
  169. for item in dut:
  170. if isinstance(item, Dut):
  171. dev_res = MultiDevResource(item, self._manager)
  172. self.group.append(dev_res)
  173. else:
  174. dev_res = MultiDevResource(dut, self._manager)
  175. self.group.append(dev_res)
  176. def _wait_multi_dev_case_finish(self, timeout: int = 60) -> None:
  177. """
  178. Wait until all the sub-cases of this multi_device case finished
  179. """
  180. for d in self.group:
  181. if d.sem.acquire(timeout=timeout):
  182. d.sem.release()
  183. else:
  184. raise TimeoutError('Wait case to finish timeout')
  185. def _start_sub_case_process(self, dev_res: MultiDevResource, case: UnittestMenuCase, sub_case_index: int, timeout: int = 60) -> None:
  186. """
  187. Start the thread monitoring on the corresponding dut of the sub-case
  188. """
  189. # Allocate the kwargs that pass to '_run'
  190. _kwargs = {}
  191. _kwargs['dut'] = dev_res.dut
  192. _kwargs['dev_res'] = dev_res
  193. _kwargs['case'] = case
  194. _kwargs['sub_case_index'] = sub_case_index
  195. _kwargs['timeout'] = timeout
  196. # Create the thread of the sub-case
  197. dev_res.process = Process(target=self._run, kwargs=_kwargs, daemon=True)
  198. dev_res.process.start()
  199. # Process starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
  200. dev_res.sem.acquire()
  201. def _run(self, **kwargs) -> None: # type: ignore
  202. """
  203. The thread target function
  204. Will run for each case on each dut
  205. Call the wrapped function to trigger the case
  206. Then keep listening on the dut for the signal
  207. - If the dut send a signal, it will be put into others' recv_sig
  208. - If the dut waits for a signal, it block and keep polling for the recv_sig until get the signal it requires
  209. - If the dut finished running the case, it will quite the loop and terminate the thread
  210. """
  211. signal_pattern_list = [
  212. self.UNITY_SEND_SIGNAL_REGEX, # The dut send a signal
  213. self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal
  214. unity.UNITY_SUMMARY_LINE_REGEX, # Means the case finished
  215. ]
  216. dut = kwargs['dut']
  217. dev_res = kwargs['dev_res']
  218. case = kwargs['case']
  219. sub_case_index = kwargs['sub_case_index']
  220. timeout = kwargs['timeout']
  221. # Start the case
  222. dut.expect(self.ready_pattern_list)
  223. # Retry at most 30 times if not write successfully
  224. for retry in range(self.retry_times):
  225. dut.write(str(case.index))
  226. try:
  227. dut.expect_exact('Running {}...'.format(case.name), timeout=10)
  228. break
  229. except TIMEOUT as e:
  230. if retry >= self.retry_times - 1:
  231. dev_res.sem.release()
  232. raise e
  233. dut.write(str(sub_case_index))
  234. # Wait for the specific patterns, only exist when the sub-case finished
  235. while True:
  236. pat = dut.expect(signal_pattern_list, timeout=timeout)
  237. if pat is not None:
  238. match_str = pat.group().decode('utf-8')
  239. # Send a signal
  240. if match_str.find(self.SEND_SIGNAL_PREFIX) >= 0:
  241. send_sig = pat.group(1).decode('utf-8')
  242. for d in self.group:
  243. d.recv_sig.append(send_sig)
  244. # Waiting for a signal
  245. elif match_str.find(self.WAIT_SIGNAL_PREFIX) >= 0:
  246. wait_sig = pat.group(1).decode('utf-8')
  247. while True:
  248. if wait_sig in dev_res.recv_sig:
  249. dev_res.recv_sig.remove(wait_sig)
  250. dut.write('')
  251. break
  252. # Keep waiting the signal
  253. else:
  254. time.sleep(0.1)
  255. # Case finished
  256. elif match_str.find('Tests') >= 0:
  257. log = utils.remove_asci_color_code(dut.pexpect_proc.before)
  258. dut.testsuite.add_unity_test_cases(log)
  259. break
  260. # The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish'
  261. dev_res.sem.release()
  262. def run_all_multi_dev_cases(self, reset: bool = False, timeout: int = 60) -> None:
  263. """
  264. Run only multi_device cases
  265. Args:
  266. reset: whether do a hardware reset before running the case
  267. timeout: timeout in second
  268. """
  269. for case in self.test_menu:
  270. # Run multi_device case on every device
  271. self.run_multi_dev_case(case, reset, timeout)
  272. def run_multi_dev_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
  273. """
  274. Run a specific multi_device case
  275. Notes:
  276. Will skip if the case type is not multi_device
  277. Args:
  278. case: the specific case that parsed in test menu
  279. reset: whether do a hardware reset before running the case
  280. timeout: timeout in second
  281. """
  282. if case.type == 'multi_device' and len(self.group) > 1:
  283. if reset:
  284. for dev_res in self.group:
  285. dev_res.dut.serial.hard_reset()
  286. for sub_case in case.subcases:
  287. if isinstance(sub_case['index'], str):
  288. index = int(sub_case['index'], 10)
  289. else:
  290. index = sub_case['index']
  291. self._start_sub_case_process(dev_res=self.group[index - 1], case=case,
  292. sub_case_index=index, timeout=timeout)
  293. # Waiting all the devices to finish their test cases
  294. self._wait_multi_dev_case_finish(timeout=timeout)
  295. class CaseTester(NormalCaseTester, MultiStageCaseTester, MultiDevCaseTester):
  296. """
  297. The Generic tester of all the types
  298. Attributes:
  299. group (List[MultiDevResource]): The group of the devices' resources
  300. dut (Dut): The first dut if there is more than one
  301. test_menu (List[UnittestMenuCase]): The list of the cases
  302. """
  303. def run_all_cases(self, reset: bool = False, timeout: int = 60) -> None:
  304. """
  305. Run all cases
  306. Args:
  307. reset: whether do a hardware reset before running the case
  308. timeout: timeout in second
  309. """
  310. for case in self.test_menu:
  311. self.run_case(case, reset, timeout=timeout)
  312. def run_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
  313. """
  314. Run a specific case
  315. Args:
  316. case: the specific case that parsed in test menu
  317. reset: whether do a hardware reset before running the case
  318. timeout: timeout in second, the case's timeout attribute has a higher priority than this param.
  319. """
  320. _timeout = int(case.attributes.get('timeout', timeout))
  321. if case.type == 'normal':
  322. self.run_normal_case(case, reset, timeout=_timeout)
  323. elif case.type == 'multi_stage':
  324. self.run_multi_stage_case(case, reset, timeout=_timeout)
  325. elif case.type == 'multi_device':
  326. # here we always do a hard reset between test cases
  327. # since the buffer can't be kept between test cases (which run in different processes)
  328. self.run_multi_dev_case(case, reset=True, timeout=_timeout)