idf_ci_utils.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. # internal use only for CI
  2. # some CI related util functions
  3. #
  4. # SPDX-FileCopyrightText: 2020-2022 Espressif Systems (Shanghai) CO LTD
  5. # SPDX-License-Identifier: Apache-2.0
  6. #
  7. import io
  8. import logging
  9. import os
  10. import subprocess
  11. import sys
  12. from contextlib import redirect_stdout
  13. from dataclasses import dataclass
  14. from typing import TYPE_CHECKING, Any, List, Optional, Set
  15. if TYPE_CHECKING:
  16. from _pytest.python import Function
  17. IDF_PATH = os.path.abspath(
  18. os.getenv('IDF_PATH', os.path.join(os.path.dirname(__file__), '..', '..'))
  19. )
  20. def get_submodule_dirs(full_path: bool = False) -> List:
  21. """
  22. To avoid issue could be introduced by multi-os or additional dependency,
  23. we use python and git to get this output
  24. :return: List of submodule dirs
  25. """
  26. dirs = []
  27. try:
  28. lines = (
  29. subprocess.check_output(
  30. [
  31. 'git',
  32. 'config',
  33. '--file',
  34. os.path.realpath(os.path.join(IDF_PATH, '.gitmodules')),
  35. '--get-regexp',
  36. 'path',
  37. ]
  38. )
  39. .decode('utf8')
  40. .strip()
  41. .split('\n')
  42. )
  43. for line in lines:
  44. _, path = line.split(' ')
  45. if full_path:
  46. dirs.append(os.path.join(IDF_PATH, path))
  47. else:
  48. dirs.append(path)
  49. except Exception as e: # pylint: disable=W0703
  50. logging.warning(str(e))
  51. return dirs
  52. def _check_git_filemode(full_path): # type: (str) -> bool
  53. try:
  54. stdout = (
  55. subprocess.check_output(['git', 'ls-files', '--stage', full_path])
  56. .strip()
  57. .decode('utf-8')
  58. )
  59. except subprocess.CalledProcessError:
  60. return True
  61. mode = stdout.split(' ', 1)[0] # e.g. 100644 for a rw-r--r--
  62. if any([int(i, 8) & 1 for i in mode[-3:]]):
  63. return True
  64. return False
  65. def is_executable(full_path: str) -> bool:
  66. """
  67. os.X_OK will always return true on windows. Use git to check file mode.
  68. :param full_path: file full path
  69. :return: True is it's an executable file
  70. """
  71. if sys.platform == 'win32':
  72. return _check_git_filemode(full_path)
  73. return os.access(full_path, os.X_OK)
  74. def get_git_files(path: str = IDF_PATH, full_path: bool = False) -> List[str]:
  75. """
  76. Get the result of git ls-files
  77. :param path: path to run git ls-files
  78. :param full_path: return full path if set to True
  79. :return: list of file paths
  80. """
  81. try:
  82. # this is a workaround when using under worktree
  83. # if you're using worktree, when running git commit a new environment variable GIT_DIR would be declared,
  84. # the value should be <origin_repo_path>/.git/worktrees/<worktree name>
  85. # This would effect the return value of `git ls-files`, unset this would use the `cwd`value or its parent
  86. # folder if no `.git` folder found in `cwd`.
  87. workaround_env = os.environ.copy()
  88. workaround_env.pop('GIT_DIR', None)
  89. files = (
  90. subprocess.check_output(['git', 'ls-files'], cwd=path, env=workaround_env)
  91. .decode('utf8')
  92. .strip()
  93. .split('\n')
  94. )
  95. except Exception as e: # pylint: disable=W0703
  96. logging.warning(str(e))
  97. files = []
  98. return [os.path.join(path, f) for f in files] if full_path else files
  99. def is_in_directory(file_path: str, folder: str) -> bool:
  100. return os.path.realpath(file_path).startswith(os.path.realpath(folder) + os.sep)
  101. def to_list(s: Any) -> List[Any]:
  102. if isinstance(s, (set, tuple)):
  103. return list(s)
  104. if isinstance(s, list):
  105. return s
  106. return [s]
  107. @dataclass
  108. class PytestApp:
  109. path: str
  110. target: str
  111. config: str
  112. def __hash__(self) -> int:
  113. return hash((self.path, self.target, self.config))
  114. @dataclass
  115. class PytestCase:
  116. path: str
  117. name: str
  118. apps: Set[PytestApp]
  119. def __hash__(self) -> int:
  120. return hash((self.path, self.name, self.apps))
  121. class PytestCollectPlugin:
  122. def __init__(self, target: str) -> None:
  123. self.target = target
  124. self.cases: List[PytestCase] = []
  125. @staticmethod
  126. def get_param(item: 'Function', key: str, default: Any = None) -> Any:
  127. if not hasattr(item, 'callspec'):
  128. raise ValueError(f'Function {item} does not have params')
  129. return item.callspec.params.get(key, default) or default
  130. def pytest_report_collectionfinish(self, items: List['Function']) -> None:
  131. from pytest_embedded.plugin import parse_multi_dut_args
  132. for item in items:
  133. count = 1
  134. case_path = str(item.path)
  135. case_name = item.originalname
  136. target = self.target
  137. # funcargs is not calculated while collection
  138. if hasattr(item, 'callspec'):
  139. count = item.callspec.params.get('count', 1)
  140. app_paths = to_list(
  141. parse_multi_dut_args(
  142. count,
  143. self.get_param(item, 'app_path', os.path.dirname(case_path)),
  144. )
  145. )
  146. configs = to_list(
  147. parse_multi_dut_args(
  148. count, self.get_param(item, 'config', 'default')
  149. )
  150. )
  151. targets = to_list(
  152. parse_multi_dut_args(count, self.get_param(item, 'target', target))
  153. )
  154. else:
  155. app_paths = [os.path.dirname(case_path)]
  156. configs = ['default']
  157. targets = [target]
  158. case_apps = set()
  159. for i in range(count):
  160. case_apps.add(PytestApp(app_paths[i], targets[i], configs[i]))
  161. self.cases.append(PytestCase(case_path, case_name, case_apps))
  162. def get_pytest_cases(
  163. folder: str, target: str, marker_expr: Optional[str] = None
  164. ) -> List[PytestCase]:
  165. import pytest
  166. from _pytest.config import ExitCode
  167. collector = PytestCollectPlugin(target)
  168. if marker_expr:
  169. marker_expr = f'{target} and ({marker_expr})'
  170. else:
  171. marker_expr = target # target is also a marker
  172. with io.StringIO() as buf:
  173. with redirect_stdout(buf):
  174. res = pytest.main(
  175. ['--collect-only', folder, '-q', '-m', marker_expr], plugins=[collector]
  176. )
  177. if res.value != ExitCode.OK:
  178. if res.value == ExitCode.NO_TESTS_COLLECTED:
  179. print(
  180. f'WARNING: no pytest app found for target {target} under folder {folder}'
  181. )
  182. else:
  183. print(buf.getvalue())
  184. raise RuntimeError('pytest collection failed')
  185. return collector.cases
  186. def get_pytest_app_paths(
  187. folder: str, target: str, marker_expr: Optional[str] = None
  188. ) -> Set[str]:
  189. cases = get_pytest_cases(folder, target, marker_expr)
  190. return set({app.path for case in cases for app in case.apps})