IDFAssignTest.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. """
  4. Command line tool to assign tests to CI test jobs.
  5. """
  6. import argparse
  7. import json
  8. import os
  9. import re
  10. import sys
  11. from copy import deepcopy
  12. import yaml
  13. try:
  14. from yaml import CLoader as Loader
  15. except ImportError:
  16. from yaml import Loader as Loader # type: ignore
  17. import gitlab_api
  18. from tiny_test_fw.Utility import CIAssignTest
  19. try:
  20. from idf_py_actions.constants import PREVIEW_TARGETS, SUPPORTED_TARGETS
  21. except ImportError:
  22. sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
  23. from idf_py_actions.constants import PREVIEW_TARGETS, SUPPORTED_TARGETS
  24. IDF_PATH_FROM_ENV = os.getenv('IDF_PATH', '')
  25. class IDFCaseGroup(CIAssignTest.Group):
  26. BUILD_JOB_NAMES = None
  27. @classmethod
  28. def get_artifact_index_file(cls):
  29. if IDF_PATH_FROM_ENV:
  30. artifact_index_file = os.path.join(IDF_PATH_FROM_ENV, 'artifact_index.json')
  31. else:
  32. artifact_index_file = 'artifact_index.json'
  33. return artifact_index_file
  34. class IDFAssignTest(CIAssignTest.AssignTest):
  35. DEFAULT_FILTER = {
  36. 'category': 'function',
  37. 'ignore': False,
  38. 'supported_in_ci': True,
  39. 'nightly_run': False,
  40. }
  41. def __init__(self, test_case_path, ci_config_file, case_group=IDFCaseGroup):
  42. super(IDFAssignTest, self).__init__(test_case_path, ci_config_file, case_group)
  43. def format_build_log_path(self, parallel_num):
  44. return 'list_job_{}.json'.format(parallel_num)
  45. def create_artifact_index_file(self, project_id=None, pipeline_id=None):
  46. if project_id is None:
  47. project_id = os.getenv('CI_PROJECT_ID')
  48. if pipeline_id is None:
  49. pipeline_id = os.getenv('CI_PIPELINE_ID')
  50. gitlab_inst = gitlab_api.Gitlab(project_id)
  51. artifact_index_list = []
  52. for build_job_name in self.case_group.BUILD_JOB_NAMES:
  53. job_info_list = gitlab_inst.find_job_id(build_job_name, pipeline_id=pipeline_id)
  54. for job_info in job_info_list:
  55. parallel_num = job_info['parallel_num'] or 1 # Could be None if "parallel_num" not defined for the job
  56. raw_data = gitlab_inst.download_artifact(job_info['id'],
  57. [self.format_build_log_path(parallel_num)])[0]
  58. build_info_list = [json.loads(line) for line in raw_data.decode().splitlines()]
  59. for build_info in build_info_list:
  60. build_info['ci_job_id'] = job_info['id']
  61. artifact_index_list.append(build_info)
  62. artifact_index_file = self.case_group.get_artifact_index_file()
  63. with open(artifact_index_file, 'w') as f:
  64. json.dump(artifact_index_list, f)
  65. def search_cases(self, case_filter=None):
  66. _filter = deepcopy(case_filter) if case_filter else {}
  67. if 'NIGHTLY_RUN' in os.environ or 'BOT_LABEL_NIGHTLY_RUN' in os.environ:
  68. _filter.update({'nightly_run': True})
  69. return super().search_cases(_filter)
  70. class ExampleGroup(IDFCaseGroup):
  71. SORT_KEYS = CI_JOB_MATCH_KEYS = ['env_tag', 'target']
  72. EXAMPLE_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
  73. BUILD_JOB_NAMES = ['build_examples_cmake_{}'.format(target) for target in EXAMPLE_TARGETS] # type: ignore
  74. class TestAppsGroup(ExampleGroup):
  75. TEST_APP_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
  76. BUILD_JOB_NAMES = ['build_test_apps_{}'.format(target) for target in TEST_APP_TARGETS] # type: ignore
  77. class ComponentUTGroup(TestAppsGroup):
  78. UNIT_TEST_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
  79. BUILD_JOB_NAMES = ['build_component_ut_{}'.format(target) for target in UNIT_TEST_TARGETS] # type: ignore
  80. class UnitTestGroup(IDFCaseGroup):
  81. SORT_KEYS = ['test environment', 'tags', 'chip_target']
  82. CI_JOB_MATCH_KEYS = ['test environment']
  83. UNIT_TEST_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
  84. BUILD_JOB_NAMES = ['build_esp_idf_tests_cmake_{}'.format(target) for target in UNIT_TEST_TARGETS] # type: ignore
  85. MAX_CASE = 50
  86. ATTR_CONVERT_TABLE = {
  87. 'execution_time': 'execution time'
  88. }
  89. DUT_CLS_NAME = {
  90. 'esp32': 'ESP32DUT',
  91. 'esp32s2': 'ESP32S2DUT',
  92. 'esp32s3': 'ESP32S3DUT',
  93. 'esp32c2': 'ESP32C2DUT',
  94. 'esp32c3': 'ESP32C3DUT',
  95. 'esp8266': 'ESP8266DUT',
  96. }
  97. def __init__(self, case):
  98. super(UnitTestGroup, self).__init__(case)
  99. for tag in self._get_case_attr(case, 'tags'):
  100. self.ci_job_match_keys.add(tag)
  101. @staticmethod
  102. def _get_case_attr(case, attr):
  103. if attr in UnitTestGroup.ATTR_CONVERT_TABLE:
  104. attr = UnitTestGroup.ATTR_CONVERT_TABLE[attr]
  105. return case[attr]
  106. def add_extra_case(self, case):
  107. """ If current group contains all tags required by case, then add succeed """
  108. added = False
  109. if self.accept_new_case():
  110. for key in self.filters:
  111. if self._get_case_attr(case, key) != self.filters[key]:
  112. if key == 'tags':
  113. if set(self._get_case_attr(case, key)).issubset(set(self.filters[key])):
  114. continue
  115. break
  116. else:
  117. self.case_list.append(case)
  118. added = True
  119. return added
  120. def _create_extra_data(self, test_cases, test_function):
  121. """
  122. For unit test case, we need to copy some attributes of test cases into config file.
  123. So unit test function knows how to run the case.
  124. """
  125. case_data = []
  126. for case in test_cases:
  127. one_case_data = {
  128. 'config': self._get_case_attr(case, 'config'),
  129. 'name': self._get_case_attr(case, 'summary'),
  130. 'reset': self._get_case_attr(case, 'reset'),
  131. 'timeout': self._get_case_attr(case, 'timeout'),
  132. }
  133. if test_function in ['run_multiple_devices_cases', 'run_multiple_stage_cases']:
  134. try:
  135. one_case_data['child case num'] = self._get_case_attr(case, 'child case num')
  136. except KeyError as e:
  137. print('multiple devices/stages cases must contains at least two test functions')
  138. print('case name: {}'.format(one_case_data['name']))
  139. raise e
  140. case_data.append(one_case_data)
  141. return case_data
  142. def _divide_case_by_test_function(self):
  143. """
  144. divide cases of current test group by test function they need to use
  145. :return: dict of list of cases for each test functions
  146. """
  147. case_by_test_function = {
  148. 'run_multiple_devices_cases': [],
  149. 'run_multiple_stage_cases': [],
  150. 'run_unit_test_cases': [],
  151. }
  152. for case in self.case_list:
  153. if case['multi_device'] == 'Yes':
  154. case_by_test_function['run_multiple_devices_cases'].append(case)
  155. elif case['multi_stage'] == 'Yes':
  156. case_by_test_function['run_multiple_stage_cases'].append(case)
  157. else:
  158. case_by_test_function['run_unit_test_cases'].append(case)
  159. return case_by_test_function
  160. def output(self):
  161. """
  162. output data for job configs
  163. :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
  164. """
  165. target = self._get_case_attr(self.case_list[0], 'chip_target')
  166. if target:
  167. overwrite = {
  168. 'dut': {
  169. 'package': 'ttfw_idf',
  170. 'class': self.DUT_CLS_NAME[target],
  171. }
  172. }
  173. else:
  174. overwrite = dict()
  175. case_by_test_function = self._divide_case_by_test_function()
  176. output_data = {
  177. # we don't need filter for test function, as UT uses a few test functions for all cases
  178. 'CaseConfig': [
  179. {
  180. 'name': test_function,
  181. 'extra_data': self._create_extra_data(test_cases, test_function),
  182. 'overwrite': overwrite,
  183. } for test_function, test_cases in case_by_test_function.items() if test_cases
  184. ],
  185. }
  186. return output_data
  187. class ExampleAssignTest(IDFAssignTest):
  188. CI_TEST_JOB_PATTERN = re.compile(r'^example_test_.+')
  189. def __init__(self, test_case_path, ci_config_file):
  190. super(ExampleAssignTest, self).__init__(test_case_path, ci_config_file, case_group=ExampleGroup)
  191. class TestAppsAssignTest(IDFAssignTest):
  192. CI_TEST_JOB_PATTERN = re.compile(r'^test_app_test_.+')
  193. def __init__(self, test_case_path, ci_config_file):
  194. super(TestAppsAssignTest, self).__init__(test_case_path, ci_config_file, case_group=TestAppsGroup)
  195. class ComponentUTAssignTest(IDFAssignTest):
  196. CI_TEST_JOB_PATTERN = re.compile(r'^component_ut_test_.+')
  197. def __init__(self, test_case_path, ci_config_file):
  198. super(ComponentUTAssignTest, self).__init__(test_case_path, ci_config_file, case_group=ComponentUTGroup)
  199. class UnitTestAssignTest(IDFAssignTest):
  200. CI_TEST_JOB_PATTERN = re.compile(r'^UT_.+')
  201. def __init__(self, test_case_path, ci_config_file):
  202. super(UnitTestAssignTest, self).__init__(test_case_path, ci_config_file, case_group=UnitTestGroup)
  203. def search_cases(self, case_filter=None):
  204. """
  205. For unit test case, we don't search for test functions.
  206. The unit test cases is stored in a yaml file which is created in job build-idf-test.
  207. """
  208. def find_by_suffix(suffix, path):
  209. res = []
  210. for root, _, files in os.walk(path):
  211. for file in files:
  212. if file.endswith(suffix):
  213. res.append(os.path.join(root, file))
  214. return res
  215. def get_test_cases_from_yml(yml_file):
  216. try:
  217. with open(yml_file) as fr:
  218. raw_data = yaml.load(fr, Loader=Loader)
  219. test_cases = raw_data['test cases']
  220. except (IOError, KeyError):
  221. return []
  222. else:
  223. return test_cases
  224. test_cases = []
  225. for path in self.test_case_paths:
  226. if os.path.isdir(path):
  227. for yml_file in find_by_suffix('.yml', path):
  228. test_cases.extend(get_test_cases_from_yml(yml_file))
  229. elif os.path.isfile(path) and path.endswith('.yml'):
  230. test_cases.extend(get_test_cases_from_yml(path))
  231. else:
  232. print('Test case path is invalid. Should only happen when use @bot to skip unit test.')
  233. # filter keys are lower case. Do map lower case keys with original keys.
  234. try:
  235. key_mapping = {x.lower(): x for x in test_cases[0].keys()}
  236. except IndexError:
  237. key_mapping = dict()
  238. if case_filter:
  239. for key in case_filter:
  240. filtered_cases = []
  241. for case in test_cases:
  242. try:
  243. mapped_key = key_mapping[key]
  244. # bot converts string to lower case
  245. if isinstance(case[mapped_key], str):
  246. _value = case[mapped_key].lower()
  247. else:
  248. _value = case[mapped_key]
  249. if _value in case_filter[key]:
  250. filtered_cases.append(case)
  251. except KeyError:
  252. # case don't have this key, regard as filter success
  253. filtered_cases.append(case)
  254. test_cases = filtered_cases
  255. # sort cases with configs and test functions
  256. # in later stage cases with similar attributes are more likely to be assigned to the same job
  257. # it will reduce the count of flash DUT operations
  258. test_cases.sort(key=lambda x: x['config'] + x['multi_stage'] + x['multi_device'])
  259. return test_cases
  260. if __name__ == '__main__':
  261. parser = argparse.ArgumentParser()
  262. parser.add_argument('case_group', choices=['example_test', 'custom_test', 'unit_test', 'component_ut'])
  263. parser.add_argument('test_case_paths', nargs='+', help='test case folder or file')
  264. parser.add_argument('-c', '--config', default=os.path.join(IDF_PATH_FROM_ENV, '.gitlab', 'ci', 'target-test.yml'),
  265. help='gitlab ci config file')
  266. parser.add_argument('-o', '--output', help='output path of config files')
  267. parser.add_argument('--pipeline_id', '-p', type=int, default=None, help='pipeline_id')
  268. parser.add_argument('--test-case-file-pattern', help='file name pattern used to find Python test case files')
  269. args = parser.parse_args()
  270. SUPPORTED_TARGETS.extend(PREVIEW_TARGETS)
  271. test_case_paths = [os.path.join(IDF_PATH_FROM_ENV, path) if not os.path.isabs(path) else path for path in
  272. args.test_case_paths] # type: ignore
  273. args_list = [test_case_paths, args.config]
  274. if args.case_group == 'example_test':
  275. assigner = ExampleAssignTest(*args_list)
  276. elif args.case_group == 'custom_test':
  277. assigner = TestAppsAssignTest(*args_list)
  278. elif args.case_group == 'unit_test':
  279. assigner = UnitTestAssignTest(*args_list)
  280. elif args.case_group == 'component_ut':
  281. assigner = ComponentUTAssignTest(*args_list)
  282. else:
  283. raise SystemExit(1) # which is impossible
  284. if args.test_case_file_pattern:
  285. assigner.CI_TEST_JOB_PATTERN = re.compile(r'{}'.format(args.test_case_file_pattern))
  286. assigner.assign_cases()
  287. assigner.output_configs(args.output)
  288. assigner.create_artifact_index_file()