| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- """
- Command line tool to assign tests to CI test jobs.
- """
- import argparse
- import errno
- import json
- import os
- import re
- import yaml
- try:
- from yaml import CLoader as Loader
- except ImportError:
- from yaml import Loader as Loader # type: ignore
- import gitlab_api
- from tiny_test_fw.Utility import CIAssignTest
- try:
- from idf_py_actions.constants import PREVIEW_TARGETS, SUPPORTED_TARGETS
- except ImportError:
- SUPPORTED_TARGETS = []
- PREVIEW_TARGETS = []
- IDF_PATH_FROM_ENV = os.getenv('IDF_PATH')
class IDFCaseGroup(CIAssignTest.Group):
    """
    Base case group for IDF test jobs.

    Subclasses set ``LOCAL_BUILD_DIR`` (joined onto IDF_PATH to locate build
    outputs) and ``BUILD_JOB_NAMES`` (the CI build job names whose artifacts
    are indexed).
    """
    LOCAL_BUILD_DIR = None
    BUILD_JOB_NAMES = None

    @classmethod
    def get_artifact_index_file(cls):
        """
        Return the path of the artifact index JSON file for this group.

        :return: ``<IDF_PATH>/<LOCAL_BUILD_DIR>/artifact_index.json`` when the
                 IDF_PATH environment variable is set, otherwise the bare file
                 name ``artifact_index.json``
        """
        assert cls.LOCAL_BUILD_DIR
        if not IDF_PATH_FROM_ENV:
            # no IDF_PATH available: fall back to a path relative to the working directory
            return 'artifact_index.json'
        return os.path.join(IDF_PATH_FROM_ENV, cls.LOCAL_BUILD_DIR, 'artifact_index.json')
class IDFAssignTest(CIAssignTest.AssignTest):
    """
    Assigner for IDF test cases which can also build an index of the build
    artifacts produced by the CI build jobs of its case group.
    """

    def __init__(self, test_case_path, ci_config_file, case_group=IDFCaseGroup):
        super(IDFAssignTest, self).__init__(test_case_path, ci_config_file, case_group)

    def format_build_log_path(self, parallel_num):
        """
        :param parallel_num: index of the parallel build job
        :return: path of the build list file of that job, under the group's LOCAL_BUILD_DIR
        """
        return '{}/list_job_{}.json'.format(self.case_group.LOCAL_BUILD_DIR, parallel_num)

    def create_artifact_index_file(self, project_id=None, pipeline_id=None):
        """
        Download the build list of every build job of the case group and merge
        them into a single JSON index file.

        Each line of a downloaded build list is parsed as one JSON object and
        tagged with the id of the job that produced it (key ``ci_job_id``).

        :param project_id: GitLab project id, defaults to env variable ``CI_PROJECT_ID``
        :param pipeline_id: GitLab pipeline id, defaults to env variable ``CI_PIPELINE_ID``
        :raise OSError: if the directory of the index file can not be created
        """
        if project_id is None:
            project_id = os.getenv('CI_PROJECT_ID')
        if pipeline_id is None:
            pipeline_id = os.getenv('CI_PIPELINE_ID')
        gitlab_inst = gitlab_api.Gitlab(project_id)

        artifact_index_list = []
        for build_job_name in self.case_group.BUILD_JOB_NAMES:
            job_info_list = gitlab_inst.find_job_id(build_job_name, pipeline_id=pipeline_id)
            for job_info in job_info_list:
                parallel_num = job_info['parallel_num'] or 1  # Could be None if "parallel_num" not defined for the job
                raw_data = gitlab_inst.download_artifact(job_info['id'],
                                                         [self.format_build_log_path(parallel_num)])[0]
                # the build list holds one JSON document per line
                for line in raw_data.decode().splitlines():
                    build_info = json.loads(line)
                    build_info['ci_job_id'] = job_info['id']
                    artifact_index_list.append(build_info)

        artifact_index_file = self.case_group.get_artifact_index_file()
        index_dir = os.path.dirname(artifact_index_file)
        # When IDF_PATH is not set the index file is a bare file name, so there is
        # no directory to create (os.makedirs('') would fail with ENOENT).
        if index_dir:
            try:
                os.makedirs(index_dir)
            except OSError as e:
                # an already existing directory is fine, anything else is a real error
                if e.errno != errno.EEXIST:
                    raise

        with open(artifact_index_file, 'w') as f:
            json.dump(artifact_index_list, f)
