generate_rules.py 11 KB


  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. import argparse
  6. import inspect
  7. import os
  8. import sys
  9. from collections import defaultdict
  10. from itertools import product
  11. import yaml
  12. from check_rules_yml import get_needed_rules
  13. from idf_ci_utils import IDF_PATH
  14. try:
  15. import pygraphviz as pgv
  16. except ImportError: # used when pre-commit, skip generating image
  17. pass
  18. try:
  19. from typing import Union
  20. except ImportError: # used for type hint
  21. pass
  22. def _list(str_or_list): # type: (Union[str, list]) -> list
  23. if isinstance(str_or_list, str):
  24. return [str_or_list]
  25. elif isinstance(str_or_list, list):
  26. return str_or_list
  27. else:
  28. raise ValueError('Wrong type: {}. Only supports str or list.'.format(type(str_or_list)))
  29. def _format_nested_dict(_dict, f_tuple): # type: (dict[str, dict], tuple[str, ...]) -> dict[str, dict]
  30. res = {}
  31. for k, v in _dict.items():
  32. k = k.split('__')[0]
  33. if isinstance(v, dict):
  34. v = _format_nested_dict(v, f_tuple)
  35. elif isinstance(v, list):
  36. v = _format_nested_list(v, f_tuple)
  37. elif isinstance(v, str):
  38. v = v.format(*f_tuple)
  39. res[k.format(*f_tuple)] = v
  40. return res
  41. def _format_nested_list(_list, f_tuple): # type: (list[str], tuple[str, ...]) -> list[str]
  42. res = []
  43. for item in _list:
  44. if isinstance(item, list):
  45. item = _format_nested_list(item, f_tuple)
  46. elif isinstance(item, dict):
  47. item = _format_nested_dict(item, f_tuple)
  48. elif isinstance(item, str):
  49. item = item.format(*f_tuple)
  50. res.append(item)
  51. return res
  52. class RulesWriter:
  53. AUTO_GENERATE_MARKER = inspect.cleandoc(r'''
  54. ##################
  55. # Auto Generated #
  56. ##################
  57. ''')
  58. LABEL_TEMPLATE = inspect.cleandoc(r'''
  59. .if-label-{0}: &if-label-{0}
  60. if: '$BOT_LABEL_{1} || $CI_MERGE_REQUEST_LABELS =~ /^(?:[^,\n\r]+,)*{0}(?:,[^,\n\r]+)*$/i'
  61. ''')
  62. RULE_PROTECTED = ' - <<: *if-protected'
  63. RULE_PROTECTED_NO_LABEL = ' - <<: *if-protected-no_label'
  64. RULE_BUILD_ONLY = ' - <<: *if-label-build-only\n' \
  65. ' when: never'
  66. RULE_REVERT_BRANCH = ' - <<: *if-revert-branch\n' \
  67. ' when: never'
  68. RULE_LABEL_TEMPLATE = ' - <<: *if-label-{0}'
  69. RULE_PATTERN_TEMPLATE = ' - <<: *if-dev-push\n' \
  70. ' changes: *patterns-{0}'
  71. SPECIFIC_RULE_TEMPLATE = ' - <<: *{0}'
  72. RULES_TEMPLATE = inspect.cleandoc(r"""
  73. .rules:{0}:
  74. rules:
  75. {1}
  76. """)
  77. KEYWORDS = ['labels', 'patterns']
  78. def __init__(self, rules_yml, depend_yml): # type: (str, str) -> None
  79. self.rules_yml = rules_yml
  80. self.rules_cfg = yaml.load(open(rules_yml), Loader=yaml.FullLoader)
  81. self.full_cfg = yaml.load(open(depend_yml), Loader=yaml.FullLoader)
  82. self.cfg = {k: v for k, v in self.full_cfg.items() if not k.startswith('.')}
  83. self.cfg = self.expand_matrices()
  84. self.rules = self.expand_rules()
  85. self.graph = None
  86. def expand_matrices(self): # type: () -> dict
  87. """
  88. Expand the matrix into different rules
  89. """
  90. res = {}
  91. for k, v in self.cfg.items():
  92. res.update(self._expand_matrix(k, v))
  93. for k, v in self.cfg.items():
  94. if not v:
  95. continue
  96. deploy = v.get('deploy')
  97. if deploy:
  98. for item in _list(deploy):
  99. res['{}-{}'.format(k, item)] = v
  100. return res
  101. @staticmethod
  102. def _expand_matrix(name, cfg): # type: (str, dict) -> dict
  103. """
  104. Expand matrix into multi keys
  105. :param cfg: single rule dict
  106. :return:
  107. """
  108. default = {name: cfg}
  109. if not cfg:
  110. return default
  111. matrices = cfg.pop('matrix', None)
  112. if not matrices:
  113. return default
  114. res = {}
  115. for comb in product(*_list(matrices)):
  116. res.update(_format_nested_dict(default, comb))
  117. return res
  118. def expand_rules(self): # type: () -> dict[str, dict[str, list]]
  119. res = defaultdict(lambda: defaultdict(set)) # type: dict[str, dict[str, set]]
  120. for k, v in self.cfg.items():
  121. if not v:
  122. continue
  123. for vk, vv in v.items():
  124. if vk in self.KEYWORDS:
  125. res[k][vk] = set(_list(vv))
  126. else:
  127. res[k][vk] = vv
  128. for key in self.KEYWORDS: # provide empty set for missing field
  129. if key not in res[k]:
  130. res[k][key] = set()
  131. for k, v in self.cfg.items():
  132. if not v:
  133. continue
  134. if 'included_in' in v:
  135. for item in _list(v['included_in']):
  136. if 'specific_rules' in v:
  137. res[item]['specific_rules'].update(_list(v['specific_rules']))
  138. if 'labels' in v:
  139. res[item]['labels'].update(_list(v['labels']))
  140. if 'patterns' in v:
  141. for _pat in _list(v['patterns']):
  142. # Patterns must be pre-defined
  143. if '.patterns-{}'.format(_pat) not in self.rules_cfg:
  144. print('WARNING: pattern {} not exists'.format(_pat))
  145. continue
  146. res[item]['patterns'].add(_pat)
  147. sorted_res = defaultdict(lambda: defaultdict(list)) # type: dict[str, dict[str, list]]
  148. for k, v in res.items():
  149. for vk, vv in v.items():
  150. sorted_res[k][vk] = sorted(vv)
  151. return sorted_res
  152. def new_labels_str(self): # type: () -> str
  153. _labels = set([])
  154. for k, v in self.cfg.items():
  155. if not v:
  156. continue # shouldn't be possible
  157. labels = v.get('labels')
  158. if not labels:
  159. continue
  160. _labels.update(_list(labels))
  161. labels = sorted(_labels)
  162. res = ''
  163. res += '\n\n'.join([self._format_label(_label) for _label in labels])
  164. return res
  165. @classmethod
  166. def _format_label(cls, label): # type: (str) -> str
  167. return cls.LABEL_TEMPLATE.format(label, cls.bot_label_str(label))
  168. @staticmethod
  169. def bot_label_str(label): # type: (str) -> str
  170. return label.upper().replace('-', '_')
  171. def new_rules_str(self): # type: () -> str
  172. res = []
  173. for k, v in sorted(self.rules.items()):
  174. if '.rules:' + k not in get_needed_rules():
  175. print(f'WARNING: unused rule: {k}, skipping...')
  176. continue
  177. res.append(self.RULES_TEMPLATE.format(k, self._format_rule(k, v)))
  178. return '\n\n'.join(res)
  179. def _format_rule(self, name, cfg): # type: (str, dict) -> str
  180. _rules = [self.RULE_REVERT_BRANCH]
  181. if name.endswith('-production'):
  182. _rules.append(self.RULE_PROTECTED_NO_LABEL)
  183. else:
  184. if not (name.endswith('-preview') or name.startswith('labels:')):
  185. _rules.append(self.RULE_PROTECTED)
  186. if name.startswith('test:'):
  187. _rules.append(self.RULE_BUILD_ONLY)
  188. for specific_rule in cfg['specific_rules']:
  189. if f'.{specific_rule}' in self.rules_cfg:
  190. _rules.append(self.SPECIFIC_RULE_TEMPLATE.format(specific_rule))
  191. else:
  192. print('WARNING: specific_rule {} not exists'.format(specific_rule))
  193. for label in cfg['labels']:
  194. _rules.append(self.RULE_LABEL_TEMPLATE.format(label))
  195. for pattern in cfg['patterns']:
  196. if '.patterns-{}'.format(pattern) in self.rules_cfg:
  197. _rules.append(self.RULE_PATTERN_TEMPLATE.format(pattern))
  198. else:
  199. print('WARNING: pattern {} not exists'.format(pattern))
  200. return '\n'.join(_rules)
  201. def update_rules_yml(self): # type: () -> bool
  202. with open(self.rules_yml) as fr:
  203. file_str = fr.read()
  204. auto_generate_str = '\n{}\n\n{}\n'.format(self.new_labels_str(), self.new_rules_str())
  205. rest, marker, old = file_str.partition(self.AUTO_GENERATE_MARKER)
  206. if old == auto_generate_str:
  207. return False
  208. else:
  209. print(self.rules_yml, 'has been modified. Please check')
  210. with open(self.rules_yml, 'w') as fw:
  211. fw.write(rest + marker + auto_generate_str)
  212. return True
  213. LABEL_COLOR = 'green'
  214. PATTERN_COLOR = 'cyan'
  215. RULE_COLOR = 'blue'
  216. def build_graph(rules_dict): # type: (dict[str, dict[str, list]]) -> pgv.AGraph
  217. graph = pgv.AGraph(directed=True, rankdir='LR', concentrate=True)
  218. for k, v in rules_dict.items():
  219. if not v:
  220. continue
  221. included_in = v.get('included_in')
  222. if included_in:
  223. for item in _list(included_in):
  224. graph.add_node(k, color=RULE_COLOR)
  225. graph.add_node(item, color=RULE_COLOR)
  226. graph.add_edge(k, item, color=RULE_COLOR)
  227. labels = v.get('labels')
  228. if labels:
  229. for _label in labels:
  230. graph.add_node('label:{}'.format(_label), color=LABEL_COLOR)
  231. graph.add_edge('label:{}'.format(_label), k, color=LABEL_COLOR)
  232. patterns = v.get('patterns')
  233. if patterns:
  234. for _pat in patterns:
  235. graph.add_node('pattern:{}'.format(_pat), color=PATTERN_COLOR)
  236. graph.add_edge('pattern:{}'.format(_pat), k, color=PATTERN_COLOR)
  237. return graph
  238. def output_graph(graph, output_path='output.png'): # type: (pgv.AGraph, str) -> None
  239. graph.layout('dot')
  240. if output_path.endswith('.png'):
  241. img_path = output_path
  242. else:
  243. img_path = os.path.join(output_path, 'output.png')
  244. graph.draw(img_path)
  245. if __name__ == '__main__':
  246. parser = argparse.ArgumentParser(description=__doc__)
  247. parser.add_argument('rules_yml', nargs='?', default=os.path.join(IDF_PATH, '.gitlab', 'ci', 'rules.yml'),
  248. help='rules.yml file path')
  249. parser.add_argument('dependencies_yml', nargs='?', default=os.path.join(IDF_PATH, '.gitlab', 'ci', 'dependencies',
  250. 'dependencies.yml'),
  251. help='dependencies.yml file path')
  252. parser.add_argument('--graph',
  253. help='Specify PNG image output path. Use this argument to generate dependency graph')
  254. args = parser.parse_args()
  255. writer = RulesWriter(args.rules_yml, args.dependencies_yml)
  256. file_modified = writer.update_rules_yml()
  257. if args.graph:
  258. dep_tree_graph = build_graph(writer.rules)
  259. output_graph(dep_tree_graph)
  260. sys.exit(file_modified)