Przeglądaj źródła

Merge branch 'bugfix/broken_virtualenv' into 'master'

tools: Reinstall virtualenv if it is broken

Closes IDFGH-4859

See merge request espressif/esp-idf!12626
Ivan Grokhotkov 4 lat temu
rodzic
commit
fa995fd92f
2 zmienionych plików z 157 dodań i 129 usunięć
  1. 0 1
      tools/ci/mypy_ignore_list.txt
  2. 157 128
      tools/idf_tools.py

+ 0 - 1
tools/ci/mypy_ignore_list.txt

@@ -239,7 +239,6 @@ tools/idf_py_actions/serial_ext.py
 tools/idf_py_actions/tools.py
 tools/idf_py_actions/uf2_ext.py
 tools/idf_size.py
-tools/idf_tools.py
 tools/idf.py
 tools/kconfig_new/confgen.py
 tools/kconfig_new/confserver.py

+ 157 - 128
tools/idf_tools.py

@@ -55,24 +55,30 @@ import ssl
 import subprocess
 import sys
 import tarfile
-import zipfile
 from collections import OrderedDict, namedtuple
+from ssl import SSLContext  # noqa: F401
+from tarfile import TarFile  # noqa: F401
+from zipfile import ZipFile
 
 try:
-    import typing  # noqa: F401
+    from typing import IO, Callable, Optional, Tuple, Union  # noqa: F401
 except ImportError:
     pass
 
 try:
     from urllib.error import ContentTooShortError
     from urllib.request import urlopen
+    # the following is only for typing annotation
+    from urllib.response import addinfourl  # noqa: F401
 except ImportError:
-    from urllib import ContentTooShortError, urlopen
+    # Python 2
+    from urllib import ContentTooShortError, urlopen  # type: ignore
 
 try:
     from exceptions import WindowsError
 except ImportError:
-    class WindowsError(OSError):
+    # Unix
+    class WindowsError(OSError):  # type: ignore
         pass
 
 
@@ -181,22 +187,22 @@ emyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc=
 
 global_quiet = False
 global_non_interactive = False
-global_idf_path = None  # type: typing.Optional[str]
-global_idf_tools_path = None  # type: typing.Optional[str]
-global_tools_json = None  # type: typing.Optional[str]
+global_idf_path = None  # type: Optional[str]
+global_idf_tools_path = None  # type: Optional[str]
+global_tools_json = None  # type: Optional[str]
 
 
-def fatal(text, *args):
+def fatal(text, *args):  # type: (str, str) -> None
     if not global_quiet:
         sys.stderr.write('ERROR: ' + text + '\n', *args)
 
 
-def warn(text, *args):
+def warn(text, *args):  # type: (str, str) -> None
     if not global_quiet:
         sys.stderr.write('WARNING: ' + text + '\n', *args)
 
 
-def info(text, f=None, *args):
+def info(text, f=None, *args):  # type: (str, Optional[IO[str]], str) -> None
     if not global_quiet:
         if f is None:
             f = sys.stdout
@@ -204,6 +210,7 @@ def info(text, f=None, *args):
 
 
 def run_cmd_check_output(cmd, input_text=None, extra_paths=None):
+    # type: (list[str], Optional[str], Optional[list[str]]) -> bytes
     # If extra_paths is given, locate the executable in one of these directories.
     # Note: it would seem logical to add extra_paths to env[PATH], instead, and let OS do the job of finding the
     # executable for us. However this does not work on Windows: https://bugs.python.org/issue8557.
@@ -223,13 +230,14 @@ def run_cmd_check_output(cmd, input_text=None, extra_paths=None):
                 break
 
     try:
+        input_bytes = None
         if input_text:
-            input_text = input_text.encode()
-        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, input=input_text)
+            input_bytes = input_text.encode()
+        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, input=input_bytes)
         return result.stdout + result.stderr
     except (AttributeError, TypeError):
         p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
-        stdout, stderr = p.communicate(input_text)
+        stdout, stderr = p.communicate(input_bytes)
         if p.returncode != 0:
             try:
                 raise subprocess.CalledProcessError(p.returncode, cmd, stdout, stderr)
@@ -238,7 +246,7 @@ def run_cmd_check_output(cmd, input_text=None, extra_paths=None):
         return stdout + stderr
 
 
-def to_shell_specific_paths(paths_list):
+def to_shell_specific_paths(paths_list):  # type: (list[str]) -> list[str]
     if sys.platform == 'win32':
         paths_list = [p.replace('/', os.path.sep) if os.path.sep in p else p for p in paths_list]
 
@@ -250,7 +258,7 @@ def to_shell_specific_paths(paths_list):
     return paths_list
 
 