class ExampleGroup(IDFCaseGroup):
    """Case group for example tests; build outputs are located under ``build_examples``."""
    # the same list object is used both for sorting cases and for matching CI jobs
    CI_JOB_MATCH_KEYS = ['env_tag', 'target']
    SORT_KEYS = CI_JOB_MATCH_KEYS

    LOCAL_BUILD_DIR = 'build_examples'  # type: ignore
    EXAMPLE_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
    # one build job name per supported/preview target
    BUILD_JOB_NAMES = [
        'build_examples_cmake_{}'.format(chip) for chip in EXAMPLE_TARGETS
    ]  # type: ignore
class TestAppsGroup(ExampleGroup):
    """Case group for test apps; build outputs are located under ``build_test_apps``."""
    LOCAL_BUILD_DIR = 'build_test_apps'
    TEST_APP_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
    # one build job name per supported/preview target
    BUILD_JOB_NAMES = [
        'build_test_apps_{}'.format(chip) for chip in TEST_APP_TARGETS
    ]  # type: ignore
class ComponentUTGroup(TestAppsGroup):
    """Case group for component unit tests; build outputs are located under ``build_component_ut``."""
    LOCAL_BUILD_DIR = 'build_component_ut'
    UNIT_TEST_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
    # one build job name per supported/preview target
    BUILD_JOB_NAMES = [
        'build_component_ut_{}'.format(chip) for chip in UNIT_TEST_TARGETS
    ]  # type: ignore
class UnitTestGroup(IDFCaseGroup):
    """
    Case group for unit test cases described by dict-like case records.

    Cases are grouped by test environment, tags and chip target; every tag of
    the first case is additionally registered as a CI job match key.
    """
    SORT_KEYS = ['test environment', 'tags', 'chip_target']
    CI_JOB_MATCH_KEYS = ['test environment']
    LOCAL_BUILD_DIR = 'tools/unit-test-app/builds'  # type: ignore
    UNIT_TEST_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS
    BUILD_JOB_NAMES = [
        'build_esp_idf_tests_cmake_{}'.format(chip) for chip in UNIT_TEST_TARGETS
    ]  # type: ignore

    # NOTE(review): presumably the per-group case limit consumed by the base class - confirm
    MAX_CASE = 50
    # attribute names used by this class -> keys actually present in the case records
    ATTR_CONVERT_TABLE = {
        'execution_time': 'execution time'
    }
    # chip target -> DUT class name written into the job config
    DUT_CLS_NAME = {
        'esp32': 'ESP32DUT',
        'esp32s2': 'ESP32S2DUT',
        'esp32s3': 'ESP32S3DUT',
        'esp32c3': 'ESP32C3DUT',
        'esp8266': 'ESP8266DUT',
    }

    def __init__(self, case):
        super(UnitTestGroup, self).__init__(case)
        # every tag of the case becomes a key the CI job has to match
        case_tags = self._get_case_attr(case, 'tags')
        for case_tag in case_tags:
            self.ci_job_match_keys.add(case_tag)

    @staticmethod
    def _get_case_attr(case, attr):
        """ Read ``attr`` from ``case``, translating the name through ATTR_CONVERT_TABLE first """
        return case[UnitTestGroup.ATTR_CONVERT_TABLE.get(attr, attr)]

    def add_extra_case(self, case):
        """
        Try to add ``case`` to this group.

        The case is accepted when the group still accepts new cases and every
        filter value equals the case's value; for the ``tags`` filter it is
        enough that the case's tags are a subset of the group's tags.

        :return: True if the case was appended to the group, otherwise False
        """
        if not self.accept_new_case():
            return False

        for key in self.filters:
            wanted = self.filters[key]
            actual = self._get_case_attr(case, key)
            if actual != wanted:
                tags_covered = key == 'tags' and set(actual).issubset(set(wanted))
                if not tags_covered:
                    # a filter mismatches: the case does not belong to this group
                    return False

        self.case_list.append(case)
        return True

    def _create_extra_data(self, test_cases, test_function):
        """
        Copy the attributes the unit test function needs to run a case into the config data.

        :param test_cases: cases assigned to ``test_function``
        :param test_function: name of the test function which runs the cases
        :return: list with one dict (config, name, reset, timeout) per case; for
                 multiple devices/stages functions also ``child case num``
        :raise KeyError: if a multiple devices/stages case has no ``child case num``
        """
        with_child_num = test_function in ['run_multiple_devices_cases', 'run_multiple_stage_cases']
        collected = []
        for case in test_cases:
            entry = {
                'config': self._get_case_attr(case, 'config'),
                'name': self._get_case_attr(case, 'summary'),
                'reset': self._get_case_attr(case, 'reset'),
                'timeout': self._get_case_attr(case, 'timeout'),
            }
            if with_child_num:
                try:
                    entry['child case num'] = self._get_case_attr(case, 'child case num')
                except KeyError as e:
                    print('multiple devices/stages cases must contains at least two test functions')
                    print('case name: {}'.format(entry['name']))
                    raise e
            collected.append(entry)
        return collected

    def _divide_case_by_test_function(self):
        """
        Split the cases of this group by the test function which has to run them.

        :return: dict mapping test function name to the list of its cases
        """
        buckets = {
            'run_multiple_devices_cases': [],
            'run_multiple_stage_cases': [],
            'run_unit_test_cases': [],
        }
        for case in self.case_list:
            # multi device takes precedence over multi stage
            if case['multi_device'] == 'Yes':
                function_name = 'run_multiple_devices_cases'
            elif case['multi_stage'] == 'Yes':
                function_name = 'run_multiple_stage_cases'
            else:
                function_name = 'run_unit_test_cases'
            buckets[function_name].append(case)
        return buckets

    def output(self):
        """
        Build the job config data of this group.

        :return: {"CaseConfig": list of case configs, one per test function which has cases}
        """
        # the chip target of the first case selects the DUT class for all configs
        target = self._get_case_attr(self.case_list[0], 'chip_target')
        overwrite = {
            'dut': {
                'package': 'ttfw_idf',
                'class': self.DUT_CLS_NAME[target],
            }
        } if target else dict()

        # we don't need filter for test function, as UT uses a few test functions for all cases
        case_configs = []
        for test_function, test_cases in self._divide_case_by_test_function().items():
            if not test_cases:
                continue
            case_configs.append({
                'name': test_function,
                'extra_data': self._create_extra_data(test_cases, test_function),
                'overwrite': overwrite,
            })
        return {'CaseConfig': case_configs}
