CIAssignUnitTest.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. """
  2. Command line tool to assign unit tests to CI test jobs.
  3. """
  4. import re
  5. import os
  6. import sys
  7. import argparse
  8. import yaml
  9. try:
  10. from yaml import CLoader as Loader
  11. except ImportError:
  12. from yaml import Loader as Loader
  13. try:
  14. from Utility import CIAssignTest
  15. except ImportError:
  16. test_fw_path = os.getenv("TEST_FW_PATH")
  17. if test_fw_path:
  18. sys.path.insert(0, test_fw_path)
  19. from Utility import CIAssignTest
  class Group(CIAssignTest.Group):
      """
      Group of unit test cases that will be assigned to the same CI job.

      Extends ``CIAssignTest.Group``: the tags of the first case are added to the
      job match keys, extra cases are accepted when their tags are a subset of the
      group's tags, and :meth:`output` renders the job config for the unit test runner.
      """
      # NOTE(review): SORT_KEYS, MAX_CASE and CI_JOB_MATCH_KEYS are not read inside this
      # class; presumably they are consumed by CIAssignTest.Group - confirm there.
      SORT_KEYS = ["test environment", "tags", "chip_target"]
      MAX_CASE = 50
      # maps an attribute name used by callers to the key actually stored in the case dict
      ATTR_CONVERT_TABLE = {
          "execution_time": "execution time"
      }
      CI_JOB_MATCH_KEYS = ["test environment"]
      # maps the "chip_target" of a case to the DUT class name written into the job config
      DUT_CLS_NAME = {
          "esp32": "ESP32DUT",
          "esp32s2beta": "ESP32S2DUT",
          "esp8266": "ESP8266DUT",
      }

      def __init__(self, case):
          """
          :param case: first test case of the group; must provide a "tags" iterable
          """
          super(Group, self).__init__(case)
          # every tag of the first case becomes an additional job match key
          for tag in self._get_case_attr(case, "tags"):
              self.ci_job_match_keys.add(tag)

      @staticmethod
      def _get_case_attr(case, attr):
          """
          Read an attribute from a test case, translating the name via ATTR_CONVERT_TABLE.

          :param case: test case (mapping)
          :param attr: attribute name, either the real key or an alias from ATTR_CONVERT_TABLE
          :return: value stored in the case
          :raise KeyError: if the case does not contain the (translated) key
          """
          return case[Group.ATTR_CONVERT_TABLE.get(attr, attr)]

      def add_extra_case(self, case):
          """
          Try to add a case to this group.

          The case is added when the group accepts new cases and every filter of the
          group matches the case. For the "tags" filter an exact match is not required:
          it is enough that the case tags are a subset of the group's tags.

          :param case: test case to add
          :return: True if the case was appended to the group, otherwise False
          """
          if not self.accept_new_case():
              return False
          for key in self.filters:
              case_value = self._get_case_attr(case, key)
              if case_value == self.filters[key]:
                  continue
              # group already provides all tags required by the case
              if key == "tags" and case_value.issubset(self.filters[key]):
                  continue
              return False
          self.case_list.append(case)
          return True

      def _create_extra_data(self, test_cases, test_function):
          """
          For unit test case, we need to copy some attributes of test cases into config file.
          So unit test function knows how to run the case.

          :param test_cases: list of test cases that use ``test_function``
          :param test_function: name of the test function that runs these cases
          :return: list of dicts with keys "config", "name", "reset", "timeout" and,
                   for multiple devices/stages functions, "child case num"
          :raise KeyError: if a required attribute is missing from a case
          """
          need_child_case_num = test_function in ("run_multiple_devices_cases",
                                                  "run_multiple_stage_cases")
          case_data = []
          for case in test_cases:
              one_case_data = {
                  "config": self._get_case_attr(case, "config"),
                  "name": self._get_case_attr(case, "summary"),
                  "reset": self._get_case_attr(case, "reset"),
                  "timeout": self._get_case_attr(case, "timeout"),
              }
              if need_child_case_num:
                  try:
                      one_case_data["child case num"] = self._get_case_attr(case, "child case num")
                  except KeyError:
                      print("multiple devices/stages cases must contains at least two test functions")
                      print("case name: {}".format(one_case_data["name"]))
                      # bare raise keeps the original traceback on both Python 2 and 3
                      raise
              case_data.append(one_case_data)
          return case_data

      def _divide_case_by_test_function(self):
          """
          divide cases of current test group by test function they need to use

          A case with "multi_device" == "Yes" goes to "run_multiple_devices_cases",
          otherwise a case with "multi_stage" == "Yes" goes to "run_multiple_stage_cases",
          all remaining cases go to "run_unit_test_cases".

          :return: dict of list of cases for each test functions
          """
          case_by_test_function = {
              "run_multiple_devices_cases": [],
              "run_multiple_stage_cases": [],
              "run_unit_test_cases": [],
          }
          for case in self.case_list:
              if case["multi_device"] == "Yes":
                  case_by_test_function["run_multiple_devices_cases"].append(case)
              elif case["multi_stage"] == "Yes":
                  case_by_test_function["run_multiple_stage_cases"].append(case)
              else:
                  case_by_test_function["run_unit_test_cases"].append(case)
          return case_by_test_function

      def output(self):
          """
          output data for job configs

          The DUT class is chosen from the "chip_target" of the first case in the group;
          when that value is empty no DUT overwrite is emitted.

          :return: {"CaseConfig": list of case configs for cases in this group}, one entry
                   per test function that has at least one case
          :raise KeyError: if "chip_target" is set but not listed in DUT_CLS_NAME
          """
          target = self._get_case_attr(self.case_list[0], "chip_target")
          if target:
              overwrite = {
                  "dut": {
                      "path": "IDF/IDFDUT.py",
                      "class": self.DUT_CLS_NAME[target],
                  }
              }
          else:
              overwrite = dict()
          case_by_test_function = self._divide_case_by_test_function()
          output_data = {
              # we don't need filter for test function, as UT uses a few test functions for all cases
              "CaseConfig": [
                  {
                      "name": test_function,
                      "extra_data": self._create_extra_data(test_cases, test_function),
                      "overwrite": overwrite,
                  }
                  # items() instead of iteritems(): the latter does not exist on Python 3
                  for test_function, test_cases in case_by_test_function.items()
                  if test_cases
              ],
          }
          return output_data
  class UnitTestAssignTest(CIAssignTest.AssignTest):
      """
      Test assigner for unit tests: cases are loaded from a yaml file and grouped with Group.
      """
      # job name pattern; not read in this class - presumably used by CIAssignTest.AssignTest
      CI_TEST_JOB_PATTERN = re.compile(r"^UT_.+")

      def __init__(self, test_case_path, ci_config_file):
          """
          :param test_case_path: path of the yaml file that lists the unit test cases
          :param ci_config_file: gitlab ci config file
          """
          CIAssignTest.AssignTest.__init__(
              self,
              test_case_path,
              ci_config_file,
              case_group=Group,
          )

      def _search_cases(self, test_case_path, case_filter=None):
          """
          Load unit test cases from a yaml file instead of searching for test functions.
          The yaml file with the unit test cases is created in job build-idf-test.

          :param test_case_path: path of the yaml file; an unreadable path yields no cases
          :param case_filter: optional mapping of lower case key -> accepted values
          :return: list of test cases that passed every filter, sorted by
                   config + multi_stage + multi_device
          """
          try:
              # NOTE(review): yaml.load with the full Loader can construct arbitrary objects;
              # acceptable only while this file is produced by our own build job.
              with open(test_case_path, "r") as yaml_file:
                  loaded = yaml.load(yaml_file, Loader=Loader)
              cases = loaded["test cases"]
              for item in cases:
                  item["tags"] = set(item["tags"])
          except IOError:
              print("Test case path is invalid. Should only happen when use @bot to skip unit test.")
              cases = []

          # filter keys are lower case, so build a lower case -> original key lookup
          # from the keys of the first case
          try:
              original_key_of = dict((name.lower(), name) for name in cases[0].keys())
          except IndexError:
              original_key_of = dict()

          def _passes(item, filter_key):
              # a case without the filtered key is regarded as passing the filter
              try:
                  raw = item[original_key_of[filter_key]]
                  # bot converts string to lower case
                  candidate = raw.lower() if isinstance(raw, str) else raw
                  keep = candidate in case_filter[filter_key]
              except KeyError:
                  keep = True
              return keep

          if case_filter:
              for filter_key in case_filter:
                  cases = [item for item in cases if _passes(item, filter_key)]

          def _flash_order(item):
              return item["config"] + item["multi_stage"] + item["multi_device"]

          # cases with similar config / test function end up next to each other, so they are
          # more likely to be assigned to the same job, which reduces flash DUT operations
          cases.sort(key=_flash_order)
          return cases
  if __name__ == '__main__':
      # command line entry: parse the three positional arguments, assign the unit test
      # cases to CI jobs and write the resulting job config files
      arg_parser = argparse.ArgumentParser()
      for arg_name, arg_help in (
              ("test_case", "test case folder or file"),
              ("ci_config_file", "gitlab ci config file"),
              ("output_path", "output path of config files"),
      ):
          arg_parser.add_argument(arg_name, help=arg_help)
      cli_args = arg_parser.parse_args()

      assigner = UnitTestAssignTest(cli_args.test_case, cli_args.ci_config_file)
      assigner.assign_cases()
      assigner.output_configs(cli_args.output_path)
  178. assign_test.output_configs(args.output_path)