-def get_env_for_extra_paths(extra_paths):
+def get_env_for_extra_paths(extra_paths):  # type: (list[str]) -> dict[str, str]
     """
     Return a copy of environment variables dict, prepending paths listed in extra_paths
     to the PATH environment variable.
@@ -258,13 +266,13 @@ def get_env_for_extra_paths(extra_paths):
     env_arg = os.environ.copy()
     new_path = os.pathsep.join(extra_paths) + os.pathsep + env_arg['PATH']
     if sys.version_info.major == 2:
-        env_arg['PATH'] = new_path.encode('utf8')
+        env_arg['PATH'] = new_path.encode('utf8')  # type: ignore
     else:
         env_arg['PATH'] = new_path
     return env_arg
 
 
-def get_file_size_sha256(filename, block_size=65536):
+def get_file_size_sha256(filename, block_size=65536):  # type: (str, int) -> Tuple[int, str]
     sha256 = hashlib.sha256()
     size = 0
     with open(filename, 'rb') as f:
@@ -274,14 +282,14 @@ def get_file_size_sha256(filename, block_size=65536):
     return size, sha256.hexdigest()
 
 
-def report_progress(count, block_size, total_size):
+def report_progress(count, block_size, total_size):  # type: (int, int, int) -> None
     percent = int(count * block_size * 100 / total_size)
     percent = min(100, percent)
     sys.stdout.write('\r%d%%' % percent)
     sys.stdout.flush()
 
 
-def mkdir_p(path):
+def mkdir_p(path):  # type: (str) -> None
     try:
         os.makedirs(path)
     except OSError as exc:
@@ -289,12 +297,12 @@ def mkdir_p(path):
             raise
 
 
-def unpack(filename, destination):
+def unpack(filename, destination):  # type: (str, str) -> None
     info('Extracting {0} to {1}'.format(filename, destination))
     if filename.endswith(('.tar.gz', '.tgz')):
-        archive_obj = tarfile.open(filename, 'r:gz')
+        archive_obj = tarfile.open(filename, 'r:gz')  # type: Union[TarFile, ZipFile]
     elif filename.endswith('zip'):
-        archive_obj = zipfile.ZipFile(filename)
+        archive_obj = ZipFile(filename)
     else:
         raise NotImplementedError('Unsupported archive type')
     if sys.version_info.major == 2:
@@ -304,7 +312,7 @@ def unpack(filename, destination):
     archive_obj.extractall(destination)
 
 
-def splittype(url):
+def splittype(url):  # type: (str) -> Tuple[Optional[str], str]
     match = re.match('([^/:]+):(.*)', url, re.DOTALL)
     if match:
         scheme, data = match.groups()
@@ -314,13 +322,14 @@ def splittype(url):
 
 # An alternative version of urlretrieve which takes SSL context as an argument
 def urlretrieve_ctx(url, filename, reporthook=None, data=None, context=None):
+    # type: (str, str, Optional[Callable[[int, int, int], None]], Optional[bytes], Optional[SSLContext]) -> Tuple[str, addinfourl]
     url_type, path = splittype(url)
 
     # urlopen doesn't have context argument in Python <=2.7.9
     extra_urlopen_args = {}
     if context:
         extra_urlopen_args['context'] = context
-    with contextlib.closing(urlopen(url, data, **extra_urlopen_args)) as fp:
+    with contextlib.closing(urlopen(url, data, **extra_urlopen_args)) as fp:  # type: ignore
         headers = fp.info()
 
         # Just return the local path and the "headers" for file://
@@ -364,7 +373,7 @@ def urlretrieve_ctx(url, filename, reporthook=None, data=None, context=None):
 # https://github.com/espressif/esp-idf/issues/3819#issuecomment-515167118
 # https://github.com/espressif/esp-idf/issues/4063#issuecomment-531490140
 # https://stackoverflow.com/a/43046729
-def rename_with_retry(path_from, path_to):
+def rename_with_retry(path_from, path_to):  # type: (str, str) -> None
     if sys.platform.startswith('win'):
         retry_count = 100
     else:
@@ -380,7 +389,7 @@ def rename_with_retry(path_from, path_to):
             warn('Rename {} to {} failed, retrying...'.format(path_from, path_to))
 
 
-def strip_container_dirs(path, levels):
+def strip_container_dirs(path, levels):  # type: (str, int) -> None
     assert levels > 0
     # move the original directory out of the way (add a .tmp suffix)
     tmp_path = path + '.tmp'
@@ -419,7 +428,7 @@ class DownloadError(RuntimeError):
 
 
 class IDFToolDownload(object):
-    def __init__(self, platform_name, url, size, sha256):
+    def __init__(self, platform_name, url, size, sha256):  # type: (str, str, int, str) -> None
         self.platform_name = platform_name
         self.url = url
         self.size = size
@@ -435,13 +444,13 @@ class IDFToolVersion(object):
 
     STATUS_VALUES = [STATUS_RECOMMENDED, STATUS_SUPPORTED, STATUS_DEPRECATED]
 
-    def __init__(self, version, status):
+    def __init__(self, version, status):  # type: (str, str) -> None
         self.version = version
         self.status = status
-        self.downloads = OrderedDict()
+        self.downloads = OrderedDict()  # type: OrderedDict[str, IDFToolDownload]
         self.latest = False
 
-    def __lt__(self, other):
+    def __lt__(self, other):  # type: (IDFToolVersion) -> bool
         if self.status != other.status:
             return self.status > other.status
         else:
@@ -449,13 +458,15 @@ class IDFToolVersion(object):
                         and other.status == IDFToolVersion.STATUS_RECOMMENDED)
             return self.version < other.version
 
-    def __eq__(self, other):
+    def __eq__(self, other):  # type: (object) -> bool
+        if not isinstance(other, IDFToolVersion):
+            return NotImplemented
         return self.status == other.status and self.version == other.version
 
-    def add_download(self, platform_name, url, size, sha256):
+    def add_download(self, platform_name, url, size, sha256):  # type: (str, str, int, str) -> None
         self.downloads[platform_name] = IDFToolDownload(platform_name, url, size, sha256)
 
-    def get_download_for_platform(self, platform_name):  # type: (str) -> IDFToolDownload
+    def get_download_for_platform(self, platform_name):  # type: (str) -> Optional[IDFToolDownload]
         if platform_name in PLATFORM_FROM_NAME.keys():
             platform_name = PLATFORM_FROM_NAME[platform_name]
         if platform_name in self.downloads.keys():
@@ -465,9 +476,10 @@ class IDFToolVersion(object):
         return None
 
     def compatible_with_platform(self, platform_name=PYTHON_PLATFORM):
+        # type: (str) -> bool
         return self.get_download_for_platform(platform_name) is not None
 
-    def get_supported_platforms(self):  # type: () -> typing.Set[str]
+    def get_supported_platforms(self):  # type: () -> set[str]
         return set(self.downloads.keys())
 
 
@@ -481,7 +493,7 @@ OPTIONS_LIST = ['version_cmd',
                 'license',
                 'strip_container_dirs']
 
-IDFToolOptions = namedtuple('IDFToolOptions', OPTIONS_LIST)
+IDFToolOptions = namedtuple('IDFToolOptions', OPTIONS_LIST)  # type: ignore
 
 
 class IDFTool(object):
@@ -492,16 +504,17 @@ class IDFTool(object):
 
     def __init__(self, name, description, install, info_url, license, version_cmd, version_regex, version_regex_replace=None,
                  strip_container_dirs=0):
+        # type: (str, str, str, str, str, list[str], str, Optional[str], int) -> None
         self.name = name
         self.description = description
-        self.versions = OrderedDict()  # type: typing.Dict[str, IDFToolVersion]
-        self.version_in_path = None
-        self.versions_installed = []
+        self.versions = OrderedDict()  # type: dict[str, IDFToolVersion]
+        self.version_in_path = None  # type: Optional[str]
+        self.versions_installed = []  # type: list[str]
         if version_regex_replace is None:
             version_regex_replace = VERSION_REGEX_REPLACE_DEFAULT
         self.options = IDFToolOptions(version_cmd, version_regex, version_regex_replace,
-                                      [], OrderedDict(), install, info_url, license, strip_container_dirs)
-        self.platform_overrides = []
+                                      [], OrderedDict(), install, info_url, license, strip_container_dirs)  # type: ignore
+        self.platform_overrides = []  # type: list[dict[str, str]]
         self._platform = CURRENT_PLATFORM
         self._update_current_options()
 
@@ -511,38 +524,38 @@ class IDFTool(object):
         result._update_current_options()
         return result
 
-    def _update_current_options(self):
+    def _update_current_options(self):  # type: () -> None
         self._current_options = IDFToolOptions(*self.options)
         for override in self.platform_overrides:
             if self._platform not in override['platforms']:
                 continue
             override_dict = override.copy()
             del override_dict['platforms']
-            self._current_options = self._current_options._replace(**override_dict)
+            self._current_options = self._current_options._replace(**override_dict)  # type: ignore
 
-    def add_version(self, version):
+    def add_version(self, version):  # type: (IDFToolVersion) -> None
         assert(type(version) is IDFToolVersion)
         self.versions[version.version] = version
 
     def get_path(self):  # type: () -> str
-        return os.path.join(global_idf_tools_path, 'tools', self.name)
+        return os.path.join(global_idf_tools_path, 'tools', self.name)  # type: ignore
 
     def get_path_for_version(self, version):  # type: (str) -> str
         assert(version in self.versions)
         return os.path.join(self.get_path(), version)
 
-    def get_export_paths(self, version):  # type: (str) -> typing.List[str]
+    def get_export_paths(self, version):  # type: (str) -> list[str]
         tool_path = self.get_path_for_version(version)
-        return [os.path.join(tool_path, *p) for p in self._current_options.export_paths]
+        return [os.path.join(tool_path, *p) for p in self._current_options.export_paths]  # type: ignore
 
-    def get_export_vars(self, version):  # type: (str) -> typing.Dict[str]
+    def get_export_vars(self, version):  # type: (str) -> dict[str, str]
         """
         Get the dictionary of environment variables to be exported, for the given version.
         Expands:
         - ${TOOL_PATH} => the actual path where the version is installed
         """
         result = {}
-        for k, v in self._current_options.export_vars.items():
+        for k, v in self._current_options.export_vars.items():  # type: ignore
             replace_path = self.get_path_for_version(version).replace('\\', '\\\\')
             v_repl = re.sub(SUBST_TOOL_PATH_REGEX, replace_path, v)
             if v_repl != v:
@@ -550,7 +563,7 @@ class IDFTool(object):
             result[k] = v_repl
         return result
 
-    def check_version(self, extra_paths=None):  # type: (typing.Optional[typing.List[str]]) -> str
+    def check_version(self, extra_paths=None):  # type: (Optional[list[str]]) -> str
         """
         Execute the tool, optionally prepending extra_paths to PATH,
         extract the version string and return it as a result.