class ExampleAssignTest(IDFAssignTest):
    """Assigner for example tests: uses ExampleGroup and CI jobs named ``example_test_*``."""
    CI_TEST_JOB_PATTERN = re.compile(r'^example_test_.+')

    def __init__(self, test_case_path, ci_config_file):
        # the case group is fixed for this assigner
        super(ExampleAssignTest, self).__init__(test_case_path, ci_config_file, ExampleGroup)
class TestAppsAssignTest(IDFAssignTest):
    """Assigner for test apps: uses TestAppsGroup and CI jobs named ``test_app_test_*``."""
    CI_TEST_JOB_PATTERN = re.compile(r'^test_app_test_.+')

    def __init__(self, test_case_path, ci_config_file):
        # the case group is fixed for this assigner
        super(TestAppsAssignTest, self).__init__(test_case_path, ci_config_file, TestAppsGroup)
class ComponentUTAssignTest(IDFAssignTest):
    """Assigner for component unit tests: uses ComponentUTGroup and CI jobs named ``component_ut_test_*``."""
    CI_TEST_JOB_PATTERN = re.compile(r'^component_ut_test_.+')

    def __init__(self, test_case_path, ci_config_file):
        # the case group is fixed for this assigner
        super(ComponentUTAssignTest, self).__init__(test_case_path, ci_config_file, ComponentUTGroup)
