|
|
@@ -1,24 +1,19 @@
|
|
|
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
-import asyncio
|
|
|
import os
|
|
|
import re
|
|
|
import subprocess
|
|
|
import sys
|
|
|
-from asyncio.subprocess import Process
|
|
|
from io import open
|
|
|
-from types import FunctionType
|
|
|
-from typing import Any, Dict, List, Optional, TextIO, Tuple
|
|
|
+from typing import Any, List
|
|
|
|
|
|
import click
|
|
|
-import yaml
|
|
|
-from idf_monitor_base.output_helpers import yellow_print
|
|
|
|
|
|
from .constants import GENERATORS
|
|
|
from .errors import FatalError
|
|
|
|
|
|
|
|
|
-def executable_exists(args: List) -> bool:
|
|
|
+def executable_exists(args):
|
|
|
try:
|
|
|
subprocess.check_output(args)
|
|
|
return True
|
|
|
@@ -27,7 +22,7 @@ def executable_exists(args: List) -> bool:
|
|
|
return False
|
|
|
|
|
|
|
|
|
-def realpath(path: str) -> str:
|
|
|
+def realpath(path):
|
|
|
"""
|
|
|
Return the cannonical path with normalized case.
|
|
|
|
|
|
@@ -37,7 +32,7 @@ def realpath(path: str) -> str:
|
|
|
return os.path.normcase(os.path.realpath(path))
|
|
|
|
|
|
|
|
|
-def _idf_version_from_cmake() -> Optional[str]:
|
|
|
+def _idf_version_from_cmake():
|
|
|
version_path = os.path.join(os.environ['IDF_PATH'], 'tools/cmake/version.cmake')
|
|
|
regex = re.compile(r'^\s*set\s*\(\s*IDF_VERSION_([A-Z]{5})\s+(\d+)')
|
|
|
ver = {}
|
|
|
@@ -55,17 +50,17 @@ def _idf_version_from_cmake() -> Optional[str]:
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def get_target(path: str, sdkconfig_filename: str='sdkconfig') -> Optional[str]:
|
|
|
+def get_target(path, sdkconfig_filename='sdkconfig'):
|
|
|
path = os.path.join(path, sdkconfig_filename)
|
|
|
return get_sdkconfig_value(path, 'CONFIG_IDF_TARGET')
|
|
|
|
|
|
|
|
|
-def idf_version() -> Optional[str]:
|
|
|
+def idf_version():
|
|
|
"""Print version of ESP-IDF"""
|
|
|
|
|
|
# Try to get version from git:
|
|
|
try:
|
|
|
- version: Optional[str] = subprocess.check_output([
|
|
|
+ version = subprocess.check_output([
|
|
|
'git',
|
|
|
'--git-dir=%s' % os.path.join(os.environ['IDF_PATH'], '.git'),
|
|
|
'--work-tree=%s' % os.environ['IDF_PATH'],
|
|
|
@@ -79,180 +74,56 @@ def idf_version() -> Optional[str]:
|
|
|
return version
|
|
|
|
|
|
|
|
|
-def print_hints(*filenames: str) -> None:
|
|
|
- """Getting output files and printing hints on how to resolve errors based on the output."""
|
|
|
- with open(os.path.join(os.path.dirname(__file__), 'hints.yml'), 'r') as file:
|
|
|
- hints = yaml.safe_load(file)
|
|
|
- for file_name in filenames:
|
|
|
- with open(file_name, 'r') as file:
|
|
|
- output = ' '.join(line.strip() for line in file if line.strip())
|
|
|
- for hint in hints:
|
|
|
- try:
|
|
|
- match = re.compile(hint['re']).findall(output)
|
|
|
- except KeyError:
|
|
|
- raise KeyError("Argument 're' missing in {}. Check hints.yml file.".format(hint))
|
|
|
- except re.error as e:
|
|
|
- raise re.error('{} from hints.yml have {} problem. Check hints.yml file.'.format(hint['re'], e))
|
|
|
- if match:
|
|
|
- extra_info = ', '.join(match) if hint.get('match_to_output', '') else ''
|
|
|
- try:
|
|
|
- yellow_print(' '.join(['HINT:', hint['hint'].format(extra_info)]))
|
|
|
- except KeyError:
|
|
|
- raise KeyError("Argument 'hint' missing in {}. Check hints.yml file.".format(hint))
|
|
|
-
|
|
|
-
|
|
|
-def fit_text_in_terminal(out: str) -> str:
|
|
|
- """Fit text in terminal, if the string is not fit replace center with `...`"""
|
|
|
- space_for_dots = 3 # Space for "..."
|
|
|
- terminal_width, _ = os.get_terminal_size()
|
|
|
- if terminal_width <= space_for_dots:
|
|
|
- # if the wide of the terminal is too small just print dots
|
|
|
- return '.' * terminal_width
|
|
|
- if len(out) >= terminal_width:
|
|
|
- elide_size = (terminal_width - space_for_dots) // 2
|
|
|
- # cut out the middle part of the output if it does not fit in the terminal
|
|
|
- return '...'.join([out[:elide_size], out[len(out) - elide_size:]])
|
|
|
- return out
|
|
|
-
|
|
|
-
|
|
|
-class RunTool:
|
|
|
- def __init__(self, tool_name: str, args: List, cwd: str, env: Dict=None, custom_error_handler: FunctionType=None, build_dir: str=None,
|
|
|
- hints: bool=False, force_progression: bool=False) -> None:
|
|
|
- self.tool_name = tool_name
|
|
|
- self.args = args
|
|
|
- self.cwd = cwd
|
|
|
- self.env = env
|
|
|
- self.custom_error_handler = custom_error_handler
|
|
|
- # build_dir sets by tools that do not use build directory as cwd
|
|
|
- self.build_dir = build_dir or cwd
|
|
|
- self.hints = hints
|
|
|
- self.force_progression = force_progression
|
|
|
-
|
|
|
- def __call__(self) -> None:
|
|
|
- def quote_arg(arg: str) -> str:
|
|
|
- """ Quote the `arg` with whitespace in them because it can cause problems when we call it from a subprocess."""
|
|
|
- if re.match(r"^(?![\'\"]).*\s.*", arg):
|
|
|
- return ''.join(["'", arg, "'"])
|
|
|
- return arg
|
|
|
-
|
|
|
- self.args = [str(arg) for arg in self.args]
|
|
|
- display_args = ' '.join(quote_arg(arg) for arg in self.args)
|
|
|
- print('Running %s in directory %s' % (self.tool_name, quote_arg(self.cwd)))
|
|
|
- print('Executing "%s"...' % str(display_args))
|
|
|
-
|
|
|
- env_copy = dict(os.environ)
|
|
|
- env_copy.update(self.env or {})
|
|
|
-
|
|
|
- process, stderr_output_file, stdout_output_file = asyncio.run(self.run_command(self.args, env_copy))
|
|
|
- if process.returncode == 0:
|
|
|
- return
|
|
|
-
|
|
|
- if self.custom_error_handler:
|
|
|
- self.custom_error_handler(process.returncode, stderr_output_file, stdout_output_file)
|
|
|
- return
|
|
|
-
|
|
|
- if stderr_output_file and stdout_output_file:
|
|
|
- print_hints(stderr_output_file, stdout_output_file)
|
|
|
- raise FatalError('{} failed with exit code {}, output of the command is in the {} and {}'.format(self.tool_name, process.returncode,
|
|
|
- stderr_output_file, stdout_output_file))
|
|
|
-
|
|
|
- raise FatalError('{} failed with exit code {}'.format(self.tool_name, process.returncode))
|
|
|
-
|
|
|
- async def run_command(self, cmd: List, env_copy: Dict) -> Tuple[Process, Optional[str], Optional[str]]:
|
|
|
- """ Run the `cmd` command with capturing stderr and stdout from that function and return returncode
|
|
|
- and of the command, the id of the process, paths to captured output """
|
|
|
- if not self.hints:
|
|
|
- p = await asyncio.create_subprocess_exec(*cmd, env=env_copy, cwd=self.cwd)
|
|
|
- await p.wait() # added for avoiding None returncode
|
|
|
- return p, None, None
|
|
|
- log_dir_name = 'log'
|
|
|
- try:
|
|
|
- os.mkdir(os.path.join(self.build_dir, log_dir_name))
|
|
|
- except FileExistsError:
|
|
|
- pass
|
|
|
+def run_tool(tool_name, args, cwd, env=dict(), custom_error_handler=None):
|
|
|
+ def quote_arg(arg):
|
|
|
+ " Quote 'arg' if necessary "
|
|
|
+ if ' ' in arg and not (arg.startswith('"') or arg.startswith("'")):
|
|
|
+ return "'" + arg + "'"
|
|
|
+ return arg
|
|
|
+
|
|
|
+ args = [str(arg) for arg in args]
|
|
|
+ display_args = ' '.join(quote_arg(arg) for arg in args)
|
|
|
+ print('Running %s in directory %s' % (tool_name, quote_arg(cwd)))
|
|
|
+ print('Executing "%s"...' % str(display_args))
|
|
|
+
|
|
|
+ env_copy = dict(os.environ)
|
|
|
+ env_copy.update(env)
|
|
|
+
|
|
|
+ if sys.version_info[0] < 3:
|
|
|
+ # The subprocess lib cannot accept environment variables as "unicode". Convert to str.
|
|
|
+ # This encoding step is required only in Python 2.
|
|
|
+ for (key, val) in env_copy.items():
|
|
|
+ if not isinstance(val, str):
|
|
|
+ env_copy[key] = val.encode(sys.getfilesystemencoding() or 'utf-8')
|
|
|
+
|
|
|
+ try:
|
|
|
# Note: we explicitly pass in os.environ here, as we may have set IDF_PATH there during startup
|
|
|
- # limit was added for avoiding error in idf.py confserver
|
|
|
- p = await asyncio.create_subprocess_exec(*cmd, env=env_copy, limit=1024 * 128, cwd=self.cwd, stdout=asyncio.subprocess.PIPE,
|
|
|
- stderr=asyncio.subprocess.PIPE, )
|
|
|
- stderr_output_file = os.path.join(self.build_dir, log_dir_name, f'idf_py_stderr_output_{p.pid}')
|
|
|
- stdout_output_file = os.path.join(self.build_dir, log_dir_name, f'idf_py_stdout_output_{p.pid}')
|
|
|
- if p.stderr and p.stdout: # it only to avoid None type in p.std
|
|
|
- await asyncio.gather(
|
|
|
- self.read_and_write_stream(p.stderr, stderr_output_file, sys.stderr),
|
|
|
- self.read_and_write_stream(p.stdout, stdout_output_file))
|
|
|
- await p.wait() # added for avoiding None returncode
|
|
|
- return p, stderr_output_file, stdout_output_file
|
|
|
-
|
|
|
- async def read_and_write_stream(self, input_stream: asyncio.StreamReader, output_filename: str,
|
|
|
- output_stream: TextIO=sys.stdout) -> None:
|
|
|
- """read the output of the `input_stream` and then write it into `output_filename` and `output_stream`"""
|
|
|
- def delete_ansi_escape(text: str) -> str:
|
|
|
- ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
|
|
- return ansi_escape.sub('', text)
|
|
|
-
|
|
|
- def prepare_for_print(out: bytes) -> str:
|
|
|
- # errors='ignore' is here because some chips produce some garbage bytes
|
|
|
- result = out.decode(errors='ignore')
|
|
|
- if not output_stream.isatty():
|
|
|
- # delete escape sequence if we printing in environments where ANSI coloring is disabled
|
|
|
- return delete_ansi_escape(result)
|
|
|
- return result
|
|
|
-
|
|
|
- def print_progression(output: str) -> None:
|
|
|
- # Print a new line on top of the previous line
|
|
|
- sys.stdout.write('\x1b[K')
|
|
|
- print('\r', end='')
|
|
|
- print(fit_text_in_terminal(output.strip('\n\r')), end='', file=output_stream)
|
|
|
+ subprocess.check_call(args, env=env_copy, cwd=cwd)
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
+ if custom_error_handler:
|
|
|
+ custom_error_handler(e)
|
|
|
+ else:
|
|
|
+ raise FatalError('%s failed with exit code %d' % (tool_name, e.returncode))
|
|
|
|
|
|
- try:
|
|
|
- with open(output_filename, 'w') as output_file:
|
|
|
- while True:
|
|
|
- out = await input_stream.readline()
|
|
|
- if not out:
|
|
|
- break
|
|
|
- output = prepare_for_print(out)
|
|
|
- output_file.write(output)
|
|
|
-
|
|
|
- # print output in progression way but only the progression related (that started with '[') and if verbose flag is not set
|
|
|
- if self.force_progression and output[0] == '[' and '-v' not in self.args:
|
|
|
- print_progression(output)
|
|
|
- else:
|
|
|
- print(output, end='', file=output_stream)
|
|
|
- except (RuntimeError, EnvironmentError) as e:
|
|
|
- yellow_print('WARNING: The exception {} was raised and we can\'t capture all your {} and '
|
|
|
- 'hints on how to resolve errors can be not accurate.'.format(e, output_stream.name.strip('<>')))
|
|
|
-
|
|
|
-
|
|
|
-def run_tool(*args: Any, **kwargs: Any) -> None:
|
|
|
- # Added in case some one use run_tool externally in a idf.py extensions
|
|
|
- return RunTool(*args, **kwargs)()
|
|
|
-
|
|
|
-
|
|
|
-def run_target(target_name: str, args: 'PropertyDict', env: Optional[Dict]=None,
|
|
|
- custom_error_handler: FunctionType=None, force_progression: bool=False, hints: bool=False) -> None:
|
|
|
- """Run target in build directory."""
|
|
|
- if env is None:
|
|
|
- env = {}
|
|
|
|
|
|
+def run_target(target_name, args, env=dict(), custom_error_handler=None):
|
|
|
generator_cmd = GENERATORS[args.generator]['command']
|
|
|
- env.update(GENERATORS[args.generator]['envvar'])
|
|
|
|
|
|
if args.verbose:
|
|
|
generator_cmd += [GENERATORS[args.generator]['verbose_flag']]
|
|
|
|
|
|
- RunTool(generator_cmd[0], generator_cmd + [target_name], args.build_dir, env, custom_error_handler, hints=hints,
|
|
|
- force_progression=force_progression)()
|
|
|
+ run_tool(generator_cmd[0], generator_cmd + [target_name], args.build_dir, env, custom_error_handler)
|
|
|
|
|
|
|
|
|
-def _strip_quotes(value: str, regexp: re.Pattern=re.compile(r"^\"(.*)\"$|^'(.*)'$|^(.*)$")) -> Optional[str]:
|
|
|
+def _strip_quotes(value, regexp=re.compile(r"^\"(.*)\"$|^'(.*)'$|^(.*)$")):
|
|
|
"""
|
|
|
Strip quotes like CMake does during parsing cache entries
|
|
|
"""
|
|
|
- matching_values = regexp.match(value)
|
|
|
- return [x for x in matching_values.groups() if x is not None][0].rstrip() if matching_values is not None else None
|
|
|
+
|
|
|
+ return [x for x in regexp.match(value).groups() if x is not None][0].rstrip()
|
|
|
|
|
|
|
|
|
-def _parse_cmakecache(path: str) -> Dict:
|
|
|
+def _parse_cmakecache(path):
|
|
|
"""
|
|
|
Parse the CMakeCache file at 'path'.
|
|
|
|
|
|
@@ -271,7 +142,7 @@ def _parse_cmakecache(path: str) -> Dict:
|
|
|
return result
|
|
|
|
|
|
|
|
|
-def _new_cmakecache_entries(cache_path: str, new_cache_entries: List) -> bool:
|
|
|
+def _new_cmakecache_entries(cache_path, new_cache_entries):
|
|
|
if not os.path.exists(cache_path):
|
|
|
return True
|
|
|
|
|
|
@@ -287,7 +158,7 @@ def _new_cmakecache_entries(cache_path: str, new_cache_entries: List) -> bool:
|
|
|
return False
|
|
|
|
|
|
|
|
|
-def _detect_cmake_generator(prog_name: str) -> Any:
|
|
|
+def _detect_cmake_generator(prog_name):
|
|
|
"""
|
|
|
Find the default cmake generator, if none was specified. Raises an exception if no valid generator is found.
|
|
|
"""
|
|
|
@@ -297,7 +168,7 @@ def _detect_cmake_generator(prog_name: str) -> Any:
|
|
|
raise FatalError("To use %s, either the 'ninja' or 'GNU make' build tool must be available in the PATH" % prog_name)
|
|
|
|
|
|
|
|
|
-def ensure_build_directory(args: 'PropertyDict', prog_name: str, always_run_cmake: bool=False) -> None:
|
|
|
+def ensure_build_directory(args, prog_name, always_run_cmake=False):
|
|
|
"""Check the build directory exists and that cmake has been run there.
|
|
|
|
|
|
If this isn't the case, create the build directory (if necessary) and
|
|
|
@@ -349,8 +220,7 @@ def ensure_build_directory(args: 'PropertyDict', prog_name: str, always_run_cmak
|
|
|
cmake_args += ['-D' + d for d in args.define_cache_entry]
|
|
|
cmake_args += [project_dir]
|
|
|
|
|
|
- hints = not args.no_hints
|
|
|
- RunTool('cmake', cmake_args, cwd=args.build_dir, hints=hints)()
|
|
|
+ run_tool('cmake', cmake_args, cwd=args.build_dir)
|
|
|
except Exception:
|
|
|
# don't allow partially valid CMakeCache.txt files,
|
|
|
# to keep the "should I run cmake?" logic simple
|
|
|
@@ -381,8 +251,8 @@ def ensure_build_directory(args: 'PropertyDict', prog_name: str, always_run_cmak
|
|
|
pass # if cmake failed part way, CMAKE_HOME_DIRECTORY may not be set yet
|
|
|
|
|
|
|
|
|
-def merge_action_lists(*action_lists: Dict) -> Dict:
|
|
|
- merged_actions: Dict = {
|
|
|
+def merge_action_lists(*action_lists):
|
|
|
+ merged_actions = {
|
|
|
'global_options': [],
|
|
|
'actions': {},
|
|
|
'global_action_callbacks': [],
|
|
|
@@ -394,7 +264,7 @@ def merge_action_lists(*action_lists: Dict) -> Dict:
|
|
|
return merged_actions
|
|
|
|
|
|
|
|
|
-def get_sdkconfig_value(sdkconfig_file: str, key: str) -> Optional[str]:
|
|
|
+def get_sdkconfig_value(sdkconfig_file, key):
|
|
|
"""
|
|
|
Return the value of given key from sdkconfig_file.
|
|
|
If sdkconfig_file does not exist or the option is not present, returns None.
|
|
|
@@ -414,14 +284,14 @@ def get_sdkconfig_value(sdkconfig_file: str, key: str) -> Optional[str]:
|
|
|
return value
|
|
|
|
|
|
|
|
|
-def is_target_supported(project_path: str, supported_targets: List) -> bool:
|
|
|
+def is_target_supported(project_path, supported_targets):
|
|
|
"""
|
|
|
Returns True if the active target is supported, or False otherwise.
|
|
|
"""
|
|
|
return get_target(project_path) in supported_targets
|
|
|
|
|
|
|
|
|
-def _guess_or_check_idf_target(args: 'PropertyDict', prog_name: str, cache: Dict) -> None:
|
|
|
+def _guess_or_check_idf_target(args, prog_name, cache):
|
|
|
"""
|
|
|
If CMakeCache.txt doesn't exist, and IDF_TARGET is not set in the environment, guess the value from
|
|
|
sdkconfig or sdkconfig.defaults, and pass it to CMake in IDF_TARGET variable.
|