@@ -561,7 +574,7 @@ class IDFTool(object):
         """
         # this function can not be called for a different platform
         assert self._platform == CURRENT_PLATFORM
-        cmd = self._current_options.version_cmd
+        cmd = self._current_options.version_cmd  # type: ignore
         try:
             version_cmd_result = run_cmd_check_output(cmd, None, extra_paths)
         except OSError:
@@ -569,27 +582,27 @@ class IDFTool(object):
             raise ToolNotFound('Tool {} not found'.format(self.name))
         except subprocess.CalledProcessError as e:
             raise ToolExecError('Command {} has returned non-zero exit code ({})\n'.format(
-                ' '.join(self._current_options.version_cmd), e.returncode))
+                ' '.join(self._current_options.version_cmd), e.returncode))  # type: ignore
 
         in_str = version_cmd_result.decode('utf-8')
-        match = re.search(self._current_options.version_regex, in_str)
+        match = re.search(self._current_options.version_regex, in_str)  # type: ignore
         if not match:
             return UNKNOWN_VERSION
-        return re.sub(self._current_options.version_regex, self._current_options.version_regex_replace, match.group(0))
+        return re.sub(self._current_options.version_regex, self._current_options.version_regex_replace, match.group(0))  # type: ignore
 
-    def get_install_type(self):
-        return self._current_options.install
+    def get_install_type(self):  # type: () -> Callable[[str], None]
+        return self._current_options.install  # type: ignore
 
-    def compatible_with_platform(self):
+    def compatible_with_platform(self):  # type: () -> bool
         return any([v.compatible_with_platform() for v in self.versions.values()])
 
-    def get_supported_platforms(self):  # type: () -> typing.Set[str]
+    def get_supported_platforms(self):  # type: () -> set[str]
         result = set()
         for v in self.versions.values():
             result.update(v.get_supported_platforms())
         return result
 
-    def get_recommended_version(self):
+    def get_recommended_version(self):  # type: () -> Optional[str]
         recommended_versions = [k for k, v in self.versions.items()
                                 if v.status == IDFToolVersion.STATUS_RECOMMENDED
                                 and v.compatible_with_platform(self._platform)]
@@ -598,7 +611,7 @@ class IDFTool(object):
             return recommended_versions[0]
         return None
 
-    def get_preferred_installed_version(self):
+    def get_preferred_installed_version(self):  # type: () -> Optional[str]
         recommended_versions = [k for k in self.versions_installed
                                 if self.versions[k].status == IDFToolVersion.STATUS_RECOMMENDED
                                 and self.versions[k].compatible_with_platform(self._platform)]
@@ -607,7 +620,7 @@ class IDFTool(object):
             return recommended_versions[0]
         return None
 
-    def find_installed_versions(self):
+    def find_installed_versions(self):  # type: () -> None
         """
         Checks whether the tool can be found in PATH and in global_idf_tools_path.
         Writes results to self.version_in_path and self.versions_installed.