class UnitTestAssignTest(IDFAssignTest):
    """Assigner for unit tests: uses UnitTestGroup and CI jobs named ``UT_*``."""
    CI_TEST_JOB_PATTERN = re.compile(r'^UT_.+')

    def __init__(self, test_case_path, ci_config_file):
        super(UnitTestAssignTest, self).__init__(test_case_path, ci_config_file, case_group=UnitTestGroup)

    def search_cases(self, case_filter=None):
        """
        For unit test case, we don't search for test functions.
        The unit test cases is stored in a yaml file which is created in job build-idf-test.

        Every entry of ``self.test_case_paths`` may be a directory (searched
        recursively for ``.yml`` files) or a single ``.yml`` file. Files which
        do not contain a ``test cases`` list are ignored.

        :param case_filter: optional dict of lower case key -> accepted values.
                            A case passes a key if it does not have that key, or
                            if its value (lower-cased when it is a string) is in
                            the accepted values.
        :return: list of test cases, sorted by config, multi_stage and multi_device
        """
        def find_by_suffix(suffix, path):
            """ Recursively collect all files below ``path`` whose name ends with ``suffix`` """
            res = []
            for root, _, files in os.walk(path):
                res.extend(os.path.join(root, name) for name in files if name.endswith(suffix))
            return res

        def get_test_cases_from_yml(yml_file):
            """ Load the ``test cases`` list of ``yml_file``; return [] if the file has none """
            try:
                with open(yml_file) as fr:
                    # NOTE(review): yaml.load with a full Loader can construct arbitrary
                    # objects; only feed it yml files produced by the CI build itself.
                    raw_data = yaml.load(fr, Loader=Loader)
                test_cases = raw_data['test cases']
            except (IOError, KeyError, TypeError):
                # TypeError: empty file (None) or a non-mapping top level, i.e. an
                # unrelated yml file. Skip it the same way as a missing key.
                return []
            # a present but empty "test cases:" entry is loaded as None
            return test_cases or []

        test_cases = []
        for path in self.test_case_paths:
            if os.path.isdir(path):
                for yml_file in find_by_suffix('.yml', path):
                    test_cases.extend(get_test_cases_from_yml(yml_file))
            elif os.path.isfile(path) and path.endswith('.yml'):
                test_cases.extend(get_test_cases_from_yml(path))
            else:
                print('Test case path is invalid. Should only happen when use @bot to skip unit test.')

        # filter keys are lower case. Do map lower case keys with original keys.
        # The keys of the first case are taken as representative for all cases.
        key_mapping = {x.lower(): x for x in test_cases[0].keys()} if test_cases else dict()

        if case_filter:
            for key in case_filter:
                accepted_values = case_filter[key]
                filtered_cases = []
                for case in test_cases:
                    try:
                        value = case[key_mapping[key]]
                    except KeyError:
                        # case don't have this key, regard as filter success
                        filtered_cases.append(case)
                        continue
                    # bot converts string to lower case
                    if isinstance(value, str):
                        value = value.lower()
                    if value in accepted_values:
                        filtered_cases.append(case)
                test_cases = filtered_cases

        # sort cases with configs and test functions
        # in later stage cases with similar attributes are more likely to be assigned to the same job
        # it will reduce the count of flash DUT operations
        test_cases.sort(key=lambda x: x['config'] + x['multi_stage'] + x['multi_device'])
        return test_cases
if __name__ == '__main__':
    # Command line entry point: assign test cases to CI jobs, write the job
    # config files and create the artifact index of the matching build jobs.
    parser = argparse.ArgumentParser()
    parser.add_argument('case_group', choices=['example_test', 'custom_test', 'unit_test', 'component_ut'])
    parser.add_argument('test_case_paths', nargs='+', help='test case folder or file')
    parser.add_argument('-c', '--config', help='gitlab ci config file')
    parser.add_argument('-o', '--output', help='output path of config files')
    parser.add_argument('--pipeline_id', '-p', type=int, default=None, help='pipeline_id')
    parser.add_argument('--test-case-file-pattern', help='file name pattern used to find Python test case files')
    args = parser.parse_args()

    # NOTE(review): mutates the shared SUPPORTED_TARGETS list in place; kept because
    # code outside this file may rely on preview targets being included - confirm
    SUPPORTED_TARGETS.extend(PREVIEW_TARGETS)

    # Relative paths are resolved against IDF_PATH. Without IDF_PATH they are used
    # as given instead of failing inside os.path.join(None, ...).
    test_case_paths = [
        path if os.path.isabs(path) or not IDF_PATH_FROM_ENV else os.path.join(IDF_PATH_FROM_ENV, path)
        for path in args.test_case_paths
    ]

    # case group name given on the command line -> assigner class
    assigner_classes = {
        'example_test': ExampleAssignTest,
        'custom_test': TestAppsAssignTest,
        'unit_test': UnitTestAssignTest,
        'component_ut': ComponentUTAssignTest,
    }
    try:
        assigner_class = assigner_classes[args.case_group]
    except KeyError:
        raise SystemExit(1)  # which is impossible
    assigner = assigner_class(test_case_paths, args.config)

    if args.test_case_file_pattern:
        assigner.CI_TEST_JOB_PATTERN = re.compile(r'{}'.format(args.test_case_file_pattern))

    assigner.assign_cases()
    assigner.output_configs(args.output)
    # forward --pipeline_id; None keeps the fallback to env variable CI_PIPELINE_ID
    assigner.create_artifact_index_file(pipeline_id=args.pipeline_id)
|