Просмотр исходного кода

Merge branch 'refactor/add_python_types_hints_to_ci' into 'master'

ci: Add python types hints

See merge request espressif/esp-idf!18726
Roland Dobai 3 лет назад
Родитель
Сommit
2cf1370115

+ 2 - 2
tools/ci/check_artifacts_expire_time.py

@@ -17,14 +17,14 @@ if not IDF_PATH:
 GITLAB_CONFIG_FILE = os.path.join(IDF_PATH, '.gitlab-ci.yml')
 
 
-def check_artifacts_expire_time():
+def check_artifacts_expire_time() -> None:
     with open(GITLAB_CONFIG_FILE, 'r') as f:
         config = yaml.load(f, Loader=yaml.FullLoader)
 
     # load files listed in `include`
     if 'include' in config:
         for _file in config['include']:
-            with open(os.path.join(IDF_PATH, _file)) as f:
+            with open(os.path.join(IDF_PATH or '', _file)) as f:
                 config.update(yaml.load(f, Loader=yaml.FullLoader))
 
     print('expire time for jobs:')

+ 35 - 39
tools/ci/check_callgraph.py

@@ -8,15 +8,11 @@ import argparse
 import os
 import re
 from functools import partial
+from typing import BinaryIO, Callable, Dict, Generator, List, Optional, Tuple
 
 import elftools
 from elftools.elf import elffile
 
-try:
-    from typing import BinaryIO, Callable, Dict, Generator, List, Optional, Tuple
-except ImportError:
-    pass
-
 FUNCTION_REGEX = re.compile(
     r'^;; Function (?P<mangle>.*)\s+\((?P<function>\S+)(,.*)?\).*$'
 )
@@ -25,29 +21,29 @@ SYMBOL_REF_REGEX = re.compile(r'^.*\(symbol_ref[^()]*\("(?P<target>.*)"\).*$')
 
 
 class RtlFunction(object):
-    def __init__(self, name, rtl_filename, tu_filename):
+    def __init__(self, name: str, rtl_filename: str, tu_filename: str) -> None:
         self.name = name
         self.rtl_filename = rtl_filename
         self.tu_filename = tu_filename
-        self.calls = list()  # type: List[str]
-        self.refs = list()  # type: List[str]
+        self.calls: List[str] = list()
+        self.refs: List[str] = list()
         self.sym = None
 
 
 class SectionAddressRange(object):
-    def __init__(self, name, addr, size):  # type: (str, int, int) -> None
+    def __init__(self, name: str, addr: int, size: int) -> None:
         self.name = name
         self.low = addr
         self.high = addr + size
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '{}: 0x{:08x} - 0x{:08x}'.format(self.name, self.low, self.high)
 
-    def contains_address(self, addr):
+    def contains_address(self, addr: int) -> bool:
         return self.low <= addr < self.high
 
 