@@ -649,7 +662,7 @@ class IDFTool(object):
                 else:
                     self.versions_installed.append(version)
 
-    def download(self, version):
+    def download(self, version):  # type: (str) -> None
         assert(version in self.versions)
         download_obj = self.versions[version].get_download_for_platform(self._platform)
         if not download_obj:
@@ -658,7 +671,7 @@ class IDFTool(object):
 
         url = download_obj.url
         archive_name = os.path.basename(url)
-        local_path = os.path.join(global_idf_tools_path, 'dist', archive_name)
+        local_path = os.path.join(global_idf_tools_path, 'dist', archive_name)  # type: ignore
         mkdir_p(os.path.dirname(local_path))
 
         if os.path.isfile(local_path):
@@ -703,14 +716,14 @@ class IDFTool(object):
             fatal('Failed to download, and retry count has expired')
             raise DownloadError()
 
-    def install(self, version):
+    def install(self, version):  # type: (str) -> None
         # Currently this is called after calling 'download' method, so here are a few asserts
         # for the conditions which should be true once that method is done.
         assert (version in self.versions)
         download_obj = self.versions[version].get_download_for_platform(self._platform)
         assert (download_obj is not None)
         archive_name = os.path.basename(download_obj.url)
-        archive_path = os.path.join(global_idf_tools_path, 'dist', archive_name)
+        archive_path = os.path.join(global_idf_tools_path, 'dist', archive_name)  # type: ignore
         assert (os.path.isfile(archive_path))
         dest_dir = self.get_path_for_version(version)
         if os.path.exists(dest_dir):
