generate_rules.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. import argparse
  6. import inspect
  7. import os
  8. import sys
  9. from collections import defaultdict
  10. from itertools import product
  11. import yaml
  12. try:
  13. import pygraphviz as pgv
  14. except ImportError: # used when pre-commit, skip generating image
  15. pass
  16. try:
  17. from typing import Union
  18. except ImportError: # used for type hint
  19. pass
  20. IDF_PATH = os.path.abspath(os.getenv('IDF_PATH', os.path.join(os.path.dirname(__file__), '..', '..', '..')))
  21. def _list(str_or_list): # type: (Union[str, list]) -> list
  22. if isinstance(str_or_list, str):
  23. return [str_or_list]
  24. elif isinstance(str_or_list, list):
  25. return str_or_list
  26. else:
  27. raise ValueError('Wrong type: {}. Only supports str or list.'.format(type(str_or_list)))
  28. def _format_nested_dict(_dict, f_tuple): # type: (dict[str, dict], tuple[str, ...]) -> dict[str, dict]
  29. res = {}
  30. for k, v in _dict.items():
  31. k = k.split('__')[0]
  32. if isinstance(v, dict):
  33. v = _format_nested_dict(v, f_tuple)
  34. elif isinstance(v, list):
  35. v = _format_nested_list(v, f_tuple)
  36. elif isinstance(v, str):
  37. v = v.format(*f_tuple)
  38. res[k.format(*f_tuple)] = v
  39. return res
  40. def _format_nested_list(_list, f_tuple): # type: (list[str], tuple[str, ...]) -> list[str]
  41. res = []
  42. for item in _list:
  43. if isinstance(item, list):
  44. item = _format_nested_list(item, f_tuple)
  45. elif isinstance(item, dict):
  46. item = _format_nested_dict(item, f_tuple)
  47. elif isinstance(item, str):
  48. item = item.format(*f_tuple)
  49. res.append(item)
  50. return res
  51. class RulesWriter:
  52. AUTO_GENERATE_MARKER = inspect.cleandoc(r'''
  53. ##################
  54. # Auto Generated #
  55. ##################
  56. ''')
  57. LABEL_TEMPLATE = inspect.cleandoc(r'''
  58. .if-label-{0}: &if-label-{0}
  59. if: '$BOT_LABEL_{1} || $CI_MERGE_REQUEST_LABELS =~ /^(?:[^,\n\r]+,)*{0}(?:,[^,\n\r]+)*$/i'
  60. ''')
  61. RULE_PROTECTED = ' - <<: *if-protected'
  62. RULE_PROTECTED_NO_LABEL = ' - <<: *if-protected-no_label'
  63. RULE_BUILD_ONLY = ' - <<: *if-label-build-only\n' \
  64. ' when: never'
  65. RULE_REVERT_BRANCH = ' - <<: *if-revert-branch\n' \
  66. ' when: never'
  67. RULE_LABEL_TEMPLATE = ' - <<: *if-label-{0}'
  68. RULE_PATTERN_TEMPLATE = ' - <<: *if-dev-push\n' \
  69. ' changes: *patterns-{0}'
  70. SPECIFIC_RULE_TEMPLATE = ' - <<: *{0}'
  71. RULES_TEMPLATE = inspect.cleandoc(r"""
  72. .rules:{0}:
  73. rules:
  74. {1}
  75. """)
  76. KEYWORDS = ['labels', 'patterns']
  77. def __init__(self, rules_yml, depend_yml): # type: (str, str) -> None
  78. self.rules_yml = rules_yml
  79. self.rules_cfg = yaml.load(open(rules_yml), Loader=yaml.FullLoader)
  80. self.full_cfg = yaml.load(open(depend_yml), Loader=yaml.FullLoader)
  81. self.cfg = {k: v for k, v in self.full_cfg.items() if not k.startswith('.')}
  82. self.cfg = self.expand_matrices()
  83. self.rules = self.expand_rules()
  84. self.graph = None
  85. def expand_matrices(self): # type: () -> dict
  86. """
  87. Expand the matrix into different rules
  88. """
  89. res = {}
  90. for k, v in self.cfg.items():
  91. res.update(self._expand_matrix(k, v))
  92. for k, v in self.cfg.items():
  93. if not v:
  94. continue
  95. deploy = v.get('deploy')
  96. if deploy:
  97. for item in _list(deploy):
  98. res['{}-{}'.format(k, item)] = v
  99. return res
  100. @staticmethod
  101. def _expand_matrix(name, cfg): # type: (str, dict) -> dict
  102. """
  103. Expand matrix into multi keys
  104. :param cfg: single rule dict
  105. :return:
  106. """
  107. default = {name: cfg}
  108. if not cfg:
  109. return default
  110. matrices = cfg.pop('matrix', None)
  111. if not matrices:
  112. return default
  113. res = {}
  114. for comb in product(*_list(matrices)):
  115. res.update(_format_nested_dict(default, comb))
  116. return res
  117. def expand_rules(self): # type: () -> dict[str, dict[str, list]]
  118. res = defaultdict(lambda: defaultdict(set)) # type: dict[str, dict[str, set]]
  119. for k, v in self.cfg.items():
  120. if not v:
  121. continue
  122. for vk, vv in v.items():
  123. if vk in self.KEYWORDS:
  124. res[k][vk] = set(_list(vv))
  125. else:
  126. res[k][vk] = vv
  127. for key in self.KEYWORDS: # provide empty set for missing field
  128. if key not in res[k]:
  129. res[k][key] = set()
  130. for k, v in self.cfg.items():
  131. if not v:
  132. continue
  133. if 'included_in' in v:
  134. for item in _list(v['included_in']):
  135. if 'specific_rules' in v:
  136. res[item]['specific_rules'].update(_list(v['specific_rules']))
  137. if 'labels' in v:
  138. res[item]['labels'].update(_list(v['labels']))
  139. if 'patterns' in v:
  140. for _pat in _list(v['patterns']):
  141. # Patterns must be pre-defined
  142. if '.patterns-{}'.format(_pat) not in self.rules_cfg:
  143. print('WARNING: pattern {} not exists'.format(_pat))
  144. continue
  145. res[item]['patterns'].add(_pat)
  146. sorted_res = defaultdict(lambda: defaultdict(list)) # type: dict[str, dict[str, list]]
  147. for k, v in res.items():
  148. for vk, vv in v.items():
  149. sorted_res[k][vk] = sorted(vv)
  150. return sorted_res
  151. def new_labels_str(self): # type: () -> str
  152. _labels = set([])
  153. for k, v in self.cfg.items():
  154. if not v:
  155. continue # shouldn't be possible
  156. labels = v.get('labels')
  157. if not labels:
  158. continue
  159. _labels.update(_list(labels))
  160. labels = sorted(_labels)
  161. res = ''
  162. res += '\n\n'.join([self._format_label(_label) for _label in labels])
  163. return res
  164. @classmethod
  165. def _format_label(cls, label): # type: (str) -> str
  166. return cls.LABEL_TEMPLATE.format(label, cls.bot_label_str(label))
  167. @staticmethod
  168. def bot_label_str(label): # type: (str) -> str
  169. return label.upper().replace('-', '_')
  170. def new_rules_str(self): # type: () -> str
  171. res = []
  172. for k, v in sorted(self.rules.items()):
  173. res.append(self.RULES_TEMPLATE.format(k, self._format_rule(k, v)))
  174. return '\n\n'.join(res)
  175. def _format_rule(self, name, cfg): # type: (str, dict) -> str
  176. _rules = [self.RULE_REVERT_BRANCH]
  177. if name.endswith('-production'):
  178. _rules.append(self.RULE_PROTECTED_NO_LABEL)
  179. else:
  180. if not (name.endswith('-preview') or name.startswith('labels:')):
  181. _rules.append(self.RULE_PROTECTED)
  182. if name.startswith('test:'):
  183. _rules.append(self.RULE_BUILD_ONLY)
  184. for specific_rule in cfg['specific_rules']:
  185. if f'.{specific_rule}' in self.rules_cfg:
  186. _rules.append(self.SPECIFIC_RULE_TEMPLATE.format(specific_rule))
  187. else:
  188. print('WARNING: specific_rule {} not exists'.format(specific_rule))
  189. for label in cfg['labels']:
  190. _rules.append(self.RULE_LABEL_TEMPLATE.format(label))
  191. for pattern in cfg['patterns']:
  192. if '.patterns-{}'.format(pattern) in self.rules_cfg:
  193. _rules.append(self.RULE_PATTERN_TEMPLATE.format(pattern))
  194. else:
  195. print('WARNING: pattern {} not exists'.format(pattern))
  196. return '\n'.join(_rules)
  197. def update_rules_yml(self): # type: () -> bool
  198. with open(self.rules_yml) as fr:
  199. file_str = fr.read()
  200. auto_generate_str = '\n{}\n\n{}\n'.format(self.new_labels_str(), self.new_rules_str())
  201. rest, marker, old = file_str.partition(self.AUTO_GENERATE_MARKER)
  202. if old == auto_generate_str:
  203. return False
  204. else:
  205. print(self.rules_yml, 'has been modified. Please check')
  206. with open(self.rules_yml, 'w') as fw:
  207. fw.write(rest + marker + auto_generate_str)
  208. return True
  209. LABEL_COLOR = 'green'
  210. PATTERN_COLOR = 'cyan'
  211. RULE_COLOR = 'blue'
  212. def build_graph(rules_dict): # type: (dict[str, dict[str, list]]) -> pgv.AGraph
  213. graph = pgv.AGraph(directed=True, rankdir='LR', concentrate=True)
  214. for k, v in rules_dict.items():
  215. if not v:
  216. continue
  217. included_in = v.get('included_in')
  218. if included_in:
  219. for item in _list(included_in):
  220. graph.add_node(k, color=RULE_COLOR)
  221. graph.add_node(item, color=RULE_COLOR)
  222. graph.add_edge(k, item, color=RULE_COLOR)
  223. labels = v.get('labels')
  224. if labels:
  225. for _label in labels:
  226. graph.add_node('label:{}'.format(_label), color=LABEL_COLOR)
  227. graph.add_edge('label:{}'.format(_label), k, color=LABEL_COLOR)
  228. patterns = v.get('patterns')
  229. if patterns:
  230. for _pat in patterns:
  231. graph.add_node('pattern:{}'.format(_pat), color=PATTERN_COLOR)
  232. graph.add_edge('pattern:{}'.format(_pat), k, color=PATTERN_COLOR)
  233. return graph
  234. def output_graph(graph, output_path='output.png'): # type: (pgv.AGraph, str) -> None
  235. graph.layout('dot')
  236. if output_path.endswith('.png'):
  237. img_path = output_path
  238. else:
  239. img_path = os.path.join(output_path, 'output.png')
  240. graph.draw(img_path)
  241. if __name__ == '__main__':
  242. parser = argparse.ArgumentParser(description=__doc__)
  243. parser.add_argument('rules_yml', nargs='?', default=os.path.join(IDF_PATH, '.gitlab', 'ci', 'rules.yml'),
  244. help='rules.yml file path')
  245. parser.add_argument('dependencies_yml', nargs='?', default=os.path.join(IDF_PATH, '.gitlab', 'ci', 'dependencies',
  246. 'dependencies.yml'),
  247. help='dependencies.yml file path')
  248. parser.add_argument('--graph',
  249. help='Specify PNG image output path. Use this argument to generate dependency graph')
  250. args = parser.parse_args()
  251. writer = RulesWriter(args.rules_yml, args.dependencies_yml)
  252. file_modified = writer.update_rules_yml()
  253. if args.graph:
  254. dep_tree_graph = build_graph(writer.rules)
  255. output_graph(dep_tree_graph)
  256. sys.exit(file_modified)