idf_unity_tester.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import time
  4. from threading import Semaphore, Thread
  5. from typing import List, Union
  6. from pexpect.exceptions import TIMEOUT
  7. from pytest_embedded import Dut, unity, utils
  8. from pytest_embedded_idf.dut import UnittestMenuCase
  9. class BaseTester:
  10. """
  11. The base class that providing shared methods
  12. Attributes:
  13. dut (Dut): Object of the Device under test
  14. test_menu (List[UnittestMenuCase]): The list of the cases
  15. retry_times (int): The retry times when failed to start a case
  16. args (Any): Not used
  17. """
  18. # The patterns that indicate the runner is ready come from 'unity_runner.c'
  19. ready_pattern_list = ['Press ENTER to see the list of tests',
  20. 'Enter test for running',
  21. 'Enter next test, or \'enter\' to see menu']
  22. def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
  23. self.retry_times = 30
  24. if isinstance(dut, List):
  25. for item in dut:
  26. if isinstance(item, Dut):
  27. self.dut = item
  28. break
  29. else:
  30. self.dut = dut
  31. for k, v in kwargs.items():
  32. setattr(self, k, v)
  33. if 'test_menu' not in kwargs:
  34. self.get_test_menu()
  35. def get_test_menu(self) -> None:
  36. """
  37. Get the test menu of this test app
  38. Notes:
  39. It will do a hard reset after getting the test menu to ensure
  40. the patterns that indicate the case is ready not taken by the parser.
  41. Please use this function to get the test menu while using this script
  42. """
  43. self.dut.write('')
  44. self.test_menu = self.dut.parse_test_menu()
  45. self.dut.serial.hard_reset()
  46. class NormalCaseTester(BaseTester):
  47. """
  48. Tester of normal type case
  49. Attributes:
  50. dut (Dut): Object of the Device under test
  51. test_menu (List[UnittestMenuCase]): The list of the cases
  52. retry_times (int): The retry times when failed to start a case
  53. args (Any): Not used
  54. """
  55. def run_all_normal_cases(self, reset: bool = False, timeout: int = 90) -> None:
  56. """
  57. Run all normal cases
  58. Args:
  59. reset: whether do a hardware reset before running the case
  60. timeout: timeout in second
  61. """
  62. for case in self.test_menu:
  63. self.run_normal_case(case, reset, timeout=timeout)
  64. def run_normal_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
  65. """
  66. Run a specific normal case
  67. Notes:
  68. Will skip if the case type is not normal
  69. Args:
  70. case: the specific case that parsed in test menu
  71. reset: whether do a hardware reset before running the case
  72. timeout: timeout in second
  73. """
  74. if case.type == 'normal':
  75. if reset:
  76. self.dut.serial.hard_reset()
  77. self.dut.expect(self.ready_pattern_list, timeout=timeout)
  78. # Retry if write not success
  79. for retry in range(self.retry_times):
  80. self.dut.write(str(case.index))
  81. try:
  82. self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
  83. break
  84. except TIMEOUT as e:
  85. if retry >= self.retry_times - 1:
  86. raise e
  87. self.dut.expect_unity_test_output(timeout=timeout)
  88. class MultiStageCaseTester(BaseTester):
  89. """
  90. Tester of multiple stage type case
  91. Attributes:
  92. dut (Dut): Object of the Device under test
  93. test_menu (List[UnittestMenuCase]): The list of the cases
  94. retry_times (int): The retry times when failed to start a case
  95. args (Any): Not used
  96. """
  97. def run_all_multi_stage_cases(self, reset: bool = False, timeout: int = 90) -> None:
  98. """
  99. Run all multi_stage cases
  100. Args:
  101. reset: whether do a hardware reset before running the case
  102. timeout: timeout in second
  103. """
  104. for case in self.test_menu:
  105. self.run_multi_stage_case(case, reset, timeout=timeout)
  106. def run_multi_stage_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
  107. """
  108. Run a specific multi_stage case
  109. Notes:
  110. Will skip if the case type is not multi_stage
  111. Args:
  112. case: the specific case that parsed in test menu
  113. reset: whether do a hardware reset before running the case
  114. timeout: timeout in second
  115. """
  116. if case.type == 'multi_stage':
  117. if reset:
  118. self.dut.serial.hard_reset()
  119. for sub_case in case.subcases:
  120. self.dut.expect(self.ready_pattern_list, timeout=timeout)
  121. # Retry if write not success
  122. for retry in range(self.retry_times):
  123. self.dut.write(str(case.index))
  124. try:
  125. self.dut.expect_exact(case.name, timeout=1)
  126. break
  127. except TIMEOUT as e:
  128. if retry >= self.retry_times - 1:
  129. raise e
  130. self.dut.write(str(sub_case['index']))
  131. self.dut.expect_unity_test_output(timeout=timeout)
  132. class MultiDevResource:
  133. """
  134. Resources of multi_dev dut
  135. Attributes:
  136. dut (Dut): Object of the Device under test
  137. sem (Semaphore): Semaphore of monitoring whether the case finished
  138. recv_sig (List[str]): The list of received signals from other dut
  139. thread (Thread): The thread of monitoring the signals
  140. """
  141. def __init__(self, dut: Dut) -> None:
  142. self.dut = dut
  143. self.sem = Semaphore()
  144. self.recv_sig: List[str] = []
  145. self.thread: Thread = None # type: ignore
  146. class MultiDevCaseTester(BaseTester):
  147. """
  148. Tester of multi_device case
  149. Attributes:
  150. group (List[MultiDevResource]): The group of the devices' resources
  151. dut (Dut): The first dut, mainly used to get the test menu only
  152. test_menu (List[UnittestMenuCase]): The list of the cases
  153. retry_times (int): The retry times when failed to start a case
  154. """
  155. # The signal pattens come from 'test_utils.c'
  156. SEND_SIGNAL_PREFIX = 'Send signal: '
  157. WAIT_SIGNAL_PREFIX = 'Waiting for signal: '
  158. UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!'
  159. UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!'
  160. def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
  161. """
  162. Create the object for every dut and put them into the group
  163. """
  164. super().__init__(dut, **kwargs)
  165. self.group: List[MultiDevResource] = []
  166. if isinstance(dut, List):
  167. for item in dut:
  168. if isinstance(item, Dut):
  169. dev_res = MultiDevResource(item)
  170. self.group.append(dev_res)
  171. else:
  172. dev_res = MultiDevResource(dut)
  173. self.group.append(dev_res)
  174. def _wait_multi_dev_case_finish(self, timeout: int = 60) -> None:
  175. """
  176. Wait until all the sub-cases of this multi_device case finished
  177. """
  178. for d in self.group:
  179. if d.sem.acquire(timeout=timeout):
  180. d.sem.release()
  181. else:
  182. raise TimeoutError('Wait case to finish timeout')
  183. def _start_sub_case_thread(self, dev_res: MultiDevResource, case: UnittestMenuCase, sub_case_index: int, timeout: int = 60) -> None:
  184. """
  185. Start the thread monitoring on the corresponding dut of the sub-case
  186. """
  187. # Allocate the kwargs that pass to '_run'
  188. _kwargs = {}
  189. _kwargs['dut'] = dev_res.dut
  190. _kwargs['dev_res'] = dev_res
  191. _kwargs['case'] = case
  192. _kwargs['sub_case_index'] = sub_case_index
  193. _kwargs['timeout'] = timeout
  194. # Create the thread of the sub-case
  195. dev_res.thread = Thread(target=self._run, kwargs=_kwargs, daemon=True)
  196. dev_res.thread.start()
  197. # Thread starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
  198. dev_res.sem.acquire()
  199. def _run(self, **kwargs) -> None: # type: ignore
  200. """
  201. The thread target function
  202. Will run for each case on each dut
  203. Call the wrapped function to trigger the case
  204. Then keep listening on the dut for the signal
  205. - If the dut send a signal, it will be put into others' recv_sig
  206. - If the dut waits for a signal, it block and keep polling for the recv_sig until get the signal it requires
  207. - If the dut finished running the case, it will quite the loop and terminate the thread
  208. """
  209. signal_pattern_list = [
  210. self.UNITY_SEND_SIGNAL_REGEX, # The dut send a signal
  211. self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal
  212. unity.UNITY_SUMMARY_LINE_REGEX, # Means the case finished
  213. ]
  214. dut = kwargs['dut']
  215. dev_res = kwargs['dev_res']
  216. case = kwargs['case']
  217. sub_case_index = kwargs['sub_case_index']
  218. timeout = kwargs['timeout']
  219. # Start the case
  220. dut.expect(self.ready_pattern_list)
  221. # Retry at most 30 times if not write successfully
  222. for retry in range(self.retry_times):
  223. dut.write(str(case.index))
  224. try:
  225. dut.expect_exact(case.name, timeout=1)
  226. break
  227. except TIMEOUT as e:
  228. if retry >= self.retry_times - 1:
  229. dev_res.sem.release()
  230. raise e
  231. dut.write(str(sub_case_index))
  232. # Wait for the specific patterns, only exist when the sub-case finished
  233. while True:
  234. pat = dut.expect(signal_pattern_list, timeout=timeout)
  235. if pat is not None:
  236. match_str = pat.group().decode('utf-8')
  237. # Send a signal
  238. if match_str.find(self.SEND_SIGNAL_PREFIX) >= 0:
  239. send_sig = pat.group(1).decode('utf-8')
  240. for d in self.group:
  241. d.recv_sig.append(send_sig)
  242. # Waiting for a signal
  243. elif match_str.find(self.WAIT_SIGNAL_PREFIX) >= 0:
  244. wait_sig = pat.group(1).decode('utf-8')
  245. while True:
  246. if wait_sig in dev_res.recv_sig:
  247. dev_res.recv_sig.remove(wait_sig)
  248. dut.write('')
  249. break
  250. # Keep waiting the signal
  251. else:
  252. time.sleep(0.1)
  253. # Case finished
  254. elif match_str.find('Tests') >= 0:
  255. log = utils.remove_asci_color_code(dut.pexpect_proc.before)
  256. dut.testsuite.add_unity_test_cases(log)
  257. break
  258. # The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish'
  259. dev_res.sem.release()
  260. def run_all_multi_dev_cases(self, reset: bool = False, timeout: int = 60) -> None:
  261. """
  262. Run only multi_device cases
  263. Args:
  264. reset: whether do a hardware reset before running the case
  265. timeout: timeout in second
  266. """
  267. for case in self.test_menu:
  268. # Run multi_device case on every device
  269. self.run_multi_dev_case(case, reset, timeout)
  270. def run_multi_dev_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
  271. """
  272. Run a specific multi_device case
  273. Notes:
  274. Will skip if the case type is not multi_device
  275. Args:
  276. case: the specific case that parsed in test menu
  277. reset: whether do a hardware reset before running the case
  278. timeout: timeout in second
  279. """
  280. if case.type == 'multi_device' and len(self.group) > 1:
  281. if reset:
  282. for dev_res in self.group:
  283. dev_res.dut.serial.hard_reset()
  284. for sub_case in case.subcases:
  285. if isinstance(sub_case['index'], str):
  286. index = int(sub_case['index'], 10)
  287. else:
  288. index = sub_case['index']
  289. self._start_sub_case_thread(dev_res=self.group[index - 1], case=case,
  290. sub_case_index=index, timeout=timeout)
  291. # Waiting all the devices to finish their test cases
  292. self._wait_multi_dev_case_finish(timeout=timeout)
  293. class CaseTester(NormalCaseTester, MultiStageCaseTester, MultiDevCaseTester):
  294. """
  295. The Generic tester of all the types
  296. Attributes:
  297. group (List[MultiDevResource]): The group of the devices' resources
  298. dut (Dut): The first dut if there is more than one
  299. test_menu (List[UnittestMenuCase]): The list of the cases
  300. """
  301. def run_all_cases(self, reset: bool = False, timeout: int = 60) -> None:
  302. """
  303. Run all cases
  304. Args:
  305. reset: whether do a hardware reset before running the case
  306. timeout: timeout in second
  307. """
  308. for case in self.test_menu:
  309. self.run_case(case, reset, timeout=timeout)
  310. def run_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
  311. """
  312. Run a specific case
  313. Args:
  314. case: the specific case that parsed in test menu
  315. reset: whether do a hardware reset before running the case
  316. timeout: timeout in second, the case's timeout attribute has a higher priority than this param.
  317. """
  318. _timeout = int(case.attributes.get('timeout', timeout))
  319. if case.type == 'normal':
  320. self.run_normal_case(case, reset, timeout=_timeout)
  321. elif case.type == 'multi_stage':
  322. self.run_multi_stage_case(case, reset, timeout=_timeout)
  323. elif case.type == 'multi_device':
  324. self.run_multi_dev_case(case, reset, timeout=_timeout)