@@ -718,11 +731,11 @@ class IDFTool(object):
             shutil.rmtree(dest_dir)
         mkdir_p(dest_dir)
         unpack(archive_path, dest_dir)
-        if self._current_options.strip_container_dirs:
-            strip_container_dirs(dest_dir, self._current_options.strip_container_dirs)
+        if self._current_options.strip_container_dirs:  # type: ignore
+            strip_container_dirs(dest_dir, self._current_options.strip_container_dirs)  # type: ignore
 
     @staticmethod
-    def check_download_file(download_obj, local_path):
+    def check_download_file(download_obj, local_path):  # type: (IDFToolDownload, str) -> bool
         expected_sha256 = download_obj.sha256
         expected_size = download_obj.size
         file_size, file_sha256 = get_file_size_sha256(local_path)
@@ -735,16 +748,16 @@ class IDFTool(object):
         return True
 
     @classmethod
-    def from_json(cls, tool_dict):
+    def from_json(cls, tool_dict):  # type: (dict[str, Union[str, list[str], dict[str, str]]]) -> IDFTool
         # json.load will return 'str' types in Python 3 and 'unicode' in Python 2
         expected_str_type = type(u'')
 
         # Validate json fields
-        tool_name = tool_dict.get('name')
+        tool_name = tool_dict.get('name')  # type: ignore
         if type(tool_name) is not expected_str_type:
             raise RuntimeError('tool_name is not a string')
 
-        description = tool_dict.get('description')
+        description = tool_dict.get('description')  # type: ignore
         if type(description) is not expected_str_type:
             raise RuntimeError('description is not a string')
 
@@ -764,7 +777,7 @@ class IDFTool(object):
         if type(export_paths) is not list:
             raise RuntimeError('export_paths for tool %s is not a list' % tool_name)
 
-        export_vars = tool_dict.get('export_vars', {})
+        export_vars = tool_dict.get('export_vars', {})  # type: ignore
         if type(export_vars) is not dict:
             raise RuntimeError('export_vars for tool %s is not a mapping' % tool_name)
 
@@ -772,15 +785,15 @@ class IDFTool(object):
         if type(versions) is not list:
             raise RuntimeError('versions for tool %s is not an array' % tool_name)
 
-        install = tool_dict.get('install', False)
+        install = tool_dict.get('install', False)  # type: ignore
         if type(install) is not expected_str_type:
             raise RuntimeError('install for tool %s is not a string' % tool_name)
 
-        info_url = tool_dict.get('info_url', False)
+        info_url = tool_dict.get('info_url', False)  # type: ignore
         if type(info_url) is not expected_str_type:
             raise RuntimeError('info_url for tool %s is not a string' % tool_name)
 
-        license = tool_dict.get('license', False)
+        license = tool_dict.get('license', False)  # type: ignore
         if type(license) is not expected_str_type:
             raise RuntimeError('license for tool %s is not a string' % tool_name)
 
@@ -788,67 +801,67 @@ class IDFTool(object):
         if strip_container_dirs and type(strip_container_dirs) is not int:
             raise RuntimeError('strip_container_dirs for tool %s is not an int' % tool_name)
 
-        overrides_list = tool_dict.get('platform_overrides', [])
+        overrides_list = tool_dict.get('platform_overrides', [])  # type: ignore
         if type(overrides_list) is not list:
             raise RuntimeError('platform_overrides for tool %s is not a list' % tool_name)
 
         # Create the object
-        tool_obj = cls(tool_name, description, install, info_url, license,
-                       version_cmd, version_regex, version_regex_replace,
-                       strip_container_dirs)
+        tool_obj = cls(tool_name, description, install, info_url, license,  # type: ignore
+                       version_cmd, version_regex, version_regex_replace,  # type: ignore
+                       strip_container_dirs)  # type: ignore
 
-        for path in export_paths:
-            tool_obj.options.export_paths.append(path)
+        for path in export_paths:  # type: ignore
+            tool_obj.options.export_paths.append(path)  # type: ignore
 
-        for name, value in export_vars.items():
-            tool_obj.options.export_vars[name] = value
+        for name, value in export_vars.items():  # type: ignore
+            tool_obj.options.export_vars[name] = value  # type: ignore
 
         for index, override in enumerate(overrides_list):
