idf_unity_tester.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import os
  5. import time
  6. from multiprocessing import Manager, Process, Semaphore
  7. from multiprocessing.managers import SyncManager
  8. from typing import List, Union
  9. from pexpect.exceptions import TIMEOUT
  10. from pytest_embedded import Dut, unity, utils
  11. from pytest_embedded_idf.dut import UnittestMenuCase
  class BaseTester:
      """
      Base class that provides the methods shared by all the testers

      Attributes:
          dut (Dut): Object of the Device under test
          test_menu (List[UnittestMenuCase]): The list of the cases
          retry_times (int): The retry times when failed to start a case
      """

      # The patterns that indicate the runner is ready come from 'unity_runner.c'
      ready_pattern_list = [
          'Press ENTER to see the list of tests',
          'Enter test for running',
          'Enter next test, or \'enter\' to see menu',
      ]

      def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None:  # type: ignore
          """
          Pick the dut to work with and store the extra settings

          Args:
              dut: a single dut, or a list of which the first ``Dut`` instance is used
              kwargs: every item is set as an attribute of this object, so it may
                  override ``retry_times``. When ``test_menu`` is not among them,
                  the menu is read from the dut.
          """
          self.retry_times = 30

          if isinstance(dut, List):
              # Only the first real Dut of the list is kept; 'self.dut' stays
              # unset when the list holds none.
              first_dut = next((candidate for candidate in dut if isinstance(candidate, Dut)), None)
              if first_dut is not None:
                  self.dut = first_dut
          else:
              self.dut = dut

          for name, value in kwargs.items():
              setattr(self, name, value)

          if 'test_menu' not in kwargs:
              self.get_test_menu()

      def get_test_menu(self) -> None:
          """
          Read the test menu of this test app into ``self.test_menu``

          Notes:
              A hard reset is done after the menu is parsed, so that the patterns
              that indicate the runner is ready are not consumed by the parser.
              Please use this function to get the test menu while using this script
          """
          # An empty write makes the runner print its menu
          self.dut.write('')
          self.test_menu = self.dut.parse_test_menu()
          self.dut.serial.hard_reset()
  class NormalCaseTester(BaseTester):
      """
      Tester of the cases whose type is 'normal'

      Attributes:
          dut (Dut): Object of the Device under test
          test_menu (List[UnittestMenuCase]): The list of the cases
          retry_times (int): The retry times when failed to start a case
      """

      def run_all_normal_cases(self, reset: bool = False, timeout: int = 90) -> None:
          """
          Go through the whole test menu and run every normal case

          Args:
              reset: whether do a hardware reset before running the case
              timeout: timeout in second
          """
          for menu_case in self.test_menu:
              self.run_normal_case(menu_case, reset, timeout=timeout)

      def run_normal_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
          """
          Run one normal case and wait for its unity output

          Notes:
              Does nothing if the case type is not normal

          Args:
              case: the specific case that parsed in test menu
              reset: whether do a hardware reset before running the case
              timeout: timeout in second

          Raises:
              TIMEOUT: if the case still did not start after the last retry
          """
          if case.type != 'normal':
              return

          if reset:
              self.dut.serial.hard_reset()
          self.dut.expect(self.ready_pattern_list, timeout=timeout)

          # The index may get lost on the way to the dut, so it is written again
          # until the runner confirms that the case is running.
          last_attempt = self.retry_times - 1
          for attempt in range(self.retry_times):
              self.dut.write(str(case.index))
              try:
                  self.dut.expect_exact(f'Running {case.name}...', timeout=1)
              except TIMEOUT:
                  if attempt >= last_attempt:
                      raise
              else:
                  break

          self.dut.expect_unity_test_output(timeout=timeout)
  class MultiStageCaseTester(BaseTester):
      """
      Tester of the cases whose type is 'multi_stage'

      Attributes:
          dut (Dut): Object of the Device under test
          test_menu (List[UnittestMenuCase]): The list of the cases
          retry_times (int): The retry times when failed to start a case
      """

      def run_all_multi_stage_cases(self, reset: bool = False, timeout: int = 90) -> None:
          """
          Go through the whole test menu and run every multi_stage case

          Args:
              reset: whether do a hardware reset before running the case
              timeout: timeout in second
          """
          for menu_case in self.test_menu:
              self.run_multi_stage_case(menu_case, reset, timeout=timeout)

      def run_multi_stage_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
          """
          Run all the stages of one multi_stage case, then wait for its unity output

          Notes:
              Does nothing if the case type is not multi_stage

          Args:
              case: the specific case that parsed in test menu
              reset: whether do a hardware reset before running the case
              timeout: timeout in second

          Raises:
              TIMEOUT: if a stage still did not start after the last retry
          """
          if case.type != 'multi_stage':
              return

          if reset:
              self.dut.serial.hard_reset()

          last_attempt = self.retry_times - 1
          for stage in case.subcases:
              # Every stage is selected from the menu again: first the case
              # index, then the index of the stage itself.
              self.dut.expect(self.ready_pattern_list, timeout=timeout)

              # Write the case index again until the runner confirms it
              for attempt in range(self.retry_times):
                  self.dut.write(str(case.index))
                  try:
                      self.dut.expect_exact(f'Running {case.name}...', timeout=1)
                  except TIMEOUT:
                      if attempt >= last_attempt:
                          raise
                  else:
                      break

              self.dut.write(str(stage['index']))

          self.dut.expect_unity_test_output(timeout=timeout)
  class MultiDevResource:
      """
      Per-device resources used while running a multi_device case

      Attributes:
          dut (Dut): Object of the Device under test
          sem (Semaphore): Semaphore of monitoring whether the case finished
          recv_sig (List[str]): The list of received signals from other dut
          process (Process): The process of monitoring the signals, None until one is started
      """

      def __init__(self, dut: Dut, manager: SyncManager) -> None:
          """
          Args:
              dut: the device these resources belong to
              manager: the manager that creates the shared 'recv_sig' list
          """
          self.process: Process = None  # type: ignore
          # Created through the manager so that the list can be shared between processes
          self.recv_sig = manager.list()  # type: list[str]
          self.sem = Semaphore()
          self.dut = dut
  class MultiDevCaseTester(BaseTester):
      """
      Tester of the cases whose type is 'multi_device'

      Attributes:
          group (List[MultiDevResource]): The group of the devices' resources
          dut (Dut): The first dut, mainly used to get the test menu only
          test_menu (List[UnittestMenuCase]): The list of the cases
          retry_times (int): The retry times when failed to start a case
      """

      # The signal patterns come from 'test_utils.c'
      SEND_SIGNAL_PREFIX = 'Send signal: '
      WAIT_SIGNAL_PREFIX = 'Waiting for signal: '
      UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!'
      UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!'

      def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None:  # type: ignore
          """
          Create the resource object for every dut and put them into the group

          Args:
              dut: a single dut, or a list of which every ``Dut`` instance joins the group
              kwargs: forwarded to the base class
          """
          super().__init__(dut, **kwargs)
          self._manager = Manager()
          self.group: List[MultiDevResource] = []

          if isinstance(dut, List):
              devices = [item for item in dut if isinstance(item, Dut)]
          else:
              devices = [dut]
          for device in devices:
              self.group.append(MultiDevResource(device, self._manager))

      def _wait_multi_dev_case_finish(self, timeout: int = 60) -> None:
          """
          Wait until all the sub-cases of this multi_device case finished

          Args:
              timeout: timeout in second, applied to each device one after another

          Raises:
              TimeoutError: if the semaphore of a device is not released in time
          """
          for dev_res in self.group:
              if not dev_res.sem.acquire(timeout=timeout):
                  raise TimeoutError('Wait case to finish timeout')
              # Give it back at once, the semaphore was only taken to check
              # that the sub-case process is done with it.
              dev_res.sem.release()

      def _start_sub_case_process(self, dev_res: MultiDevResource, case: UnittestMenuCase, sub_case_index: int, timeout: int = 60) -> None:
          """
          Start the process that runs and monitors the sub-case on its dut

          Args:
              dev_res: the resources of the device that runs the sub-case
              case: the multi_device case the sub-case belongs to
              sub_case_index: the index of the sub-case in the menu of the case
              timeout: timeout in second, passed on to '_run'
          """
          # The kwargs that are passed to '_run'
          run_kwargs = {
              'dut': dev_res.dut,
              'dev_res': dev_res,
              'case': case,
              'sub_case_index': sub_case_index,
              'timeout': timeout,
          }
          dev_res.process = Process(target=self._run, kwargs=run_kwargs, daemon=True)
          dev_res.process.start()
          # Process starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
          dev_res.sem.acquire()

      def _run(self, **kwargs) -> None:  # type: ignore
          """
          The target function of the sub-case process

          Will run for each case on each dut.
          It triggers the sub-case, then keeps listening on the dut:
              - If the dut sends a signal, the signal is appended to the recv_sig
                of every device in the group (the sender's own list as well)
              - If the dut waits for a signal, it blocks and keeps polling its
                recv_sig until the required signal shows up
              - If the dut finished running the case, it quits the loop, dumps
                the junit report and releases the semaphore

          Args:
              kwargs: 'dut', 'dev_res', 'case', 'sub_case_index' and 'timeout',
                  as prepared by '_start_sub_case_process'

          Raises:
              TIMEOUT: if the case still did not start after the last retry
          """
          dut = kwargs['dut']
          dev_res = kwargs['dev_res']
          case = kwargs['case']
          sub_case_index = kwargs['sub_case_index']
          timeout = kwargs['timeout']

          signal_pattern_list = [
              self.UNITY_SEND_SIGNAL_REGEX,  # The dut send a signal
              self.UNITY_WAIT_SIGNAL_REGEX,  # The dut is blocked and waiting for a signal
              unity.UNITY_SUMMARY_LINE_REGEX,  # Means the case finished
          ]

          # Start the case
          dut.expect(self.ready_pattern_list)
          # Write the case index again, at most 'retry_times' times, until the
          # runner confirms that the case is running
          last_attempt = self.retry_times - 1
          for attempt in range(self.retry_times):
              dut.write(str(case.index))
              try:
                  dut.expect_exact(f'Running {case.name}...', timeout=10)
              except TIMEOUT:
                  if attempt >= last_attempt:
                      # Release first so that the semaphore is not left taken
                      dev_res.sem.release()
                      raise
              else:
                  break
          dut.write(str(sub_case_index))

          # Handle the signal patterns until the summary line shows up, which
          # only exists when the sub-case finished
          while True:
              pat = dut.expect(signal_pattern_list, timeout=timeout)
              if pat is None:
                  continue

              match_str = pat.group().decode('utf-8')
              if self.SEND_SIGNAL_PREFIX in match_str:
                  # Send a signal
                  send_sig = pat.group(1).decode('utf-8')
                  for member in self.group:
                      member.recv_sig.append(send_sig)
              elif self.WAIT_SIGNAL_PREFIX in match_str:
                  # Waiting for a signal, poll until it arrived
                  wait_sig = pat.group(1).decode('utf-8')
                  while wait_sig not in dev_res.recv_sig:
                      time.sleep(0.1)
                  dev_res.recv_sig.remove(wait_sig)
                  # An empty write lets the blocked dut go on
                  dut.write('')
              elif 'Tests' in match_str:
                  # Case finished
                  log = utils.remove_asci_color_code(dut.pexpect_proc.before)
                  dut.testsuite.add_unity_test_cases(log)
                  break

          # Manually to create the real test case junit report
          # The child process attributes won't be reflected to the parent one.
          junit_report = os.path.splitext(dut.logfile)[0] + f'_{case.index}_{sub_case_index}.xml'
          dut.testsuite.dump(junit_report)
          logging.info(f'Created unity output junit report: {junit_report}')

          # The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish'
          dev_res.sem.release()

      def run_all_multi_dev_cases(self, reset: bool = False, timeout: int = 60) -> None:
          """
          Go through the whole test menu and run every multi_device case

          Args:
              reset: whether do a hardware reset before running the case
              timeout: timeout in second
          """
          for menu_case in self.test_menu:
              # Run multi_device case on every device
              self.run_multi_dev_case(menu_case, reset, timeout)

      def run_multi_dev_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
          """
          Run one multi_device case, each of its sub-cases on its own device

          Notes:
              Does nothing if the case type is not multi_device, or if the group
              holds fewer than two devices

          Args:
              case: the specific case that parsed in test menu
              reset: whether do a hardware reset before running the case
              timeout: timeout in second

          Raises:
              TimeoutError: if a device does not finish its sub-case in time
          """
          if case.type != 'multi_device' or len(self.group) <= 1:
              return

          if reset:
              for dev_res in self.group:
                  dev_res.dut.serial.hard_reset()
              # delay a few seconds to make sure the duts are ready.
              time.sleep(5)

          for sub_case in case.subcases:
              raw_index = sub_case['index']
              index = int(raw_index, 10) if isinstance(raw_index, str) else raw_index
              # Sub-case indexes start at 1: sub-case N runs on the N-th device
              self._start_sub_case_process(dev_res=self.group[index - 1], case=case,
                                           sub_case_index=index, timeout=timeout)

          # Waiting all the devices to finish their test cases
          self._wait_multi_dev_case_finish(timeout=timeout)
  class CaseTester(NormalCaseTester, MultiStageCaseTester, MultiDevCaseTester):
      """
      The generic tester that handles the cases of all the types

      Attributes:
          group (List[MultiDevResource]): The group of the devices' resources
          dut (Dut): The first dut if there is more than one
          test_menu (List[UnittestMenuCase]): The list of the cases
      """

      def run_all_cases(self, reset: bool = False, timeout: int = 60) -> None:
          """
          Go through the whole test menu and run every case

          Args:
              reset: whether do a hardware reset before running the case
              timeout: timeout in second
          """
          for menu_case in self.test_menu:
              self.run_case(menu_case, reset, timeout=timeout)

      def run_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
          """
          Run one case with the method that matches its type

          Notes:
              A case of any other type than normal, multi_stage or multi_device
              is skipped

          Args:
              case: the specific case that parsed in test menu
              reset: whether do a hardware reset before running the case,
                  multi_device cases are always reset
              timeout: timeout in second, the case's timeout attribute has a higher priority than this param.
          """
          _timeout = int(case.attributes.get('timeout', timeout))

          if case.type == 'multi_device':
              # here we always do a hard reset between test cases
              # since the buffer can't be kept between test cases (which run in different processes)
              self.run_multi_dev_case(case, reset=True, timeout=_timeout)
          elif case.type == 'multi_stage':
              self.run_multi_stage_case(case, reset, timeout=_timeout)
          elif case.type == 'normal':
              self.run_normal_case(case, reset, timeout=_timeout)