-TARGET_SECTIONS = {
+TARGET_SECTIONS: Dict[str, List[SectionAddressRange]] = {
     'esp32': [
         SectionAddressRange('.rom.text', 0x40000000, 0x70000),
         SectionAddressRange('.rom.rodata', 0x3ff96000, 0x9018)
@@ -60,20 +56,20 @@ TARGET_SECTIONS = {
         SectionAddressRange('.rom.text', 0x40000000, 0x568d0),
         SectionAddressRange('.rom.rodata', 0x3ff071c0, 0x8e30)
     ]
-}  # type: Dict[str, List[SectionAddressRange]]
+}
 
 
 class Symbol(object):
-    def __init__(self, name, addr, local, filename, section):  # type: (str, int, bool, Optional[str], Optional[str]) -> None
+    def __init__(self, name: str, addr: int, local: bool, filename: Optional[str], section: Optional[str]) -> None:
         self.name = name
         self.addr = addr
         self.local = local
         self.filename = filename
         self.section = section
-        self.refers_to = list()  # type: List[Symbol]
-        self.referred_from = list()  # type: List[Symbol]
+        self.refers_to: List[Symbol] = list()
+        self.referred_from: List[Symbol] = list()
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '{} @0x{:08x} [{}]{} {}'.format(
             self.name,
             self.addr,
@@ -84,11 +80,11 @@ class Symbol(object):
 
 
 class Reference(object):
-    def __init__(self, from_sym, to_sym):  # type: (Symbol, Symbol) -> None
+    def __init__(self, from_sym: Symbol, to_sym: Symbol) -> None:
         self.from_sym = from_sym
         self.to_sym = to_sym
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '{} @0x{:08x} ({}) -> {} @0x{:08x} ({})'.format(
             self.from_sym.name,
             self.from_sym.addr,
@@ -100,13 +96,13 @@ class Reference(object):
 
 
 class ElfInfo(object):
-    def __init__(self, elf_file):  # type: (BinaryIO) -> None
+    def __init__(self, elf_file: BinaryIO) -> None:
         self.elf_file = elf_file
         self.elf_obj = elffile.ELFFile(self.elf_file)
         self.section_ranges = self._load_sections()
         self.symbols = self._load_symbols()
 
-    def _load_symbols(self):  # type: () -> List[Symbol]
+    def _load_symbols(self) -> List[Symbol]:
         symbols = []
         for s in self.elf_obj.iter_sections():
             if not isinstance(s, elftools.elf.sections.SymbolTableSection):
@@ -130,7 +126,7 @@ class ElfInfo(object):
                     )
         return symbols
 
-    def _load_sections(self):  # type: () -> List[SectionAddressRange]
+    def _load_sections(self) -> List[SectionAddressRange]:
         result = []
         for segment in self.elf_obj.iter_segments():
             if segment['p_type'] == 'PT_LOAD':
@@ -149,22 +145,22 @@ class ElfInfo(object):
 
         return result
 
-    def symbols_by_name(self, name):  # type: (str) -> List[Symbol]
+    def symbols_by_name(self, name: str) -> List['Symbol']:
         res = []
         for sym in self.symbols:
             if sym.name == name:
                 res.append(sym)
         return res
 
-    def section_for_addr(self, sym_addr):  # type: (int) -> Optional[str]
+    def section_for_addr(self, sym_addr: int) -> Optional[str]:
         for sar in self.section_ranges:
             if sar.contains_address(sym_addr):
                 return sar.name
         return None
 
 
-def load_rtl_file(rtl_filename, tu_filename, functions):  # type: (str, str, List[RtlFunction]) -> None
-    last_function = None  # type: Optional[RtlFunction]
+def load_rtl_file(rtl_filename: str, tu_filename: str, functions: List[RtlFunction]) -> None:
+    last_function: Optional[RtlFunction] = None
     for line in open(rtl_filename):
         # Find function definition
         match = re.match(FUNCTION_REGEX, line)
@@ -192,7 +188,7 @@ def load_rtl_file(rtl_filename, tu_filename, functions):  # type: (str, str, Lis
                 continue
 
 
-def rtl_filename_matches_sym_filename(rtl_filename, symbol_filename):  # type: (str, str) -> bool
+def rtl_filename_matches_sym_filename(rtl_filename: str, symbol_filename: str) -> bool:
     # Symbol file names (from ELF debug info) are short source file names, without path: "cpu_start.c".
     # RTL file names are paths relative to the build directory, e.g.:
     # "build/esp-idf/esp_system/CMakeFiles/__idf_esp_system.dir/port/cpu_start.c.234r.expand"
@@ -211,7 +207,7 @@ class SymbolNotFound(RuntimeError):
     pass
 
 
-def find_symbol_by_name(name, elfinfo, local_func_matcher):  # type: (str, ElfInfo, Callable[[Symbol], bool]) -> Optional[Symbol]
+def find_symbol_by_name(name: str, elfinfo: ElfInfo, local_func_matcher: Callable[[Symbol], bool]) -> Optional[Symbol]:
     """
     Find an ELF symbol for the given name.
     local_func_matcher is a callback function which checks is the candidate local symbol is suitable.
@@ -238,7 +234,7 @@ def find_symbol_by_name(name, elfinfo, local_func_matcher):  # type: (str, ElfIn
         return local_candidate or global_candidate
 
 
-def match_local_source_func(rtl_filename, sym):  # type: (str, Symbol) -> bool
+def match_local_source_func(rtl_filename: str, sym: Symbol) -> bool:
     """
     Helper for match_rtl_funcs_to_symbols, checks if local symbol sym is a good candidate for the
     reference source (caller), based on the RTL file name.
@@ -247,7 +243,7 @@ def match_local_source_func(rtl_filename, sym):  # type: (str, Symbol) -> bool
     return rtl_filename_matches_sym_filename(rtl_filename, sym.filename)
 
 
-def match_local_target_func(rtl_filename, sym_from, sym):  # type: (str, Symbol, Symbol) -> bool
+def match_local_target_func(rtl_filename: str, sym_from: Symbol, sym: Symbol) -> bool:
     """
     Helper for match_rtl_funcs_to_symbols, checks if local symbol sym is a good candidate for the
     reference target (callee or referenced data), based on RTL filename of the source symbol
@@ -263,9 +259,9 @@ def match_local_target_func(rtl_filename, sym_from, sym):  # type: (str, Symbol,
         return rtl_filename_matches_sym_filename(rtl_filename, sym.filename)
 
 
-def match_rtl_funcs_to_symbols(rtl_functions, elfinfo):  # type: (List[RtlFunction], ElfInfo) -> Tuple[List[Symbol], List[Reference]]
-    symbols = []  # type: List[Symbol]
-    refs = []  # type: List[Reference]
+def match_rtl_funcs_to_symbols(rtl_functions: List[RtlFunction], elfinfo: ElfInfo) -> Tuple[List[Symbol], List[Reference]]:
+    symbols: List[Symbol] = []
+    refs: List[Reference] = []
 
     # General idea:
     # - iterate over RTL functions.
@@ -308,17 +304,17 @@ def match_rtl_funcs_to_symbols(rtl_functions, elfinfo):  # type: (List[RtlFuncti
     return symbols, refs
 
 
-def get_symbols_and_refs(rtl_list, elf_file):  # type: (List[str], BinaryIO) -> Tuple[List[Symbol], List[Reference]]
+def get_symbols_and_refs(rtl_list: List[str], elf_file: BinaryIO) -> Tuple[List[Symbol], List[Reference]]:
     elfinfo = ElfInfo(elf_file)
 
-    rtl_functions = []  # type: List[RtlFunction]
+    rtl_functions: List[RtlFunction] = []
     for file_name in rtl_list:
         load_rtl_file(file_name, file_name, rtl_functions)
 
     return match_rtl_funcs_to_symbols(rtl_functions, elfinfo)
 
 
-def list_refs_from_to_sections(refs, from_sections, to_sections):  # type: (List[Reference], List[str], List[str]) -> int
+def list_refs_from_to_sections(refs: List[Reference], from_sections: List[str], to_sections: List[str]) -> int:
     found = 0
     for ref in refs:
         if (not from_sections or ref.from_sym.section in from_sections) and \
@@ -328,7 +324,7 @@ def list_refs_from_to_sections(refs, from_sections, to_sections):  # type: (List
     return found
 
 
-def find_files_recursive(root_path, ext):  # type: (str, str) -> Generator[str, None, None]
+def find_files_recursive(root_path: str, ext: str) -> Generator[str, None, None]:
     for root, _, files in os.walk(root_path):
         for basename in files:
             if basename.endswith(ext):
@@ -336,7 +332,7 @@ def find_files_recursive(root_path, ext):  # type: (str, str) -> Generator[str,
                 yield filename
 
 
-def main():
+def main() -> None:
     parser = argparse.ArgumentParser()
 
     parser.add_argument(
@@ -379,7 +375,7 @@ def main():
     args = parser.parse_args()
     if args.rtl_list:
         with open(args.rtl_list, 'r') as rtl_list_file:
-            rtl_list = [line.strip for line in rtl_list_file]
+            rtl_list = [line.strip() for line in rtl_list_file]
     else:
         if not args.rtl_dir:
             raise RuntimeError('Either --rtl-list or --rtl-dir must be specified')

+ 13 - 12
tools/ci/check_codeowners.py

@@ -10,6 +10,7 @@ import os
 import re
 import subprocess
 import sys
+from typing import List, Optional
 
 from idf_ci_utils import IDF_PATH
 
@@ -17,7 +18,7 @@ CODEOWNERS_PATH = os.path.join(IDF_PATH, '.gitlab', 'CODEOWNERS')
 CODEOWNER_GROUP_PREFIX = '@esp-idf-codeowners/'
 
 
-def get_all_files():
+def get_all_files() -> List[str]:
     """
     Get list of all file paths in the repository.
     """
@@ -25,7 +26,7 @@ def get_all_files():
     return subprocess.check_output(['git', 'ls-files'], cwd=IDF_PATH).decode('utf-8').strip().split('\n')
 
 
-def pattern_to_regex(pattern):
+def pattern_to_regex(pattern: str) -> str:
     """
     Convert the CODEOWNERS path pattern into a regular expression string.
     """
@@ -59,14 +60,14 @@ def pattern_to_regex(pattern):
     return re_pattern
 
 
-def files_by_regex(all_files, regex):
+def files_by_regex(all_files: List, regex: re.Pattern) -> List:
     """
     Return all files in the repository matching the given regular expresion.
     """
     return [file for file in all_files if regex.search('/' + file)]
 
 
-def files_by_pattern(all_files, pattern=None):
+def files_by_pattern(all_files: list, pattern: Optional[str]=None) -> List:
     """
     Return all the files in the repository matching the given CODEOWNERS pattern.
     """
@@ -76,7 +77,7 @@ def files_by_pattern(all_files, pattern=None):
     return files_by_regex(all_files, re.compile(pattern_to_regex(pattern)))
 
 
-def action_identify(args):
+def action_identify(args: argparse.Namespace) -> None:
     best_match = []
     all_files = get_all_files()
     with open(CODEOWNERS_PATH) as f:
@@ -94,7 +95,7 @@ def action_identify(args):
         print(owner)
 
 
-def action_test_pattern(args):
+def action_test_pattern(args: argparse.Namespace) -> None:
     re_pattern = pattern_to_regex(args.pattern)
 
     if args.regex:
@@ -106,10 +107,10 @@ def action_test_pattern(args):
         print(f)
 
 
-def action_ci_check(args):
+def action_ci_check(args: argparse.Namespace) -> None:
     errors = []
 
-    def add_error(msg):
+    def add_error(msg: str) -> None:
         errors.append('{}:{}: {}'.format(CODEOWNERS_PATH, line_no, msg))
 
     all_files = get_all_files()
@@ -158,7 +159,7 @@ def action_ci_check(args):
         raise SystemExit(1)
 
 
-def in_order(prev, current):
+def in_order(prev: str, current: str) -> bool:
     """
     Return True if the ordering is correct for these two lines ('prev' should be before 'current').
 
@@ -172,10 +173,10 @@ def in_order(prev, current):
     if not prev:
         return True  # first element in file
 
-    def is_separator(c):
+    def is_separator(c: str) -> bool:
         return c in '-_/'  # ignore differences between separators for ordering purposes
 
-    def is_wildcard(c):
+    def is_wildcard(c: str) -> bool:
         return c in '?*'
 
     # looping until we see a different character
@@ -192,7 +193,7 @@ def in_order(prev, current):
     return len(current) >= len(prev)
 
 
-def main():
+def main() -> None:
     parser = argparse.ArgumentParser(
         sys.argv[0], description='Internal helper script for working with the CODEOWNERS file.'
     )

+ 0 - 1
tools/ci/check_copyright_ignore.txt

@@ -2027,7 +2027,6 @@ tools/ble/lib_ble_client.py
 tools/ble/lib_gap.py
 tools/ble/lib_gatt.py
 tools/catch/catch.hpp
-tools/esp_app_trace/test/sysview/blink.c
 tools/find_build_apps/__init__.py
 tools/find_build_apps/cmake.py
 tools/find_build_apps/common.py

+ 10 - 5
tools/ci/check_deprecated_kconfigs.py

@@ -9,6 +9,7 @@ import argparse
 import os
 import sys
 from io import open
+from typing import Set, Tuple
 
 from check_kconfigs import valid_directory
 from idf_ci_utils import get_submodule_dirs
@@ -19,11 +20,11 @@ FILES_TO_CHECK = ('sdkconfig.ci', 'sdkconfig.defaults')
 # ignored directories (makes sense only when run on IDF_PATH)
 # Note: IGNORE_DIRS is a tuple in order to be able to use it directly with the startswith() built-in function which
 # accepts tuples but no lists.
-IGNORE_DIRS = (
+IGNORE_DIRS: Tuple = (
 )
 
 
-def _parse_path(path, sep=None):
+def _parse_path(path: os.PathLike[str], sep: str=None) -> Set:
     ret = set()
     with open(path, 'r', encoding='utf-8') as f:
         for line in f:
@@ -33,13 +34,13 @@ def _parse_path(path, sep=None):
     return ret
 
 
-def _valid_directory(path):
+def _valid_directory(path: os.PathLike[str]) -> os.PathLike[str]:
     if not os.path.isdir(path):
         raise argparse.ArgumentTypeError('{} is not a valid directory!'.format(path))
     return path
 
 
-def main():
+def check() -> int:
     parser = argparse.ArgumentParser(description='Kconfig options checker')
     parser.add_argument('files', nargs='*',
                         help='Kconfig files')
@@ -102,5 +103,9 @@ def main():
     return 0
 
 
+def main() -> None:
+    sys.exit(check())
+
+
 if __name__ == '__main__':
-    sys.exit(main())
+    main()

+ 6 - 5
tools/ci/check_executables.py

@@ -6,6 +6,7 @@
 import argparse
 import os
 import sys
+from typing import Iterable, List
 
 try:
     from idf_ci_utils import is_executable
@@ -15,7 +16,7 @@ except ImportError:
     from idf_ci_utils import is_executable
 
 
-def _strip_each_item(iterable):
+def _strip_each_item(iterable: Iterable) -> List:
     res = []
     for item in iterable:
         if item:
@@ -28,7 +29,7 @@ EXECUTABLE_LIST_FN = os.path.join(IDF_PATH, 'tools/ci/executable-list.txt')
 known_executables = _strip_each_item(open(EXECUTABLE_LIST_FN).readlines())
 
 
-def check_executable_list():
+def check_executable_list() -> int:
     ret = 0
     for index, fn in enumerate(known_executables):
         if not os.path.exists(os.path.join(IDF_PATH, fn)):
@@ -37,7 +38,7 @@ def check_executable_list():
     return ret
 
 
-def check_executables(files):
+def check_executables(files: List) -> int:
     ret = 0
     for fn in files:
         fn_executable = is_executable(fn)
@@ -51,7 +52,7 @@ def check_executables(files):
     return ret
 
 
-def main():
+def check() -> int:
     parser = argparse.ArgumentParser()
     parser.add_argument('--action', choices=['executables', 'list'], required=True,
                         help='if "executables", pass all your executables to see if it\'s in the list.'
@@ -70,4 +71,4 @@ def main():
 
 
 if __name__ == '__main__':
-    sys.exit(main())
+    sys.exit(check())

+ 34 - 31
tools/ci/check_public_headers.py

@@ -18,6 +18,7 @@ import subprocess
 import tempfile
 from io import open
 from threading import Event, Thread
+from typing import List, Optional, Set, Tuple, Union
 
 
 class HeaderFailed(Exception):
@@ -26,28 +27,28 @@ class HeaderFailed(Exception):
 
 
 class HeaderFailedSdkconfig(HeaderFailed):
-    def __str__(self):
+    def __str__(self) -> str:
         return 'Sdkconfig Error'
 
 
 class HeaderFailedBuildError(HeaderFailed):
-    def __str__(self):
+    def __str__(self) -> str:
         return 'Header Build Error'
 
 
 class HeaderFailedCppGuardMissing(HeaderFailed):
-    def __str__(self):
+    def __str__(self) -> str:
         return 'Header Missing C++ Guard'
 
 
 class HeaderFailedContainsCode(HeaderFailed):
-    def __str__(self):
+    def __str__(self) -> str:
         return 'Header Produced non-zero object'
 
 
 #   Creates a temp file and returns both output as a string and a file name
 #
-def exec_cmd_to_temp_file(what, suffix=''):
+def exec_cmd_to_temp_file(what: List, suffix: str='') -> Tuple[int, str, str, str, str]:
     out_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
     rc, out, err, cmd = exec_cmd(what, out_file)
     with open(out_file.name, 'r', encoding='utf-8') as f:
@@ -55,12 +56,12 @@ def exec_cmd_to_temp_file(what, suffix=''):
     return rc, out, err, out_file.name, cmd
 
 
-def exec_cmd(what, out_file=subprocess.PIPE):
+def exec_cmd(what: List, out_file: Union[tempfile._TemporaryFileWrapper[bytes], int]=subprocess.PIPE) -> Tuple[int, str, str, str]:
     p = subprocess.Popen(what, stdin=subprocess.PIPE, stdout=out_file, stderr=subprocess.PIPE)
-    output, err = p.communicate()
+    output_b, err_b = p.communicate()
     rc = p.returncode
-    output = output.decode('utf-8') if output is not None else None
-    err = err.decode('utf-8') if err is not None else None
+    output: str = output_b.decode('utf-8') if output is not None else ''
+    err: str = err_b.decode('utf-8') if err is not None else ''
     return rc, output, err, ' '.join(what)
 
 
@@ -74,11 +75,11 @@ class PublicHeaderChecker:
     PREPROC_OUT_DIFFERENT_WITH_EXT_C_HDR_OK = 6    # -> Both preprocessors produce different, non-zero output with extern "C" (header seems OK)
     PREPROC_OUT_DIFFERENT_NO_EXT_C_HDR_FAILED = 7  # -> Both preprocessors produce different, non-zero output without extern "C" (header fails)
 
-    def log(self, message, debug=False):
+    def log(self, message: str, debug: bool=False) -> None:
         if self.verbose or debug:
             print(message)
 
-    def __init__(self, verbose=False, jobs=1, prefix=None):
+    def __init__(self, verbose: bool=False, jobs: int=1, prefix: Optional[str]=None) -> None:
         self.gcc = '{}gcc'.format(prefix)
         self.gpp = '{}g++'.format(prefix)
         self.verbose = verbose
@@ -89,26 +90,26 @@ class PublicHeaderChecker:
         self.error_orphan_kconfig = re.compile(r'#error CONFIG_VARS_USED_WHILE_SDKCONFIG_NOT_INCLUDED')
         self.kconfig_macro = re.compile(r'\bCONFIG_[A-Z0-9_]+')
         self.assembly_nocode = r'^\s*(\.file|\.text|\.ident|\.option|\.attribute).*$'
-        self.check_threads = []
+        self.check_threads: List[Thread] = []
 
-        self.job_queue = queue.Queue()
-        self.failed_queue = queue.Queue()
+        self.job_queue: queue.Queue = queue.Queue()
+        self.failed_queue: queue.Queue = queue.Queue()
         self.terminate = Event()
 
-    def __enter__(self):
+    def __enter__(self) -> 'PublicHeaderChecker':
         for i in range(self.jobs):
             t = Thread(target=self.check_headers, args=(i, ))
             self.check_threads.append(t)
             t.start()
         return self
 
-    def __exit__(self, exc_type, exc_value, traceback):
+    def __exit__(self, exc_type: str, exc_value: str, traceback: str) -> None:
         self.terminate.set()
         for t in self.check_threads:
             t.join()
 
     # thread function process incoming header file from a queue
-    def check_headers(self, num):
+    def check_headers(self, num: int) -> None:
         while not self.terminate.is_set():
             if not self.job_queue.empty():
                 task = self.job_queue.get()
@@ -125,10 +126,10 @@ class PublicHeaderChecker:
                         self.terminate.set()
                         raise
 
-    def get_failed(self):
+    def get_failed(self) -> List:
         return list(self.failed_queue.queue)
 
-    def join(self):
+    def join(self) -> None:
         for t in self.check_threads:
             while t.is_alive() and not self.terminate.is_set():
                 t.join(1)  # joins with timeout to respond to keyboard interrupt
@@ -147,7 +148,7 @@ class PublicHeaderChecker:
     #   - Fail the test if the preprocessor outputs are the same (but with some code)
     #   - If outputs different, pass the test
     # 4) If header passed the steps 1) and 3) test that it produced zero assembly code
-    def check_one_header(self, header, num):
+    def check_one_header(self, header: str, num: int) -> None:
         res = self.preprocess_one_header(header, num)
         if res == self.COMPILE_ERR_REF_CONFIG_HDR_FAILED:
             raise HeaderFailedSdkconfig()
@@ -173,7 +174,7 @@ class PublicHeaderChecker:
                 if temp_header:
                     os.unlink(temp_header)
 
-    def compile_one_header(self, header):
+    def compile_one_header(self, header: str) -> None:
         rc, out, err, cmd = exec_cmd([self.gcc, '-S', '-o-', '-include', header, self.main_c] + self.include_dir_flags)
         if rc == 0:
             if not re.sub(self.assembly_nocode, '', out, flags=re.M).isspace():
@@ -184,7 +185,7 @@ class PublicHeaderChecker:
         self.log('\nCompilation command failed:\n{}\n'.format(cmd), True)
         raise HeaderFailedBuildError()
 
-    def preprocess_one_header(self, header, num, ignore_sdkconfig_issue=False):
+    def preprocess_one_header(self, header: str, num: int, ignore_sdkconfig_issue: bool=False) -> int:
         all_compilation_flags = ['-w', '-P', '-E', '-DESP_PLATFORM', '-include', header, self.main_c] + self.include_dir_flags
         if not ignore_sdkconfig_issue:
             # just strip commnets to check for CONFIG_... macros
@@ -232,8 +233,10 @@ class PublicHeaderChecker:
                 pass
 
     # Get compilation data from an example to list all public header files
-    def list_public_headers(self, ignore_dirs, ignore_files, only_dir=None):
+    def list_public_headers(self, ignore_dirs: List, ignore_files: Union[List, Set], only_dir: str=None) -> None:
         idf_path = os.getenv('IDF_PATH')
+        if idf_path is None:
+            raise RuntimeError("Environment variable 'IDF_PATH' wasn't set.")
         project_dir = os.path.join(idf_path, 'examples', 'get-started', 'blink')
         build_dir = tempfile.mkdtemp()
         sdkconfig = os.path.join(build_dir, 'sdkconfig')
@@ -283,22 +286,22 @@ class PublicHeaderChecker:
         self.include_dir_flags = include_dir_flags
         ignore_files = set(ignore_files)
         # processes public include files, removing ignored files
-        for f in all_include_files:
-            rel_path_file = os.path.relpath(f, idf_path)
+        for file_name in all_include_files:
+            rel_path_file = os.path.relpath(file_name, idf_path)
             if any([os.path.commonprefix([d, rel_path_file]) == d for d in ignore_dirs]):
-                self.log('{} - file ignored (inside ignore dir)'.format(f))
+                self.log('{} - file ignored (inside ignore dir)'.format(file_name))
                 continue
             if rel_path_file in ignore_files:
-                self.log('{} - file ignored'.format(f))
+                self.log('{} - file ignored'.format(file_name))
                 continue
-            files_to_check.append(f)
+            files_to_check.append(file_name)
         # removes duplicates and places headers to a work queue
-        for f in set(files_to_check):
-            self.job_queue.put(f)
+        for file_name in set(files_to_check):
+            self.job_queue.put(file_name)
         self.job_queue.put(None)  # to indicate the last job
 
 
-def check_all_headers():
+def check_all_headers() -> None:
     parser = argparse.ArgumentParser('Public header checker file', formatter_class=argparse.RawDescriptionHelpFormatter, epilog='''\
     Tips for fixing failures reported by this script
     ------------------------------------------------

+ 14 - 10
tools/ci/check_readme_links.py

@@ -16,6 +16,7 @@ import urllib.error
 import urllib.request
 from collections import defaultdict, namedtuple
 from pathlib import Path
+from typing import List
 
 EXCLUDE_DOCS_LIST = ['examples/peripherals/secure_element/atecc608_ecdsa/components/esp-cryptoauthlib/cryptoauthlib/**']
 
@@ -26,28 +27,28 @@ Link = namedtuple('Link', ['file', 'url'])
 
 
 class ReadmeLinkError(Exception):
-    def __init__(self, file, url):
+    def __init__(self, file: str, url: str) -> None:
         self.file = file
         self.url = url
 
 
 class RelativeLinkError(ReadmeLinkError):
-    def __str__(self):
+    def __str__(self) -> str:
         return 'Relative link error, file - {} not found, linked from {}'.format(self.url, self.file)
 
 
 class UrlLinkError(ReadmeLinkError):
-    def __init__(self, file, url, error_code):
+    def __init__(self, file: str, url: str, error_code: str):
         self.error_code = error_code
         super().__init__(file, url)
 
-    def __str__(self):
+    def __str__(self) -> str:
         files = [str(f) for f in self.file]
         return 'URL error, url - {} in files - {} is not accessible, request returned {}'.format(self.url, ', '.join(files), self.error_code)
 
 
 # we do not want a failed test just due to bad network conditions, for non 404 errors we simply print a warning
-def check_url(url, files, timeout):
+def check_url(url: str, files: str, timeout: float) -> None:
     try:
         with urllib.request.urlopen(url, timeout=timeout):
             return
@@ -60,7 +61,7 @@ def check_url(url, files, timeout):
         print('Unable to access {}, err = {}'.format(url, str(e)))
 
 
-def check_web_links(web_links):
+def check_web_links(web_links: defaultdict) -> List:
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
         errors = []
@@ -74,7 +75,7 @@ def check_web_links(web_links):
         return errors
 
 
-def check_file_links(file_links):
+def check_file_links(file_links: List) -> List:
     errors = []
 
     for link in file_links:
@@ -87,10 +88,13 @@ def check_file_links(file_links):
     return errors
 
 
-def get_md_links(folder):
+def get_md_links(folder: str) -> List:
     MD_LINK_RE = r'\[.+?\]\((.+?)(#.+)?\)'
 
-    idf_path = Path(os.getenv('IDF_PATH'))
+    idf_path_str = os.getenv('IDF_PATH')
+    if idf_path_str is None:
+        raise RuntimeError("Environment variable 'IDF_PATH' wasn't set.")
+    idf_path = Path(idf_path_str)
     links = []
 
     for path in (idf_path / folder).rglob('*.md'):
@@ -110,7 +114,7 @@ def get_md_links(folder):
     return links
 
 
-def check_readme_links(args):
+def check_readme_links(args: argparse.Namespace) -> int:
 
     links = get_md_links('examples')
     print('Found {} links'.format(len(links)))

+ 11 - 11
tools/ci/check_rules_yml.py

@@ -12,7 +12,7 @@ import os
 import re
 import sys
 from copy import deepcopy
-from typing import List
+from typing import Any, Dict, List, Optional, Set, Union
 
 import yaml
 from idf_ci_utils import IDF_PATH
@@ -20,20 +20,20 @@ from idf_ci_utils import IDF_PATH
 ROOT_YML_FP = os.path.join(IDF_PATH, '.gitlab-ci.yml')
 
 
-def load_yaml(file_path):
+def load_yaml(file_path: str) -> Any:
     return yaml.load(open(file_path), Loader=yaml.FullLoader)
 
 
 class YMLConfig:
-    def __init__(self, root_yml_file_path):
-        self._config = None
-        self._all_extends = None
+    def __init__(self, root_yml_file_path: str) -> None:
+        self._config: Optional[Dict] = None
+        self._all_extends: Optional[Set] = None
 
         self.root_yml = load_yaml(root_yml_file_path)
         assert self.root_yml
 
     @staticmethod
-    def _list(str_or_list):
+    def _list(str_or_list: Union[str, List]) -> List:
         if isinstance(str_or_list, str):
             return [str_or_list]
         if isinstance(str_or_list, list):
@@ -43,7 +43,7 @@ class YMLConfig:
         )
 
     @property
-    def config(self):
+    def config(self) -> Dict:
         if self._config:
             return self._config
 
@@ -54,7 +54,7 @@ class YMLConfig:
         return self._config
 
     @property
-    def all_extends(self):
+    def all_extends(self) -> Set:
         if self._all_extends:
             return self._all_extends
 
@@ -67,7 +67,7 @@ class YMLConfig:
         self._all_extends = res
         return self._all_extends
 
-    def exists(self, key):
+    def exists(self, key: str) -> bool:
         if key in self.all_extends:
             return True
         return False
@@ -76,7 +76,7 @@ class YMLConfig:
 YML_CONFIG = YMLConfig(ROOT_YML_FP)
 
 
-def validate_needed_rules(rules_yml):
+def validate_needed_rules(rules_yml: os.PathLike[str]) -> int:
     res = 0
     needed_rules = deepcopy(YML_CONFIG.all_extends)
     with open(rules_yml) as fr:
@@ -114,7 +114,7 @@ def parse_submodule_paths(
     return res
 
 
-def validate_submodule_patterns():
+def validate_submodule_patterns() -> int:
     submodule_paths = sorted(['.gitmodules'] + parse_submodule_paths())
     submodule_paths_in_patterns = sorted(
         YML_CONFIG.config.get('.patterns-submodule', [])

+ 4 - 3
tools/ci/check_tools_files_patterns.py

@@ -7,19 +7,20 @@ import argparse
 import os
 import sys
 from pathlib import Path
+from typing import Set, Tuple
 
 import yaml
 from idf_ci_utils import IDF_PATH, get_git_files
 
 
-def check(pattern_yml, exclude_list):
+def check(pattern_yml: str, exclude_list: str) -> Tuple[Set, Set]:
     rules_dict = yaml.load(open(pattern_yml), Loader=yaml.FullLoader)
     rules_patterns_set = set()
     for k, v in rules_dict.items():
         if k.startswith('.pattern') and k != '.patterns-python-files' and isinstance(v, list):
             rules_patterns_set.update(v)
 
-    rules_files_set = set()
+    rules_files_set: Set = set()
     idf_path = Path(IDF_PATH)
     for pat in rules_patterns_set:
         rules_files_set.update(idf_path.glob(pat))
@@ -30,7 +31,7 @@ def check(pattern_yml, exclude_list):
         if pat:
             exclude_patterns_set.add(pat)
 
-    exclude_files_set = set()
+    exclude_files_set: Set = set()
     for pat in exclude_patterns_set:
         exclude_files_set.update(idf_path.glob(pat))
 

+ 10 - 9
tools/ci/checkout_project_ref.py

@@ -10,27 +10,28 @@ import json
 import os
 import re
 import subprocess
+from typing import List
 
 IDF_GIT_DESCRIBE_PATTERN = re.compile(r'^v(\d)\.(\d)')
 RETRY_COUNT = 3
 
 
-def get_customized_project_revision(proj_name):
+def get_customized_project_revision(proj_name: str) -> str:
     """
     get customized project revision defined in bot message
     """
     revision = ''
-    customized_project_revisions = os.getenv('BOT_CUSTOMIZED_REVISION')
-    if customized_project_revisions:
-        customized_project_revisions = json.loads(customized_project_revisions)
-    try:
-        revision = customized_project_revisions[proj_name.lower()]
-    except (KeyError, TypeError):
-        pass
+    customized_project_revisions_file = os.getenv('BOT_CUSTOMIZED_REVISION')
+    if customized_project_revisions_file:
+        customized_project_revisions = json.loads(customized_project_revisions_file)
+        try:
+            revision = customized_project_revisions[proj_name.lower()]
+        except (KeyError, TypeError):
+            pass
     return revision
 
 
-def target_branch_candidates(proj_name):
+def target_branch_candidates(proj_name: str) -> List:
     """
     :return: a list of target branch candidates, from highest priority to lowest priority.
     """

+ 30 - 29
tools/ci/ci_fetch_submodule.py

@@ -11,6 +11,7 @@ import re
 import shutil
 import subprocess
 import time
+from typing import Any, List
 
 import gitlab_api
 
@@ -28,27 +29,26 @@ class SubModule(object):
 
     GIT_LS_TREE_OUTPUT_PATTERN = re.compile(r'\d+\s+commit\s+([0-9a-f]+)\s+')
 
-    def __init__(self, gitlab_inst, path, url):
+    def __init__(self, gitlab_inst: gitlab_api.Gitlab, path: str, url: str) -> None:
         self.path = path
         self.url = url
         self.gitlab_inst = gitlab_inst
         self.project_id = self._get_project_id(url)
         self.commit_id = self._get_commit_id(path)
 
-    def _get_commit_id(self, path):
-        output = subprocess.check_output(['git', 'ls-tree', 'HEAD', path])
-        output = output.decode()
+    def _get_commit_id(self, path: str) -> str:
+        output = subprocess.check_output(['git', 'ls-tree', 'HEAD', path]).decode()
         # example output: 160000 commit d88a262fbdf35e5abb372280eb08008749c3faa0	components/esp_wifi/lib
         match = self.GIT_LS_TREE_OUTPUT_PATTERN.search(output)
-        return match.group(1)
+        return match.group(1) if match is not None else ''
 
-    def _get_project_id(self, url):
+    def _get_project_id(self, url: str) -> Any:
         base_name = os.path.basename(url)
         project_id = self.gitlab_inst.get_project_id(os.path.splitext(base_name)[0],  # remove .git
                                                      namespace='espressif')
         return project_id
 
-    def download_archive(self):
+    def download_archive(self) -> None:
         print('Update submodule: {}: {}'.format(self.path, self.commit_id))
         path_name = self.gitlab_inst.download_archive(self.commit_id, SUBMODULE_ARCHIVE_TEMP_FOLDER,
                                                       self.project_id, SUBMODULE_ARCHIVE_CACHE_DIR)
@@ -58,33 +58,34 @@ class SubModule(object):
         shutil.move(renamed_path, os.path.dirname(self.path))
 
 
-def update_submodule(git_module_file, submodules_to_update):
+def update_submodule(git_module_file: str, submodules_to_update: List) -> None:
     gitlab_inst = gitlab_api.Gitlab()
     submodules = []
     with open(git_module_file, 'r') as f:
         data = f.read()
     match = SUBMODULE_PATTERN.search(data)
-    while True:
-        next_match = SUBMODULE_PATTERN.search(data, pos=match.end())
-        if next_match:
-            end_pos = next_match.start()
-        else:
-            end_pos = len(data)
-        path_match = PATH_PATTERN.search(data, pos=match.end(), endpos=end_pos)
-        url_match = URL_PATTERN.search(data, pos=match.end(), endpos=end_pos)
-        path = path_match.group(1)
-        url = url_match.group(1)
-
-        filter_result = True
-        if submodules_to_update:
-            if path not in submodules_to_update:
-                filter_result = False
-        if filter_result:
-            submodules.append(SubModule(gitlab_inst, path, url))
-
-        match = next_match
-        if not match:
-            break
+    if match is not None:
+        while True:
+            next_match = SUBMODULE_PATTERN.search(data, pos=match.end())
+            if next_match:
+                end_pos = next_match.start()
+            else:
+                end_pos = len(data)
+            path_match = PATH_PATTERN.search(data, pos=match.end(), endpos=end_pos)
+            url_match = URL_PATTERN.search(data, pos=match.end(), endpos=end_pos)
+            path = path_match.group(1) if path_match is not None else ''
+            url = url_match.group(1) if url_match is not None else ''
+
+            filter_result = True
+            if submodules_to_update:
+                if path not in submodules_to_update:
+                    filter_result = False
+            if filter_result:
+                submodules.append(SubModule(gitlab_inst, path, url))
+
+            match = next_match
+            if not match:
+                break
 
     shutil.rmtree(SUBMODULE_ARCHIVE_TEMP_FOLDER, ignore_errors=True)
 

+ 13 - 11
tools/ci/deploy_docs.py

@@ -14,11 +14,12 @@ import stat
 import subprocess
 import sys
 import tarfile
+from typing import Any, List, Tuple
 
 import packaging.version
 
 
-def env(variable, default=None):
+def env(variable: str, default: str=None) -> str:
     """ Shortcut to return the expanded version of an environment variable """
     return os.path.expandvars(os.environ.get(variable, default) if default else os.environ[variable])
 
@@ -28,7 +29,7 @@ sys.path.append(os.path.join(env('IDF_PATH'), 'docs'))
 from sanitize_version import sanitize_version  # noqa
 
 
-def main():
+def main() -> None:
     # if you get KeyErrors on the following lines, it's probably because you're not running in Gitlab CI
     git_ver = env('GIT_VER')  # output of git describe --always
     ci_ver = env('CI_COMMIT_REF_NAME', git_ver)  # branch or tag we're building for (used for 'release' & URL)
@@ -87,8 +88,8 @@ def main():
         deploy('stable', tarball_path, docs_path, docs_server)
 
 
-def deploy(version, tarball_path, docs_path, docs_server):
-    def run_ssh(commands):
+def deploy(version: str, tarball_path: str, docs_path: str, docs_server: str) -> None:
+    def run_ssh(commands: List) -> None:
         """ Log into docs_server and run a sequence of commands using ssh """
         print('Running ssh: {}'.format(commands))
         subprocess.run(['ssh', '-o', 'BatchMode=yes', docs_server, '-x', ' && '.join(commands)], check=True)
@@ -110,7 +111,7 @@ def deploy(version, tarball_path, docs_path, docs_server):
     # another thing made much more complex by the directory structure putting language before version...
 
 
-def build_doc_tarball(version, git_ver, build_dir):
+def build_doc_tarball(version: str, git_ver: str, build_dir: str) -> Tuple[str, List]:
     """ Make a tar.gz archive of the docs, in the directory structure used to deploy as
         the given version """
     version_paths = []
@@ -126,7 +127,8 @@ def build_doc_tarball(version, git_ver, build_dir):
     # add symlink for stable and latest and adds them to PDF blob
     symlinks = create_and_add_symlinks(version, git_ver, pdfs)
 
-    def not_sources_dir(ti):
+    def not_sources_dir(ti: Any) -> Any:
+        print(type(ti))
         """ Filter the _sources directories out of the tarballs """
         if ti.name.endswith('/_sources'):
             return None
@@ -171,7 +173,7 @@ def build_doc_tarball(version, git_ver, build_dir):
     return (os.path.abspath(tarball_path), version_paths)
 
 
-def create_and_add_symlinks(version, git_ver, pdfs):
+def create_and_add_symlinks(version: str, git_ver: str, pdfs: List) -> List:
     """ Create symbolic links for PDFs for 'latest' and 'stable' releases """
 
     symlinks = []
@@ -187,7 +189,7 @@ def create_and_add_symlinks(version, git_ver, pdfs):
     return symlinks
 
 
-def is_stable_version(version):
+def is_stable_version(version: str) -> bool:
     """ Heuristic for whether this is the latest stable release """
     if not version.startswith('v'):
         return False  # branch name
@@ -197,11 +199,11 @@ def is_stable_version(version):
     git_out = subprocess.check_output(['git', 'tag', '-l']).decode('utf-8')
 
     versions = [v.strip() for v in git_out.split('\n')]
-    versions = [v for v in versions if re.match(r'^v[\d\.]+$', v)]  # include vX.Y.Z only
+    versions = [v for v in versions if re.match(r'^v[\d\.]+$', v.strip())]  # include vX.Y.Z only
 
-    versions = [packaging.version.parse(v) for v in versions]
+    versions_pack = [packaging.version.parse(v) for v in versions]
 
-    max_version = max(versions)
+    max_version = max(versions_pack)
 
     if max_version.public != version[1:]:
         print('Stable version is v{}. This version is {}.'.format(max_version.public, version))

+ 1 - 1
tools/ci/envsubst.py

@@ -9,7 +9,7 @@ import os
 import sys
 
 
-def main():
+def main() -> None:
     # Sanitize environment variables
     vars_to_remove = []
     for var_name in os.environ.keys():

+ 0 - 14
tools/ci/mypy_ignore_list.txt

@@ -141,21 +141,7 @@ examples/wifi/iperf/iperf_test.py
 tools/ble/lib_ble_client.py
 tools/ble/lib_gap.py
 tools/ble/lib_gatt.py
-tools/ci/check_artifacts_expire_time.py
-tools/ci/check_callgraph.py
-tools/ci/check_codeowners.py
-tools/ci/check_deprecated_kconfigs.py
-tools/ci/check_executables.py
 tools/ci/check_kconfigs.py
-tools/ci/check_public_headers.py
-tools/ci/check_readme_links.py
-tools/ci/check_rules_yml.py
-tools/ci/check_tools_files_patterns.py
-tools/ci/checkout_project_ref.py
-tools/ci/ci_fetch_submodule.py
-tools/ci/deploy_docs.py
-tools/ci/envsubst.py
-tools/ci/normalize_clangtidy_path.py
 tools/ci/python_packages/idf_http_server_test/adder.py
 tools/ci/python_packages/idf_http_server_test/client.py
 tools/ci/python_packages/idf_http_server_test/test.py