-            platforms_list = override.get('platforms')
+            platforms_list = override.get('platforms')  # type: ignore
             if type(platforms_list) is not list:
                 raise RuntimeError('platforms for override %d of tool %s is not a list' % (index, tool_name))
 
-            install = override.get('install')
+            install = override.get('install')  # type: ignore
             if install is not None and type(install) is not expected_str_type:
                 raise RuntimeError('install for override %d of tool %s is not a string' % (index, tool_name))
 
-            version_cmd = override.get('version_cmd')
+            version_cmd = override.get('version_cmd')  # type: ignore
             if version_cmd is not None and type(version_cmd) is not list:
                 raise RuntimeError('version_cmd for override %d of tool %s is not a list of strings' %
                                    (index, tool_name))
 
-            version_regex = override.get('version_regex')
+            version_regex = override.get('version_regex')  # type: ignore
             if version_regex is not None and (type(version_regex) is not expected_str_type or not version_regex):
                 raise RuntimeError('version_regex for override %d of tool %s is not a non-empty string' %
                                    (index, tool_name))
 
-            version_regex_replace = override.get('version_regex_replace')
+            version_regex_replace = override.get('version_regex_replace')  # type: ignore
             if version_regex_replace is not None and type(version_regex_replace) is not expected_str_type:
                 raise RuntimeError('version_regex_replace for override %d of tool %s is not a string' %
                                    (index, tool_name))
 
-            export_paths = override.get('export_paths')
+            export_paths = override.get('export_paths')  # type: ignore
             if export_paths is not None and type(export_paths) is not list:
                 raise RuntimeError('export_paths for override %d of tool %s is not a list' % (index, tool_name))
 
-            export_vars = override.get('export_vars')
+            export_vars = override.get('export_vars')  # type: ignore
             if export_vars is not None and type(export_vars) is not dict:
                 raise RuntimeError('export_vars for override %d of tool %s is not a mapping' % (index, tool_name))
-            tool_obj.platform_overrides.append(override)
+            tool_obj.platform_overrides.append(override)  # type: ignore
 
-        recommended_versions = {}
-        for version_dict in versions:
-            version = version_dict.get('name')
+        recommended_versions = {}  # type: dict[str, list[str]]
+        for version_dict in versions:  # type: ignore
+            version = version_dict.get('name')  # type: ignore
             if type(version) is not expected_str_type:
                 raise RuntimeError('version name for tool {} is not a string'.format(tool_name))
 
-            version_status = version_dict.get('status')
+            version_status = version_dict.get('status')  # type: ignore
             if type(version_status) is not expected_str_type and version_status not in IDFToolVersion.STATUS_VALUES:
                 raise RuntimeError('tool {} version {} status is not one of {}', tool_name, version,
                                    IDFToolVersion.STATUS_VALUES)
 
             version_obj = IDFToolVersion(version, version_status)
-            for platform_id, platform_dict in version_dict.items():
+            for platform_id, platform_dict in version_dict.items():  # type: ignore
                 if platform_id in ['name', 'status']:
                     continue
                 if platform_id not in PLATFORM_FROM_NAME.keys():
@@ -875,7 +888,7 @@ class IDFTool(object):
         tool_obj._update_current_options()
         return tool_obj
 
