# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import typing as t
from xml.etree import ElementTree as ET

import pytest
from _pytest.config import ExitCode
from _pytest.main import Session
from _pytest.python import Function
from _pytest.runner import CallInfo
from pytest_embedded import Dut
from pytest_embedded.plugin import parse_multi_dut_args
from pytest_embedded.utils import find_by_suffix, to_list
from pytest_ignore_test_results.ignore_results import ChildCase, ChildCasesStashKey

from .constants import DEFAULT_SDKCONFIG, PREVIEW_TARGETS, SUPPORTED_TARGETS, PytestApp, PytestCase
from .utils import format_case_id, merge_junit_files

IDF_PYTEST_EMBEDDED_KEY = pytest.StashKey['IdfPytestEmbedded']()
ITEM_FAILED_CASES_KEY = pytest.StashKey[list]()
ITEM_FAILED_KEY = pytest.StashKey[bool]()
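
# Note: IDF_PYTEST_EMBEDDED_KEY is presumably used by the surrounding conftest.py to
# stash and later retrieve the plugin instance; ITEM_FAILED_CASES_KEY and
# ITEM_FAILED_KEY are not referenced in this module and are assumed to be consumed
# by hooks defined elsewhere.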


class IdfPytestEmbedded:
    UNITY_RESULT_MAPPINGS = {
        'PASS': 'passed',
        'FAIL': 'failed',
        'IGNORE': 'skipped',
    }
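    # The mapping above translates Unity (the on-target C unit test framework) result
    # strings into pytest-style outcome names; it is used below when reporting child
    # cases to pytest-ignore-test-results.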

    def __init__(
        self,
        target: str,
        sdkconfig: t.Optional[str] = None,
        apps_list: t.Optional[t.List[str]] = None,
    ):
        # CLI options to filter the test cases
        self.target = target.lower()
        self.sdkconfig = sdkconfig
        self.apps_list = apps_list

        self.cases: t.List[PytestCase] = []

    @staticmethod
    def get_param(item: Function, key: str, default: t.Any = None) -> t.Any:
        # Implemented this way because of a pytest limitation: fixture values are not
        # available during collection, see
        # https://github.com/pytest-dev/pytest/discussions/9689
        if not hasattr(item, 'callspec'):
            return default

        return item.callspec.params.get(key, default) or default
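
    # Illustrative note (not in the original source): for an item parametrized with
    #   @pytest.mark.parametrize('config', ['release'], indirect=True)
    # get_param(item, 'config', 'default') returns 'release' at collection time, while
    # an unparametrized item (which has no callspec) falls back to 'default'.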

    def item_to_pytest_case(self, item: Function) -> PytestCase:
        count = 1
        case_path = str(item.path)
        case_name = item.originalname
        target = self.target

        # funcargs is not calculated during collection
        if hasattr(item, 'callspec'):
            count = item.callspec.params.get('count', 1)
            app_paths = to_list(
                parse_multi_dut_args(
                    count,
                    self.get_param(item, 'app_path', os.path.dirname(case_path)),
                )
            )
            configs = to_list(parse_multi_dut_args(count, self.get_param(item, 'config', 'default')))
            targets = to_list(parse_multi_dut_args(count, self.get_param(item, 'target', target)))
        else:
            app_paths = [os.path.dirname(case_path)]
            configs = ['default']
            targets = [target]

        case_apps = set()
        for i in range(count):
            case_apps.add(PytestApp(app_paths[i], targets[i], configs[i]))

        return PytestCase(
            case_path,
            case_name,
            case_apps,
            self.target,
            item,
        )
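
    # Illustrative note (assumption, not in the original source): for a multi-DUT case
    # such as
    #   @pytest.mark.parametrize('count, app_path', [(2, 'app_a|app_b')], indirect=True)
    # parse_multi_dut_args(2, 'app_a|app_b') is expected to yield one value per DUT, so
    # the loop above creates one PytestApp per (app_path, target, config) triple.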

    @pytest.hookimpl(tryfirst=True)
    def pytest_sessionstart(self, session: Session) -> None:
        # same behavior as the vanilla pytest-embedded '--target' option
        session.config.option.target = self.target

    @pytest.hookimpl(tryfirst=True)
    def pytest_collection_modifyitems(self, items: t.List[Function]) -> None:
        item_to_case: t.Dict[Function, PytestCase] = {}

        # Add markers to the test cases
        for item in items:
            # generate a PytestCase for each item
            case = self.item_to_pytest_case(item)
            item_to_case[item] = case

            # set a default timeout of 10 minutes for each case
            if 'timeout' not in item.keywords:
                item.add_marker(pytest.mark.timeout(10 * 60))

            # expand the special target-group markers
            if 'supported_targets' in item.keywords:
                for _target in SUPPORTED_TARGETS:
                    item.add_marker(_target)
            if 'preview_targets' in item.keywords:
                for _target in PREVIEW_TARGETS:
                    item.add_marker(_target)
            if 'all_targets' in item.keywords:
                for _target in [*SUPPORTED_TARGETS, *PREVIEW_TARGETS]:
                    item.add_marker(_target)

            # add 'xtal_40mhz' as a default marker for esp32c2 cases
            # that are not already marked with 'xtal_26mhz'
            if self.target == 'esp32c2' and 'esp32c2' in case.target_markers and 'xtal_26mhz' not in case.all_markers:
                item.add_marker('xtal_40mhz')

        # Filter the test cases
        filtered_items = []
        for item in items:
            case = item_to_case[item]

            # filter by the "nightly_run" marker
            if os.getenv('INCLUDE_NIGHTLY_RUN') == '1':
                # do not filter nightly_run cases
                pass
            elif os.getenv('NIGHTLY_RUN') == '1':
                if not case.is_nightly_run:
                    logging.debug(
                        'Skipping test case %s because it is not a nightly run test case', item.name
                    )
                    continue
            else:
                if case.is_nightly_run:
                    logging.debug(
                        'Skipping test case %s because it is a nightly run test case', item.name
                    )
                    continue

            # filter by target
            if self.target not in case.target_markers:
                continue

            if self.target in case.skipped_targets:
                continue

            # filter by sdkconfig
            if self.sdkconfig:
                if self.get_param(item, 'config', DEFAULT_SDKCONFIG) != self.sdkconfig:
                    continue

            # filter by apps_list, skip the test case if it is not listed
            # (should only be used in CI)
            if self.apps_list is not None:
                bin_not_found = False
                for case_app in case.apps:
                    # in CI, build_<target>_<config> is always used as the build dir
                    binary_path = os.path.join(case_app.path, f'build_{case_app.target}_{case_app.config}')
                    if binary_path not in self.apps_list:
                        logging.info(
                            'Skipping test case %s because binary path %s is not listed in the app info list files',
                            item.name,
                            binary_path,
                        )
                        bin_not_found = True
                        break
                if bin_not_found:
                    continue

            # finally!
            filtered_items.append(item)

        # sort the test cases by (app folder, config)
        items[:] = sorted(
            filtered_items,
            key=lambda x: (os.path.dirname(x.path), self.get_param(x, 'config', DEFAULT_SDKCONFIG)),
        )
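
    # Note on the sort above (assumption, not stated in the original source): ordering
    # by (app folder, config) keeps test cases that share the same built app adjacent,
    # so consecutive cases are less likely to need a different binary flashed.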

    def pytest_report_collectionfinish(self, items: t.List[Function]) -> None:
        for item in items:
            self.cases.append(self.item_to_pytest_case(item))

    def pytest_custom_test_case_name(self, item: Function) -> str:
        return item.funcargs.get('test_case_name', item.nodeid)  # type: ignore

    def pytest_runtest_makereport(self, item: Function, call: CallInfo[None]) -> None:
        if call.when == 'call':
            target = item.funcargs['target']
            config = item.funcargs['config']
            is_qemu = item.get_closest_marker('qemu') is not None

            dut: t.Union[Dut, t.Tuple[Dut]] = item.funcargs['dut']  # type: ignore
            if isinstance(dut, (list, tuple)):
                res = []
                for i, _dut in enumerate(dut):
                    res.extend(
                        [
                            ChildCase(
                                format_case_id(target, config, case.name + f' {i}', is_qemu=is_qemu),
                                self.UNITY_RESULT_MAPPINGS[case.result],
                            )
                            for case in _dut.testsuite.testcases
                        ]
                    )
                item.config.stash[ChildCasesStashKey] = {item.nodeid: res}
            else:
                item.config.stash[ChildCasesStashKey] = {
                    item.nodeid: [
                        ChildCase(
                            format_case_id(target, config, case.name, is_qemu=is_qemu),
                            self.UNITY_RESULT_MAPPINGS[case.result],
                        )
                        for case in dut.testsuite.testcases
                    ]
                }

    @pytest.hookimpl(trylast=True)
    def pytest_runtest_teardown(self, item: Function) -> None:
        """
        Modify the JUnit reports: format the Unity C test case names.
        """
        tempdir: t.Optional[str] = item.funcargs.get('test_case_tempdir')  # type: ignore
        if not tempdir:
            return

        junits = find_by_suffix('.xml', tempdir)
        if not junits:
            return

        if len(junits) > 1:
            merge_junit_files(junits, os.path.join(tempdir, 'dut.xml'))
            junits = [os.path.join(tempdir, 'dut.xml')]

        # unity cases
        is_qemu = item.get_closest_marker('qemu') is not None
        target = item.funcargs['target']
        config = item.funcargs['config']
        for junit in junits:
            xml = ET.parse(junit)
            testcases = xml.findall('.//testcase')
            for case in testcases:
                # rewrite the test case name in the junit file
                new_case_name = format_case_id(target, config, case.attrib['name'], is_qemu=is_qemu)
                case.attrib['name'] = new_case_name
                if 'file' in case.attrib:
                    case.attrib['file'] = case.attrib['file'].replace('/IDF/', '')  # our unity test framework
                xml.write(junit)
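
    # Note (assumption, not stated here): format_case_id, imported from .utils, is
    # expected to embed the target and sdkconfig name into the reported case name
    # (and to flag QEMU runs), so rewritten JUnit entries can be traced back to a
    # specific build; its exact format is defined in .utils.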

    def pytest_sessionfinish(self, session: Session, exitstatus: int) -> None:
        if exitstatus != 0:
            if exitstatus == ExitCode.NO_TESTS_COLLECTED:
                session.exitstatus = 0
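

# Minimal usage sketch (illustrative only, not part of the original source): a
# project's conftest.py is assumed to instantiate the plugin and register it with
# the plugin manager, roughly:
#
#     def pytest_configure(config):
#         config.stash[IDF_PYTEST_EMBEDDED_KEY] = IdfPytestEmbedded(
#             target=config.getoption('target'),
#             sdkconfig=config.getoption('sdkconfig', None),
#             apps_list=None,
#         )
#         config.pluginmanager.register(config.stash[IDF_PYTEST_EMBEDDED_KEY])
#
#     def pytest_unconfigure(config):
#         _plugin = config.stash.get(IDF_PYTEST_EMBEDDED_KEY, None)
#         if _plugin:
#             del config.stash[IDF_PYTEST_EMBEDDED_KEY]
#             config.pluginmanager.unregister(_plugin)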