-    def to_json(self):
+    def to_json(self):  # type: ignore
         versions_array = []
         for version, version_obj in self.versions.items():
             version_json = {
@@ -912,19 +925,19 @@ class IDFTool(object):
         return tool_json
 
 
-def load_tools_info():  # type: () -> typing.Dict[str, IDFTool]
+def load_tools_info():  # type: () -> dict[str, IDFTool]
     """
     Load tools metadata from tools.json, return a dictionary: tool name - tool info
     """
     tool_versions_file_name = global_tools_json
 
-    with open(tool_versions_file_name, 'r') as f:
+    with open(tool_versions_file_name, 'r') as f:  # type: ignore
         tools_info = json.load(f)
 
-    return parse_tools_info_json(tools_info)
+    return parse_tools_info_json(tools_info)  # type: ignore
 
 
-def parse_tools_info_json(tools_info):
+def parse_tools_info_json(tools_info):  # type: ignore
     """
     Parse and validate the dictionary obtained by loading the tools.json file.
     Returns a dictionary of tools (key: tool name, value: IDFTool object).
@@ -945,7 +958,7 @@ def parse_tools_info_json(tools_info):
     return tools_dict
 
 
-def dump_tools_json(tools_info):
+def dump_tools_json(tools_info):  # type: ignore
     tools_array = []
     for tool_name, tool_obj in tools_info.items():
         tool_json = tool_obj.to_json()
@@ -954,10 +967,10 @@ def dump_tools_json(tools_info):
     return json.dumps(file_json, indent=2, separators=(',', ': '), sort_keys=True)
 
 
-def get_python_env_path():
+def get_python_env_path():  # type: () -> Tuple[str, str, str]
     python_ver_major_minor = '{}.{}'.format(sys.version_info.major, sys.version_info.minor)
 
-    version_file_path = os.path.join(global_idf_path, 'version.txt')
+    version_file_path = os.path.join(global_idf_path, 'version.txt')  # type: ignore
     if os.path.exists(version_file_path):
         with open(version_file_path, 'r') as version_file:
             idf_version_str = version_file.read()
@@ -970,12 +983,12 @@ def get_python_env_path():
             idf_version_str = ''
     match = re.match(r'^v([0-9]+\.[0-9]+).*', idf_version_str)
     if match:
-        idf_version = match.group(1)
+        idf_version = match.group(1)  # type: Optional[str]
     else:
         idf_version = None
         # fallback when IDF is a shallow clone
         try:
-            with open(os.path.join(global_idf_path, 'components', 'esp_common', 'include', 'esp_idf_version.h')) as f:
+            with open(os.path.join(global_idf_path, 'components', 'esp_common', 'include', 'esp_idf_version.h')) as f:  # type: ignore
                 m = re.search(r'^#define\s+ESP_IDF_VERSION_MAJOR\s+(\d+).+?^#define\s+ESP_IDF_VERSION_MINOR\s+(\d+)',
                               f.read(), re.DOTALL | re.MULTILINE)
                 if m:
@@ -989,7 +1002,7 @@ def get_python_env_path():
         fatal('IDF version cannot be determined')
         raise SystemExit(1)
 
-    idf_python_env_path = os.path.join(global_idf_tools_path, 'python_env',
+    idf_python_env_path = os.path.join(global_idf_tools_path, 'python_env',  # type: ignore
                                        'idf{}_py{}_env'.format(idf_version, python_ver_major_minor))
 
     if sys.platform == 'win32':
@@ -1005,7 +1018,7 @@ def get_python_env_path():
     return idf_python_env_path, idf_python_export_path, virtualenv_python
 
 
-def action_list(args):
+def action_list(args):  # type: ignore
     tools_info = load_tools_info()
     for name, tool in tools_info.items():
         if tool.get_install_type() == IDFTool.INSTALL_NEVER:
@@ -1017,14 +1030,14 @@ def action_list(args):
         if not versions_for_platform:
             info('  (no versions compatible with platform {})'.format(PYTHON_PLATFORM))
             continue
-        versions_sorted = sorted(versions_for_platform.keys(), key=tool.versions.get, reverse=True)
+        versions_sorted = sorted(versions_for_platform.keys(), key=tool.versions.get, reverse=True)  # type: ignore
         for version in versions_sorted:
             version_obj = tool.versions[version]
             info('  - {} ({}{})'.format(version, version_obj.status,
                                         ', installed' if version in tool.versions_installed else ''))
 
 
-def action_check(args):
+def action_check(args):  # type: ignore
     tools_info = load_tools_info()
     not_found_list = []
     info('Checking for installed tools...')
@@ -1050,7 +1063,7 @@ def action_check(args):
         raise SystemExit(1)
 
 
-def action_export(args):
+def action_export(args):  # type: ignore
     tools_info = load_tools_info()
     all_tools_found = True
     export_vars = {}
@@ -1063,7 +1076,7 @@ def action_export(args):
         if tool.version_in_path:
             if tool.version_in_path not in tool.versions:
                 # unsupported version
-                if args.prefer_system:
+                if args.prefer_system:  # type: ignore
                     warn('using an unsupported version of tool {} found in PATH: {}'.format(
                         tool.name, tool.version_in_path))
                     continue
@@ -1172,12 +1185,12 @@ def action_export(args):
         raise SystemExit(1)
 
 
-def apply_url_mirrors(args, tool_download_obj):
+def apply_url_mirrors(args, tool_download_obj):  # type: ignore
     apply_mirror_prefix_map(args, tool_download_obj)
     apply_github_assets_option(tool_download_obj)
 
 
-def apply_mirror_prefix_map(args, tool_download_obj):
+def apply_mirror_prefix_map(args, tool_download_obj):  # type: ignore
     """Rewrite URL for given tool_obj, given tool_version, and current platform,
        if --mirror-prefix-map flag or IDF_MIRROR_PREFIX_MAP environment variable is given.
     """
@@ -1204,7 +1217,7 @@ def apply_mirror_prefix_map(args, tool_download_obj):
                 break
 
 
-def apply_github_assets_option(tool_download_obj):
+def apply_github_assets_option(tool_download_obj):  # type: ignore
     """ Rewrite URL for given tool_obj if the download URL is an https://github.com/ URL and the variable
     IDF_GITHUB_ASSETS is set. The github.com part of the URL will be replaced.
     """
@@ -1230,7 +1243,7 @@ def apply_github_assets_option(tool_download_obj):
         tool_download_obj.url = new_url
 
 
-def action_download(args):
+def action_download(args):  # type: ignore
     tools_info = load_tools_info()
     tools_spec = args.tools
 
@@ -1277,9 +1290,9 @@ def action_download(args):
         tool_obj.download(tool_version)
 
 
-def action_install(args):
+def action_install(args):  # type: ignore
     tools_info = load_tools_info()
-    tools_spec = args.tools
+    tools_spec = args.tools  # type: ignore
     if not tools_spec or 'required' in tools_spec:
         tools_spec = [k for k, v in tools_info.items() if v.get_install_type() == IDFTool.INSTALL_ALWAYS]
         info('Installing tools: {}'.format(', '.join(tools_spec)))
@@ -1319,7 +1332,7 @@ def action_install(args):
         tool_obj.install(tool_version)
 
 
-def get_wheels_dir():
+def get_wheels_dir():  # type: () -> Optional[str]
     tools_info = load_tools_info()
     wheels_package_name = 'idf-python-wheels'
     if wheels_package_name not in tools_info:
@@ -1334,15 +1347,31 @@ def get_wheels_dir():
     return wheels_dir
 
 
-def action_install_python_env(args):
+def action_install_python_env(args):  # type: ignore
+    reinstall = args.reinstall
     idf_python_env_path, _, virtualenv_python = get_python_env_path()
 
     is_virtualenv = hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix)
-    if is_virtualenv and (not os.path.exists(idf_python_env_path) or args.reinstall):
+    if is_virtualenv and (not os.path.exists(idf_python_env_path) or reinstall):
         fatal('This script was called from a virtual environment, can not create a virtual environment again')
         raise SystemExit(1)
 
-    if args.reinstall and os.path.exists(idf_python_env_path):
+    if os.path.exists(virtualenv_python):
+        try:
+            subprocess.check_call([virtualenv_python, '--version'], stdout=sys.stdout, stderr=sys.stderr)
+        except subprocess.CalledProcessError:
+            # At this point we can reinstall the virtual environment if it is non-functional. This can happen at least
+            # when the Python interpreter was removed which was used to create the virtual environment.
+            reinstall = True
+
+        try:
+            subprocess.check_call([virtualenv_python, '-m', 'pip', '--version'], stdout=sys.stdout, stderr=sys.stderr)
+        except subprocess.CalledProcessError:
+            warn('PIP is not available in the virtual environment.')
+            # Reinstallation of the virtual environment could help if PIP was installed for the main Python
+            reinstall = True
+
+    if reinstall and os.path.exists(idf_python_env_path):
         warn('Removing the existing Python environment in {}'.format(idf_python_env_path))
         shutil.rmtree(idf_python_env_path)
 
@@ -1376,7 +1405,7 @@ def action_install_python_env(args):
     subprocess.check_call(run_args, stdout=sys.stdout, stderr=sys.stderr)
 
 
-def action_add_version(args):
+def action_add_version(args):  # type: ignore
     tools_info = load_tools_info()
     tool_name = args.tool
     tool_obj = tools_info.get(tool_name)
@@ -1420,7 +1449,7 @@ def action_add_version(args):
     info('Wrote output to {}'.format(args.output))
 
 
-def action_rewrite(args):
+def action_rewrite(args):  # type: ignore
     tools_info = load_tools_info()
     json_str = dump_tools_json(tools_info)
     if not args.output:
@@ -1431,7 +1460,7 @@ def action_rewrite(args):
     info('Wrote output to {}'.format(args.output))
 
 
-def action_validate(args):
+def action_validate(args):  # type: ignore
     try:
         import jsonschema
     except ImportError:
@@ -1447,11 +1476,11 @@ def action_validate(args):
     # on failure, this will raise an exception with a fairly verbose diagnostic message
 
 
-def action_gen_doc(args):
+def action_gen_doc(args):  # type: ignore
     f = args.output
     tools_info = load_tools_info()
 
-    def print_out(text):
+    def print_out(text):  # type: (str) -> None
         f.write(text + '\n')
 
     print_out('.. |zwsp| unicode:: U+200B')
@@ -1528,7 +1557,7 @@ More info: {info_url}
     print_out('')
 
 
-def main(argv):
+def main(argv):  # type: (list[str]) -> None
     parser = argparse.ArgumentParser()
 
     parser.add_argument('--quiet', help='Don\'t output diagnostic messages to stdout/stderr', action='store_true')
@@ -1628,7 +1657,7 @@ def main(argv):
 
     if sys.version_info.major == 2:
         try:
-            global_idf_tools_path.decode('ascii')
+            global_idf_tools_path.decode('ascii')  # type: ignore
         except UnicodeDecodeError:
             fatal('IDF_TOOLS_PATH contains non-ASCII characters: {}'.format(global_idf_tools_path) +
                   '\nThis is not supported yet with Python 2. ' +