|
|
@@ -9,7 +9,7 @@
|
|
|
# - If core dump output is detected, it is converted to a human-readable report
|
|
|
# by espcoredump.py.
|
|
|
#
|
|
|
-# Copyright 2015-2016 Espressif Systems (Shanghai) PTE LTD
|
|
|
+# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
|
|
|
#
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
@@ -29,6 +29,7 @@
|
|
|
#
|
|
|
# Originally released under BSD-3-Clause license.
|
|
|
#
|
|
|
+
|
|
|
from __future__ import division, print_function, unicode_literals
|
|
|
|
|
|
import argparse
|
|
|
@@ -37,457 +38,45 @@ import datetime
|
|
|
import os
|
|
|
import re
|
|
|
import subprocess
|
|
|
-from builtins import bytes, chr, object
|
|
|
+import threading
|
|
|
+import time
|
|
|
+from builtins import bytes, object
|
|
|
+from typing import BinaryIO, Callable, List, Optional, Union
|
|
|
+
|
|
|
+import serial.tools.miniterm as miniterm
|
|
|
+from idf_monitor_base import (COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO, COREDUMP_DONE, COREDUMP_IDLE,
|
|
|
+ COREDUMP_READING, COREDUMP_UART_END, COREDUMP_UART_PROMPT, COREDUMP_UART_START,
|
|
|
+ DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, MATCH_PCADDR, PANIC_DECODE_BACKTRACE,
|
|
|
+ PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE, PANIC_READING, PANIC_STACK_DUMP,
|
|
|
+ PANIC_START)
|
|
|
+from idf_monitor_base.chip_specific_config import get_chip_config
|
|
|
+from idf_monitor_base.console_parser import ConsoleParser
|
|
|
+from idf_monitor_base.console_reader import ConsoleReader
|
|
|
+from idf_monitor_base.constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET,
|
|
|
+ CMD_STOP, CMD_TOGGLE_LOGGING, CTRL_H, CTRL_T, TAG_CMD, TAG_KEY, TAG_SERIAL,
|
|
|
+ TAG_SERIAL_FLUSH)
|
|
|
+from idf_monitor_base.exceptions import SerialStopException
|
|
|
+from idf_monitor_base.line_matcher import LineMatcher
|
|
|
+from idf_monitor_base.output_helpers import normal_print, red_print, yellow_print
|
|
|
+from idf_monitor_base.serial_reader import SerialReader
|
|
|
+from idf_monitor_base.web_socket_client import WebSocketClient
|
|
|
|
|
|
try:
|
|
|
- import queue
|
|
|
+ import queue # noqa
|
|
|
except ImportError:
|
|
|
- import Queue as queue
|
|
|
+ import Queue as queue # type: ignore # noqa
|
|
|
|
|
|
-import ctypes
|
|
|
-import json
|
|
|
import shlex
|
|
|
import sys
|
|
|
import tempfile
|
|
|
-import textwrap
|
|
|
-import threading
|
|
|
-import time
|
|
|
-import types
|
|
|
-from distutils.version import StrictVersion
|
|
|
-from io import open
|
|
|
|
|
|
import serial
|
|
|
import serial.tools.list_ports
|
|
|
-import serial.tools.miniterm as miniterm
|
|
|
-
|
|
|
-try:
|
|
|
- import websocket
|
|
|
-except ImportError:
|
|
|
- # This is needed for IDE integration only.
|
|
|
- pass
|
|
|
+# Windows console stuff
|
|
|
+from idf_monitor_base.ansi_color_convertor import ANSIColorConverter
|
|
|
|
|
|
key_description = miniterm.key_description
|
|
|
|
|
|
-# Control-key characters
|
|
|
-CTRL_A = '\x01'
|
|
|
-CTRL_B = '\x02'
|
|
|
-CTRL_F = '\x06'
|
|
|
-CTRL_H = '\x08'
|
|
|
-CTRL_R = '\x12'
|
|
|
-CTRL_T = '\x14'
|
|
|
-CTRL_Y = '\x19'
|
|
|
-CTRL_P = '\x10'
|
|
|
-CTRL_X = '\x18'
|
|
|
-CTRL_L = '\x0c'
|
|
|
-CTRL_RBRACKET = '\x1d' # Ctrl+]
|
|
|
-
|
|
|
-# Command parsed from console inputs
|
|
|
-CMD_STOP = 1
|
|
|
-CMD_RESET = 2
|
|
|
-CMD_MAKE = 3
|
|
|
-CMD_APP_FLASH = 4
|
|
|
-CMD_OUTPUT_TOGGLE = 5
|
|
|
-CMD_TOGGLE_LOGGING = 6
|
|
|
-CMD_ENTER_BOOT = 7
|
|
|
-
|
|
|
-# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated)
|
|
|
-ANSI_RED = '\033[1;31m'
|
|
|
-ANSI_YELLOW = '\033[0;33m'
|
|
|
-ANSI_NORMAL = '\033[0m'
|
|
|
-
|
|
|
-
|
|
|
-def color_print(message, color, newline='\n'):
|
|
|
- """ Print a message to stderr with colored highlighting """
|
|
|
- sys.stderr.write('%s%s%s%s' % (color, message, ANSI_NORMAL, newline))
|
|
|
-
|
|
|
-
|
|
|
-def yellow_print(message, newline='\n'):
|
|
|
- color_print(message, ANSI_YELLOW, newline)
|
|
|
-
|
|
|
-
|
|
|
-def red_print(message, newline='\n'):
|
|
|
- color_print(message, ANSI_RED, newline)
|
|
|
-
|
|
|
-
|
|
|
-__version__ = '1.1'
|
|
|
-
|
|
|
-# Tags for tuples in queues
|
|
|
-TAG_KEY = 0
|
|
|
-TAG_SERIAL = 1
|
|
|
-TAG_SERIAL_FLUSH = 2
|
|
|
-TAG_CMD = 3
|
|
|
-
|
|
|
-# regex matches an potential PC value (0x4xxxxxxx)
|
|
|
-MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
|
|
|
-
|
|
|
-DEFAULT_TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
|
|
|
-
|
|
|
-DEFAULT_PRINT_FILTER = ''
|
|
|
-
|
|
|
-# coredump related messages
|
|
|
-COREDUMP_UART_START = b'================= CORE DUMP START ================='
|
|
|
-COREDUMP_UART_END = b'================= CORE DUMP END ================='
|
|
|
-COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...'
|
|
|
-
|
|
|
-# coredump states
|
|
|
-COREDUMP_IDLE = 0
|
|
|
-COREDUMP_READING = 1
|
|
|
-COREDUMP_DONE = 2
|
|
|
-
|
|
|
-# coredump decoding options
|
|
|
-COREDUMP_DECODE_DISABLE = 'disable'
|
|
|
-COREDUMP_DECODE_INFO = 'info'
|
|
|
-
|
|
|
-# panic handler related messages
|
|
|
-PANIC_START = r'Core \s*\d+ register dump:'
|
|
|
-PANIC_END = b'ELF file SHA256:'
|
|
|
-PANIC_STACK_DUMP = b'Stack memory:'
|
|
|
-
|
|
|
-# panic handler decoding states
|
|
|
-PANIC_IDLE = 0
|
|
|
-PANIC_READING = 1
|
|
|
-
|
|
|
-# panic handler decoding options
|
|
|
-PANIC_DECODE_DISABLE = 'disable'
|
|
|
-PANIC_DECODE_BACKTRACE = 'backtrace'
|
|
|
-
|
|
|
-
|
|
|
-class StoppableThread(object):
|
|
|
- """
|
|
|
- Provide a Thread-like class which can be 'cancelled' via a subclass-provided
|
|
|
- cancellation method.
|
|
|
-
|
|
|
- Can be started and stopped multiple times.
|
|
|
-
|
|
|
- Isn't an instance of type Thread because Python Thread objects can only be run once
|
|
|
- """
|
|
|
- def __init__(self):
|
|
|
- self._thread = None
|
|
|
-
|
|
|
- @property
|
|
|
- def alive(self):
|
|
|
- """
|
|
|
- Is 'alive' whenever the internal thread object exists
|
|
|
- """
|
|
|
- return self._thread is not None
|
|
|
-
|
|
|
- def start(self):
|
|
|
- if self._thread is None:
|
|
|
- self._thread = threading.Thread(target=self._run_outer)
|
|
|
- self._thread.start()
|
|
|
-
|
|
|
- def _cancel(self):
|
|
|
- pass # override to provide cancellation functionality
|
|
|
-
|
|
|
- def run(self):
|
|
|
- pass # override for the main thread behaviour
|
|
|
-
|
|
|
- def _run_outer(self):
|
|
|
- try:
|
|
|
- self.run()
|
|
|
- finally:
|
|
|
- self._thread = None
|
|
|
-
|
|
|
- def stop(self):
|
|
|
- if self._thread is not None:
|
|
|
- old_thread = self._thread
|
|
|
- self._thread = None
|
|
|
- self._cancel()
|
|
|
- old_thread.join()
|
|
|
-
|
|
|
-
|
|
|
-class ConsoleReader(StoppableThread):
|
|
|
- """ Read input keys from the console and push them to the queue,
|
|
|
- until stopped.
|
|
|
- """
|
|
|
- def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
|
|
|
- super(ConsoleReader, self).__init__()
|
|
|
- self.console = console
|
|
|
- self.event_queue = event_queue
|
|
|
- self.cmd_queue = cmd_queue
|
|
|
- self.parser = parser
|
|
|
- self.test_mode = test_mode
|
|
|
-
|
|
|
- def run(self):
|
|
|
- self.console.setup()
|
|
|
- try:
|
|
|
- while self.alive:
|
|
|
- try:
|
|
|
- if os.name == 'nt':
|
|
|
- # Windows kludge: because the console.cancel() method doesn't
|
|
|
- # seem to work to unblock getkey() on the Windows implementation.
|
|
|
- #
|
|
|
- # So we only call getkey() if we know there's a key waiting for us.
|
|
|
- import msvcrt
|
|
|
- while not msvcrt.kbhit() and self.alive:
|
|
|
- time.sleep(0.1)
|
|
|
- if not self.alive:
|
|
|
- break
|
|
|
- elif self.test_mode:
|
|
|
- # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
|
|
|
- # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
|
|
|
- # Therefore, we avoid calling it.
|
|
|
- while self.alive:
|
|
|
- time.sleep(0.1)
|
|
|
- break
|
|
|
- c = self.console.getkey()
|
|
|
- except KeyboardInterrupt:
|
|
|
- c = '\x03'
|
|
|
- if c is not None:
|
|
|
- ret = self.parser.parse(c)
|
|
|
- if ret is not None:
|
|
|
- (tag, cmd) = ret
|
|
|
- # stop command should be executed last
|
|
|
- if tag == TAG_CMD and cmd != CMD_STOP:
|
|
|
- self.cmd_queue.put(ret)
|
|
|
- else:
|
|
|
- self.event_queue.put(ret)
|
|
|
-
|
|
|
- finally:
|
|
|
- self.console.cleanup()
|
|
|
-
|
|
|
- def _cancel(self):
|
|
|
- if os.name == 'posix' and not self.test_mode:
|
|
|
- # this is the way cancel() is implemented in pyserial 3.3 or newer,
|
|
|
- # older pyserial (3.1+) has cancellation implemented via 'select',
|
|
|
- # which does not work when console sends an escape sequence response
|
|
|
- #
|
|
|
- # even older pyserial (<3.1) does not have this method
|
|
|
- #
|
|
|
- # on Windows there is a different (also hacky) fix, applied above.
|
|
|
- #
|
|
|
- # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
|
|
|
- # TODO: introduce some workaround to make it work there.
|
|
|
- #
|
|
|
- # Note: This would throw exception in testing mode when the stdin is connected to PTY.
|
|
|
- import fcntl
|
|
|
- import termios
|
|
|
- fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
|
|
|
-
|
|
|
-
|
|
|
-class ConsoleParser(object):
|
|
|
-
|
|
|
- def __init__(self, eol='CRLF'):
|
|
|
- self.translate_eol = {
|
|
|
- 'CRLF': lambda c: c.replace('\n', '\r\n'),
|
|
|
- 'CR': lambda c: c.replace('\n', '\r'),
|
|
|
- 'LF': lambda c: c.replace('\r', '\n'),
|
|
|
- }[eol]
|
|
|
- self.menu_key = CTRL_T
|
|
|
- self.exit_key = CTRL_RBRACKET
|
|
|
- self._pressed_menu_key = False
|
|
|
-
|
|
|
- def parse(self, key):
|
|
|
- ret = None
|
|
|
- if self._pressed_menu_key:
|
|
|
- ret = self._handle_menu_key(key)
|
|
|
- elif key == self.menu_key:
|
|
|
- self._pressed_menu_key = True
|
|
|
- elif key == self.exit_key:
|
|
|
- ret = (TAG_CMD, CMD_STOP)
|
|
|
- else:
|
|
|
- key = self.translate_eol(key)
|
|
|
- ret = (TAG_KEY, key)
|
|
|
- return ret
|
|
|
-
|
|
|
- def _handle_menu_key(self, c):
|
|
|
- ret = None
|
|
|
- if c == self.exit_key or c == self.menu_key: # send verbatim
|
|
|
- ret = (TAG_KEY, c)
|
|
|
- elif c in [CTRL_H, 'h', 'H', '?']:
|
|
|
- red_print(self.get_help_text())
|
|
|
- elif c == CTRL_R: # Reset device via RTS
|
|
|
- ret = (TAG_CMD, CMD_RESET)
|
|
|
- elif c == CTRL_F: # Recompile & upload
|
|
|
- ret = (TAG_CMD, CMD_MAKE)
|
|
|
- elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
|
|
|
- # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
|
|
|
- # instead
|
|
|
- ret = (TAG_CMD, CMD_APP_FLASH)
|
|
|
- elif c == CTRL_Y: # Toggle output display
|
|
|
- ret = (TAG_CMD, CMD_OUTPUT_TOGGLE)
|
|
|
- elif c == CTRL_L: # Toggle saving output into file
|
|
|
- ret = (TAG_CMD, CMD_TOGGLE_LOGGING)
|
|
|
- elif c == CTRL_P:
|
|
|
- yellow_print('Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart')
|
|
|
- # to fast trigger pause without press menu key
|
|
|
- ret = (TAG_CMD, CMD_ENTER_BOOT)
|
|
|
- elif c in [CTRL_X, 'x', 'X']: # Exiting from within the menu
|
|
|
- ret = (TAG_CMD, CMD_STOP)
|
|
|
- else:
|
|
|
- red_print('--- unknown menu character {} --'.format(key_description(c)))
|
|
|
-
|
|
|
- self._pressed_menu_key = False
|
|
|
- return ret
|
|
|
-
|
|
|
- def get_help_text(self):
|
|
|
- text = """\
|
|
|
- --- idf_monitor ({version}) - ESP-IDF monitor tool
|
|
|
- --- based on miniterm from pySerial
|
|
|
- ---
|
|
|
- --- {exit:8} Exit program
|
|
|
- --- {menu:8} Menu escape key, followed by:
|
|
|
- --- Menu keys:
|
|
|
- --- {menu:14} Send the menu character itself to remote
|
|
|
- --- {exit:14} Send the exit character itself to remote
|
|
|
- --- {reset:14} Reset target board via RTS line
|
|
|
- --- {makecmd:14} Build & flash project
|
|
|
- --- {appmake:14} Build & flash app only
|
|
|
- --- {output:14} Toggle output display
|
|
|
- --- {log:14} Toggle saving output into file
|
|
|
- --- {pause:14} Reset target into bootloader to pause app via RTS line
|
|
|
- --- {menuexit:14} Exit program
|
|
|
- """.format(version=__version__,
|
|
|
- exit=key_description(self.exit_key),
|
|
|
- menu=key_description(self.menu_key),
|
|
|
- reset=key_description(CTRL_R),
|
|
|
- makecmd=key_description(CTRL_F),
|
|
|
- appmake=key_description(CTRL_A) + ' (or A)',
|
|
|
- output=key_description(CTRL_Y),
|
|
|
- log=key_description(CTRL_L),
|
|
|
- pause=key_description(CTRL_P),
|
|
|
- menuexit=key_description(CTRL_X) + ' (or X)')
|
|
|
- return textwrap.dedent(text)
|
|
|
-
|
|
|
- def get_next_action_text(self):
|
|
|
- text = """\
|
|
|
- --- Press {} to exit monitor.
|
|
|
- --- Press {} to build & flash project.
|
|
|
- --- Press {} to build & flash app.
|
|
|
- --- Press any other key to resume monitor (resets target).
|
|
|
- """.format(key_description(self.exit_key),
|
|
|
- key_description(CTRL_F),
|
|
|
- key_description(CTRL_A))
|
|
|
- return textwrap.dedent(text)
|
|
|
-
|
|
|
- def parse_next_action_key(self, c):
|
|
|
- ret = None
|
|
|
- if c == self.exit_key:
|
|
|
- ret = (TAG_CMD, CMD_STOP)
|
|
|
- elif c == CTRL_F: # Recompile & upload
|
|
|
- ret = (TAG_CMD, CMD_MAKE)
|
|
|
- elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
|
|
|
- # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
|
|
|
- # instead
|
|
|
- ret = (TAG_CMD, CMD_APP_FLASH)
|
|
|
- return ret
|
|
|
-
|
|
|
-
|
|
|
-class SerialReader(StoppableThread):
|
|
|
- """ Read serial data from the serial port and push to the
|
|
|
- event queue, until stopped.
|
|
|
- """
|
|
|
- def __init__(self, serial, event_queue):
|
|
|
- super(SerialReader, self).__init__()
|
|
|
- self.baud = serial.baudrate
|
|
|
- self.serial = serial
|
|
|
- self.event_queue = event_queue
|
|
|
- if not hasattr(self.serial, 'cancel_read'):
|
|
|
- # enable timeout for checking alive flag,
|
|
|
- # if cancel_read not available
|
|
|
- self.serial.timeout = 0.25
|
|
|
-
|
|
|
- def run(self):
|
|
|
- if not self.serial.is_open:
|
|
|
- self.serial.baudrate = self.baud
|
|
|
- self.serial.rts = True # Force an RTS reset on open
|
|
|
- self.serial.open()
|
|
|
- time.sleep(0.005) # Add a delay to meet the requirements of minimal EN low time (2ms for ESP32-C3)
|
|
|
- self.serial.rts = False
|
|
|
- self.serial.dtr = self.serial.dtr # usbser.sys workaround
|
|
|
- try:
|
|
|
- while self.alive:
|
|
|
- try:
|
|
|
- data = self.serial.read(self.serial.in_waiting or 1)
|
|
|
- except (serial.serialutil.SerialException, IOError) as e:
|
|
|
- data = b''
|
|
|
- # self.serial.open() was successful before, therefore, this is an issue related to
|
|
|
- # the disappearance of the device
|
|
|
- red_print(e)
|
|
|
- yellow_print('Waiting for the device to reconnect', newline='')
|
|
|
- self.serial.close()
|
|
|
- while self.alive: # so that exiting monitor works while waiting
|
|
|
- try:
|
|
|
- time.sleep(0.5)
|
|
|
- self.serial.open()
|
|
|
- break # device connected
|
|
|
- except serial.serialutil.SerialException:
|
|
|
- yellow_print('.', newline='')
|
|
|
- sys.stderr.flush()
|
|
|
- yellow_print('') # go to new line
|
|
|
- if len(data):
|
|
|
- self.event_queue.put((TAG_SERIAL, data), False)
|
|
|
- finally:
|
|
|
- self.serial.close()
|
|
|
-
|
|
|
- def _cancel(self):
|
|
|
- if hasattr(self.serial, 'cancel_read'):
|
|
|
- try:
|
|
|
- self.serial.cancel_read()
|
|
|
- except Exception:
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
-class LineMatcher(object):
|
|
|
- """
|
|
|
- Assembles a dictionary of filtering rules based on the --print_filter
|
|
|
- argument of idf_monitor. Then later it is used to match lines and
|
|
|
- determine whether they should be shown on screen or not.
|
|
|
- """
|
|
|
- LEVEL_N = 0
|
|
|
- LEVEL_E = 1
|
|
|
- LEVEL_W = 2
|
|
|
- LEVEL_I = 3
|
|
|
- LEVEL_D = 4
|
|
|
- LEVEL_V = 5
|
|
|
-
|
|
|
- level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
|
|
|
- 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
|
|
|
-
|
|
|
- def __init__(self, print_filter):
|
|
|
- self._dict = dict()
|
|
|
- self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
|
|
|
- items = print_filter.split()
|
|
|
- if len(items) == 0:
|
|
|
- self._dict['*'] = self.LEVEL_V # default is to print everything
|
|
|
- for f in items:
|
|
|
- s = f.split(r':')
|
|
|
- if len(s) == 1:
|
|
|
- # specifying no warning level defaults to verbose level
|
|
|
- lev = self.LEVEL_V
|
|
|
- elif len(s) == 2:
|
|
|
- if len(s[0]) == 0:
|
|
|
- raise ValueError('No tag specified in filter ' + f)
|
|
|
- try:
|
|
|
- lev = self.level[s[1].upper()]
|
|
|
- except KeyError:
|
|
|
- raise ValueError('Unknown warning level in filter ' + f)
|
|
|
- else:
|
|
|
- raise ValueError('Missing ":" in filter ' + f)
|
|
|
- self._dict[s[0]] = lev
|
|
|
-
|
|
|
- def match(self, line):
|
|
|
- try:
|
|
|
- m = self._re.search(line)
|
|
|
- if m:
|
|
|
- lev = self.level[m.group(1)]
|
|
|
- if m.group(2) in self._dict:
|
|
|
- return self._dict[m.group(2)] >= lev
|
|
|
- return self._dict.get('*', self.LEVEL_N) >= lev
|
|
|
- except (KeyError, IndexError):
|
|
|
- # Regular line written with something else than ESP_LOG*
|
|
|
- # or an empty line.
|
|
|
- pass
|
|
|
- # We need something more than "*.N" for printing.
|
|
|
- return self._dict.get('*', self.LEVEL_N) > self.LEVEL_N
|
|
|
-
|
|
|
-
|
|
|
-class SerialStopException(Exception):
|
|
|
- """
|
|
|
- This exception is used for stopping the IDF monitor in testing mode.
|
|
|
- """
|
|
|
- pass
|
|
|
-
|
|
|
|
|
|
class Monitor(object):
|
|
|
"""
|
|
|
@@ -498,41 +87,36 @@ class Monitor(object):
|
|
|
|
|
|
Main difference is that all event processing happens in the main thread, not the worker threads.
|
|
|
"""
|
|
|
+
|
|
|
def __init__(self, serial_instance, elf_file, print_filter, make='make', encrypted=False,
|
|
|
toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol='CRLF',
|
|
|
decode_coredumps=COREDUMP_DECODE_INFO,
|
|
|
decode_panic=PANIC_DECODE_DISABLE,
|
|
|
- target=None,
|
|
|
- websocket_client=None,
|
|
|
- enable_address_decoding=True):
|
|
|
+ target='esp32',
|
|
|
+ websocket_client=None, enable_address_decoding=True):
|
|
|
+ # type: (serial.Serial, str, str, str, bool, str, str, str, str, str, WebSocketClient, bool) -> None
|
|
|
super(Monitor, self).__init__()
|
|
|
- self.event_queue = queue.Queue()
|
|
|
- self.cmd_queue = queue.Queue()
|
|
|
+ self.event_queue = queue.Queue() # type: queue.Queue
|
|
|
+ self.cmd_queue = queue.Queue() # type: queue.Queue
|
|
|
self.console = miniterm.Console()
|
|
|
self.enable_address_decoding = enable_address_decoding
|
|
|
- if os.name == 'nt':
|
|
|
- sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
|
|
|
- self.console.output = ANSIColorConverter(self.console.output)
|
|
|
- self.console.byte_output = ANSIColorConverter(self.console.byte_output)
|
|
|
-
|
|
|
- if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'):
|
|
|
- # Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
|
|
|
- def getkey_patched(self):
|
|
|
- c = self.enc_stdin.read(1)
|
|
|
- if c == chr(0x7f):
|
|
|
- c = chr(8) # map the BS key (which yields DEL) to backspace
|
|
|
- return c
|
|
|
-
|
|
|
- self.console.getkey = types.MethodType(getkey_patched, self.console)
|
|
|
-
|
|
|
- socket_mode = serial_instance.port.startswith('socket://') # testing hook - data from serial can make exit the monitor
|
|
|
+
|
|
|
+ sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
|
|
|
+ self.console.output = ANSIColorConverter(self.console.output)
|
|
|
+ self.console.byte_output = ANSIColorConverter(self.console.byte_output)
|
|
|
+
|
|
|
+ socket_mode = serial_instance.port.startswith(
|
|
|
+ 'socket://') # testing hook - data from serial can make exit the monitor
|
|
|
self.serial = serial_instance
|
|
|
+
|
|
|
self.console_parser = ConsoleParser(eol)
|
|
|
- self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser, socket_mode)
|
|
|
+ self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser,
|
|
|
+ socket_mode)
|
|
|
self.serial_reader = SerialReader(self.serial, self.event_queue)
|
|
|
self.elf_file = elf_file
|
|
|
if not os.path.exists(make):
|
|
|
- self.make = shlex.split(make) # allow for possibility the "make" arg is a list of arguments (for idf.py)
|
|
|
+ # allow for possibility the "make" arg is a list of arguments (for idf.py)
|
|
|
+ self.make = shlex.split(make) # type: Union[str, List[str]]
|
|
|
else:
|
|
|
self.make = make
|
|
|
self.encrypted = encrypted
|
|
|
@@ -545,11 +129,11 @@ class Monitor(object):
|
|
|
self._gdb_buffer = b''
|
|
|
self._pc_address_buffer = b''
|
|
|
self._line_matcher = LineMatcher(print_filter)
|
|
|
- self._invoke_processing_last_line_timer = None
|
|
|
+ self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer]
|
|
|
self._force_line_print = False
|
|
|
self._output_enabled = True
|
|
|
self._serial_check_exit = socket_mode
|
|
|
- self._log_file = None
|
|
|
+ self._log_file = None # type: Optional[BinaryIO]
|
|
|
self._decode_coredumps = decode_coredumps
|
|
|
self._reading_coredump = COREDUMP_IDLE
|
|
|
self._coredump_buffer = b''
|
|
|
@@ -558,9 +142,11 @@ class Monitor(object):
|
|
|
self._panic_buffer = b''
|
|
|
|
|
|
def invoke_processing_last_line(self):
|
|
|
+ # type: () -> None
|
|
|
self.event_queue.put((TAG_SERIAL_FLUSH, b''), False)
|
|
|
|
|
|
def main_loop(self):
|
|
|
+ # type: () -> None
|
|
|
self.console_reader.start()
|
|
|
self.serial_reader.start()
|
|
|
try:
|
|
|
@@ -573,9 +159,9 @@ class Monitor(object):
|
|
|
except queue.Empty:
|
|
|
continue
|
|
|
|
|
|
- (event_tag, data) = item
|
|
|
+ event_tag, data = item
|
|
|
if event_tag == TAG_CMD:
|
|
|
- self.handle_commands(data)
|
|
|
+ self.handle_commands(data, self.target)
|
|
|
elif event_tag == TAG_KEY:
|
|
|
try:
|
|
|
self.serial.write(codecs.encode(data))
|
|
|
@@ -597,9 +183,9 @@ class Monitor(object):
|
|
|
elif event_tag == TAG_SERIAL_FLUSH:
|
|
|
self.handle_serial_input(data, finalize_line=True)
|
|
|
else:
|
|
|
- raise RuntimeError('Bad event data %r' % ((event_tag,data),))
|
|
|
+ raise RuntimeError('Bad event data %r' % ((event_tag, data),))
|
|
|
except SerialStopException:
|
|
|
- sys.stderr.write(ANSI_NORMAL + 'Stopping condition has been received\n')
|
|
|
+ normal_print('Stopping condition has been received\n')
|
|
|
finally:
|
|
|
try:
|
|
|
self.console_reader.stop()
|
|
|
@@ -610,9 +196,10 @@ class Monitor(object):
|
|
|
self._invoke_processing_last_line_timer = None
|
|
|
except Exception:
|
|
|
pass
|
|
|
- sys.stderr.write(ANSI_NORMAL + '\n')
|
|
|
+ normal_print('\n')
|
|
|
|
|
|
def handle_serial_input(self, data, finalize_line=False):
|
|
|
+ # type: (bytes, bool) -> None
|
|
|
sp = data.split(b'\n')
|
|
|
if self._last_line_part != b'':
|
|
|
# add unprocessed part from previous "data" to the first line
|
|
|
@@ -630,7 +217,7 @@ class Monitor(object):
|
|
|
if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')):
|
|
|
self._print(line + b'\n')
|
|
|
self.handle_possible_pc_address_in_line(line)
|
|
|
- self.check_coredump_trigger_after_print(line)
|
|
|
+ self.check_coredump_trigger_after_print()
|
|
|
self.check_gdbstub_trigger(line)
|
|
|
self._force_line_print = False
|
|
|
# Now we have the last part (incomplete line) in _last_line_part. By
|
|
|
@@ -657,6 +244,7 @@ class Monitor(object):
|
|
|
# handle_serial_input is invoked
|
|
|
|
|
|
def handle_possible_pc_address_in_line(self, line):
|
|
|
+ # type: (bytes) -> None
|
|
|
line = self._pc_address_buffer + line
|
|
|
self._pc_address_buffer = b''
|
|
|
if self.enable_address_decoding:
|
|
|
@@ -664,16 +252,17 @@ class Monitor(object):
|
|
|
self.lookup_pc_address(m.group())
|
|
|
|
|
|
def __enter__(self):
|
|
|
+ # type: () -> None
|
|
|
""" Use 'with self' to temporarily disable monitoring behaviour """
|
|
|
self.serial_reader.stop()
|
|
|
self.console_reader.stop()
|
|
|
|
|
|
- def __exit__(self, *args, **kwargs):
|
|
|
+ def __exit__(self, *args, **kwargs): # type: ignore
|
|
|
""" Use 'with self' to temporarily disable monitoring behaviour """
|
|
|
self.console_reader.start()
|
|
|
self.serial_reader.start()
|
|
|
|
|
|
- def prompt_next_action(self, reason):
|
|
|
+ def prompt_next_action(self, reason): # type: (str) -> None
|
|
|
self.console.setup() # set up console to trap input characters
|
|
|
try:
|
|
|
red_print('--- {}'.format(reason))
|
|
|
@@ -693,7 +282,7 @@ class Monitor(object):
|
|
|
else:
|
|
|
self.cmd_queue.put(ret)
|
|
|
|
|
|
- def run_make(self, target):
|
|
|
+ def run_make(self, target): # type: (str) -> None
|
|
|
with self:
|
|
|
if isinstance(self.make, list):
|
|
|
popen_args = self.make + [target]
|
|
|
@@ -710,7 +299,7 @@ class Monitor(object):
|
|
|
else:
|
|
|
self.output_enable(True)
|
|
|
|
|
|
- def lookup_pc_address(self, pc_addr):
|
|
|
+ def lookup_pc_address(self, pc_addr): # type: (str) -> None
|
|
|
cmd = ['%saddr2line' % self.toolchain_prefix,
|
|
|
'-pfiaC', '-e', self.elf_file, pc_addr]
|
|
|
try:
|
|
|
@@ -720,7 +309,7 @@ class Monitor(object):
|
|
|
except OSError as e:
|
|
|
red_print('%s: %s' % (' '.join(cmd), e))
|
|
|
|
|
|
- def check_gdbstub_trigger(self, line):
|
|
|
+ def check_gdbstub_trigger(self, line): # type: (bytes) -> None
|
|
|
line = self._gdb_buffer + line
|
|
|
self._gdb_buffer = b''
|
|
|
m = re.search(b'\\$(T..)#(..)', line) # look for a gdb "reason" for a break
|
|
|
@@ -744,7 +333,7 @@ class Monitor(object):
|
|
|
else:
|
|
|
red_print('Malformed gdb message... calculated checksum %02x received %02x' % (chsum, calc_chsum))
|
|
|
|
|
|
- def check_coredump_trigger_before_print(self, line):
|
|
|
+ def check_coredump_trigger_before_print(self, line): # type: (bytes) -> None
|
|
|
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
|
|
|
return
|
|
|
|
|
|
@@ -774,7 +363,7 @@ class Monitor(object):
|
|
|
if new_buffer_len_kb > buffer_len_kb:
|
|
|
yellow_print('Received %3d kB...' % (new_buffer_len_kb), newline='\r')
|
|
|
|
|
|
- def check_coredump_trigger_after_print(self, line):
|
|
|
+ def check_coredump_trigger_after_print(self): # type: () -> None
|
|
|
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
|
|
|
return
|
|
|
|
|
|
@@ -784,7 +373,7 @@ class Monitor(object):
|
|
|
self._output_enabled = True
|
|
|
self._coredump_buffer = b''
|
|
|
|
|
|
- def process_coredump(self):
|
|
|
+ def process_coredump(self): # type: () -> None
|
|
|
if self._decode_coredumps != COREDUMP_DECODE_INFO:
|
|
|
raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps)
|
|
|
|
|
|
@@ -829,9 +418,9 @@ class Monitor(object):
|
|
|
try:
|
|
|
os.unlink(coredump_file.name)
|
|
|
except OSError as e:
|
|
|
- yellow_print("Couldn't remote temporary core dump file ({})".format(e))
|
|
|
+ yellow_print('Couldn\'t remote temporary core dump file ({})'.format(e))
|
|
|
|
|
|
- def check_panic_decode_trigger(self, line):
|
|
|
+ def check_panic_decode_trigger(self, line): # type: (bytes) -> None
|
|
|
if self._decode_panic == PANIC_DECODE_DISABLE:
|
|
|
return
|
|
|
|
|
|
@@ -851,7 +440,7 @@ class Monitor(object):
|
|
|
self.process_panic_output(self._panic_buffer)
|
|
|
self._panic_buffer = b''
|
|
|
|
|
|
- def process_panic_output(self, panic_output):
|
|
|
+ def process_panic_output(self, panic_output): # type: (bytes) -> None
|
|
|
panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'tools', 'gdb_panic_server.py')
|
|
|
panic_output_file = None
|
|
|
try:
|
|
|
@@ -865,10 +454,10 @@ class Monitor(object):
|
|
|
'--batch', '-n',
|
|
|
self.elf_file,
|
|
|
'-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\""
|
|
|
- .format(python=sys.executable,
|
|
|
- script=panic_output_decode_script,
|
|
|
- target=self.target,
|
|
|
- output_file=panic_output_file.name),
|
|
|
+ .format(python=sys.executable,
|
|
|
+ script=panic_output_decode_script,
|
|
|
+ target=self.target,
|
|
|
+ output_file=panic_output_file.name),
|
|
|
'-ex', 'bt']
|
|
|
|
|
|
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
|
|
|
@@ -882,11 +471,11 @@ class Monitor(object):
|
|
|
try:
|
|
|
os.unlink(panic_output_file.name)
|
|
|
except OSError as e:
|
|
|
- yellow_print("Couldn't remove temporary panic output file ({})".format(e))
|
|
|
+ yellow_print('Couldn\'t remove temporary panic output file ({})'.format(e))
|
|
|
|
|
|
- def run_gdb(self):
|
|
|
+ def run_gdb(self): # type: () -> None
|
|
|
with self: # disable console control
|
|
|
- sys.stderr.write(ANSI_NORMAL)
|
|
|
+ normal_print('')
|
|
|
try:
|
|
|
cmd = ['%sgdb' % self.toolchain_prefix,
|
|
|
'-ex', 'set serial baud %d' % self.serial.baudrate,
|
|
|
@@ -912,30 +501,32 @@ class Monitor(object):
|
|
|
pass # don't care if there's no stty, we tried...
|
|
|
self.prompt_next_action('gdb exited')
|
|
|
|
|
|
- def output_enable(self, enable):
|
|
|
+ def output_enable(self, enable): # type: (bool) -> None
|
|
|
self._output_enabled = enable
|
|
|
|
|
|
- def output_toggle(self):
|
|
|
+ def output_toggle(self): # type: () -> None
|
|
|
self._output_enabled = not self._output_enabled
|
|
|
- yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(self._output_enabled))
|
|
|
|
|
|
- def toggle_logging(self):
|
|
|
+ yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(
|
|
|
+ self._output_enabled))
|
|
|
+
|
|
|
+ def toggle_logging(self): # type: () -> None
|
|
|
if self._log_file:
|
|
|
self.stop_logging()
|
|
|
else:
|
|
|
self.start_logging()
|
|
|
|
|
|
- def start_logging(self):
|
|
|
+ def start_logging(self): # type: () -> None
|
|
|
if not self._log_file:
|
|
|
+ name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0],
|
|
|
+ datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
|
|
|
try:
|
|
|
- name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0],
|
|
|
- datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
|
|
|
self._log_file = open(name, 'wb+')
|
|
|
yellow_print('\nLogging is enabled into file {}'.format(name))
|
|
|
except Exception as e:
|
|
|
red_print('\nLog file {} cannot be created: {}'.format(name, e))
|
|
|
|
|
|
- def stop_logging(self):
|
|
|
+ def stop_logging(self): # type: () -> None
|
|
|
if self._log_file:
|
|
|
try:
|
|
|
name = self._log_file.name
|
|
|
@@ -946,7 +537,7 @@ class Monitor(object):
|
|
|
finally:
|
|
|
self._log_file = None
|
|
|
|
|
|
- def _print(self, string, console_printer=None):
|
|
|
+ def _print(self, string, console_printer=None): # type: (Union[str, bytes], Optional[Callable]) -> None
|
|
|
if console_printer is None:
|
|
|
console_printer = self.console.write_bytes
|
|
|
if self._output_enabled:
|
|
|
@@ -955,23 +546,31 @@ class Monitor(object):
|
|
|
try:
|
|
|
if isinstance(string, type(u'')):
|
|
|
string = string.encode()
|
|
|
- self._log_file.write(string)
|
|
|
+ self._log_file.write(string) # type: ignore
|
|
|
except Exception as e:
|
|
|
red_print('\nCannot write to file: {}'.format(e))
|
|
|
# don't fill-up the screen with the previous errors (probably consequent prints would fail also)
|
|
|
self.stop_logging()
|
|
|
|
|
|
- def handle_commands(self, cmd):
|
|
|
+ def handle_commands(self, cmd, chip): # type: (int, str) -> None
|
|
|
+ config = get_chip_config(chip)
|
|
|
+ reset_delay = config['reset']
|
|
|
+ enter_boot_set = config['enter_boot_set']
|
|
|
+ enter_boot_unset = config['enter_boot_unset']
|
|
|
+
|
|
|
+ high = False
|
|
|
+ low = True
|
|
|
+
|
|
|
if cmd == CMD_STOP:
|
|
|
self.console_reader.stop()
|
|
|
self.serial_reader.stop()
|
|
|
elif cmd == CMD_RESET:
|
|
|
- self.serial.setRTS(True)
|
|
|
+ self.serial.setRTS(low)
|
|
|
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
|
|
|
- time.sleep(0.2)
|
|
|
- self.serial.setRTS(False)
|
|
|
+ time.sleep(reset_delay)
|
|
|
+ self.serial.setRTS(high)
|
|
|
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
|
|
|
- self.output_enable(True)
|
|
|
+ self.output_enable(low)
|
|
|
elif cmd == CMD_MAKE:
|
|
|
self.run_make('encrypted-flash' if self.encrypted else 'flash')
|
|
|
elif cmd == CMD_APP_FLASH:
|
|
|
@@ -981,20 +580,20 @@ class Monitor(object):
|
|
|
elif cmd == CMD_TOGGLE_LOGGING:
|
|
|
self.toggle_logging()
|
|
|
elif cmd == CMD_ENTER_BOOT:
|
|
|
- self.serial.setDTR(False) # IO0=HIGH
|
|
|
- self.serial.setRTS(True) # EN=LOW, chip in reset
|
|
|
+ self.serial.setDTR(high) # IO0=HIGH
|
|
|
+ self.serial.setRTS(low) # EN=LOW, chip in reset
|
|
|
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
|
|
|
- time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
|
|
|
- self.serial.setDTR(True) # IO0=LOW
|
|
|
- self.serial.setRTS(False) # EN=HIGH, chip out of reset
|
|
|
+ time.sleep(enter_boot_set) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
|
|
|
+ self.serial.setDTR(low) # IO0=LOW
|
|
|
+ self.serial.setRTS(high) # EN=HIGH, chip out of reset
|
|
|
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
|
|
|
- time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
|
|
|
- self.serial.setDTR(False) # IO0=HIGH, done
|
|
|
+ time.sleep(enter_boot_unset) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
|
|
|
+ self.serial.setDTR(high) # IO0=HIGH, done
|
|
|
else:
|
|
|
- raise RuntimeError('Bad command data %d' % (cmd))
|
|
|
+ raise RuntimeError('Bad command data %d' % cmd) # type: ignore
|
|
|
|
|
|
|
|
|
-def main():
|
|
|
+def main(): # type: () -> None
|
|
|
parser = argparse.ArgumentParser('idf_monitor - a serial output monitor for esp-idf')
|
|
|
|
|
|
parser.add_argument(
|
|
|
@@ -1005,7 +604,7 @@ def main():
|
|
|
|
|
|
parser.add_argument(
|
|
|
'--disable-address-decoding', '-d',
|
|
|
- help="Don't print lines about decoded addresses from the application ELF file.",
|
|
|
+ help="Don't print lines about decoded addresses from the application ELF file",
|
|
|
action='store_true',
|
|
|
default=True if os.environ.get('ESP_MONITOR_DECODE') == 0 else False
|
|
|
)
|
|
|
@@ -1063,8 +662,15 @@ def main():
|
|
|
|
|
|
parser.add_argument(
|
|
|
'--target',
|
|
|
- required=False,
|
|
|
- help='Target name (used when stack dump decoding is enabled)'
|
|
|
+ help='Target name (used when stack dump decoding is enabled)',
|
|
|
+ default=os.environ.get('IDF_TARGET', 'esp32')
|
|
|
+ )
|
|
|
+
|
|
|
+ parser.add_argument(
|
|
|
+ '--revision',
|
|
|
+ help='Revision of the target',
|
|
|
+ type=int,
|
|
|
+ default=0
|
|
|
)
|
|
|
|
|
|
parser.add_argument(
|
|
|
@@ -1090,7 +696,6 @@ def main():
|
|
|
do_not_open=True)
|
|
|
serial_instance.dtr = False
|
|
|
serial_instance.rts = False
|
|
|
-
|
|
|
args.elf_file.close() # don't need this as a file
|
|
|
|
|
|
# remove the parallel jobserver arguments from MAKEFLAGS, as any
|
|
|
@@ -1118,8 +723,7 @@ def main():
|
|
|
args.decode_coredumps, args.decode_panic, args.target,
|
|
|
ws, enable_address_decoding=not args.disable_address_decoding)
|
|
|
|
|
|
- yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
|
|
|
- p=serial_instance))
|
|
|
+ yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(p=serial_instance))
|
|
|
yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
|
|
|
key_description(monitor.console_parser.exit_key),
|
|
|
key_description(monitor.console_parser.menu_key),
|
|
|
@@ -1134,185 +738,5 @@ def main():
|
|
|
ws.close()
|
|
|
|
|
|
|
|
|
-class WebSocketClient(object):
|
|
|
- """
|
|
|
- WebSocket client used to advertise debug events to WebSocket server by sending and receiving JSON-serialized
|
|
|
- dictionaries.
|
|
|
-
|
|
|
- Advertisement of debug event:
|
|
|
- {'event': 'gdb_stub', 'port': '/dev/ttyUSB1', 'prog': 'build/elf_file'} for GDB Stub, or
|
|
|
- {'event': 'coredump', 'file': '/tmp/xy', 'prog': 'build/elf_file'} for coredump,
|
|
|
- where 'port' is the port for the connected device, 'prog' is the full path to the ELF file and 'file' is the
|
|
|
- generated coredump file.
|
|
|
-
|
|
|
- Expected end of external debugging:
|
|
|
- {'event': 'debug_finished'}
|
|
|
- """
|
|
|
-
|
|
|
- RETRIES = 3
|
|
|
- CONNECTION_RETRY_DELAY = 1
|
|
|
-
|
|
|
- def __init__(self, url):
|
|
|
- self.url = url
|
|
|
- self._connect()
|
|
|
-
|
|
|
- def _connect(self):
|
|
|
- """
|
|
|
- Connect to WebSocket server at url
|
|
|
- """
|
|
|
- self.close()
|
|
|
-
|
|
|
- for _ in range(self.RETRIES):
|
|
|
- try:
|
|
|
- self.ws = websocket.create_connection(self.url)
|
|
|
- break # success
|
|
|
- except NameError:
|
|
|
- raise RuntimeError('Please install the websocket_client package for IDE integration!')
|
|
|
- except Exception as e:
|
|
|
- red_print('WebSocket connection error: {}'.format(e))
|
|
|
- time.sleep(self.CONNECTION_RETRY_DELAY)
|
|
|
- else:
|
|
|
- raise RuntimeError('Cannot connect to WebSocket server')
|
|
|
-
|
|
|
- def close(self):
|
|
|
- try:
|
|
|
- self.ws.close()
|
|
|
- except AttributeError:
|
|
|
- # Not yet connected
|
|
|
- pass
|
|
|
- except Exception as e:
|
|
|
- red_print('WebSocket close error: {}'.format(e))
|
|
|
-
|
|
|
- def send(self, payload_dict):
|
|
|
- """
|
|
|
- Serialize payload_dict in JSON format and send it to the server
|
|
|
- """
|
|
|
- for _ in range(self.RETRIES):
|
|
|
- try:
|
|
|
- self.ws.send(json.dumps(payload_dict))
|
|
|
- yellow_print('WebSocket sent: {}'.format(payload_dict))
|
|
|
- break
|
|
|
- except Exception as e:
|
|
|
- red_print('WebSocket send error: {}'.format(e))
|
|
|
- self._connect()
|
|
|
- else:
|
|
|
- raise RuntimeError('Cannot send to WebSocket server')
|
|
|
-
|
|
|
- def wait(self, expect_iterable):
|
|
|
- """
|
|
|
- Wait until a dictionary in JSON format is received from the server with all (key, value) tuples from
|
|
|
- expect_iterable.
|
|
|
- """
|
|
|
- for _ in range(self.RETRIES):
|
|
|
- try:
|
|
|
- r = self.ws.recv()
|
|
|
- except Exception as e:
|
|
|
- red_print('WebSocket receive error: {}'.format(e))
|
|
|
- self._connect()
|
|
|
- continue
|
|
|
- obj = json.loads(r)
|
|
|
- if all([k in obj and obj[k] == v for k, v in expect_iterable]):
|
|
|
- yellow_print('WebSocket received: {}'.format(obj))
|
|
|
- break
|
|
|
- red_print('WebSocket expected: {}, received: {}'.format(dict(expect_iterable), obj))
|
|
|
- else:
|
|
|
- raise RuntimeError('Cannot receive from WebSocket server')
|
|
|
-
|
|
|
-
|
|
|
-if os.name == 'nt':
|
|
|
- # Windows console stuff
|
|
|
-
|
|
|
- STD_OUTPUT_HANDLE = -11
|
|
|
- STD_ERROR_HANDLE = -12
|
|
|
-
|
|
|
- # wincon.h values
|
|
|
- FOREGROUND_INTENSITY = 8
|
|
|
- FOREGROUND_GREY = 7
|
|
|
-
|
|
|
- # matches the ANSI color change sequences that IDF sends
|
|
|
- RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')
|
|
|
-
|
|
|
- # list mapping the 8 ANSI colors (the indexes) to Windows Console colors
|
|
|
- ANSI_TO_WINDOWS_COLOR = [0, 4, 2, 6, 1, 5, 3, 7]
|
|
|
-
|
|
|
- GetStdHandle = ctypes.windll.kernel32.GetStdHandle
|
|
|
- SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute
|
|
|
-
|
|
|
- class ANSIColorConverter(object):
|
|
|
- """Class to wrap a file-like output stream, intercept ANSI color codes,
|
|
|
- and convert them into calls to Windows SetConsoleTextAttribute.
|
|
|
-
|
|
|
- Doesn't support all ANSI terminal code escape sequences, only the sequences IDF uses.
|
|
|
-
|
|
|
- Ironically, in Windows this console output is normally wrapped by winpty which will then detect the console text
|
|
|
- color changes and convert these back to ANSI color codes for MSYS' terminal to display. However this is the
|
|
|
- least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, output=None, decode_output=False):
|
|
|
- self.output = output
|
|
|
- self.decode_output = decode_output
|
|
|
- self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
|
|
|
- self.matched = b''
|
|
|
-
|
|
|
- def _output_write(self, data):
|
|
|
- try:
|
|
|
- if self.decode_output:
|
|
|
- self.output.write(data.decode())
|
|
|
- else:
|
|
|
- self.output.write(data)
|
|
|
- except (IOError, OSError):
|
|
|
- # Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
|
|
|
- # an exception (however, the character is still written to the screen)
|
|
|
- # Ref https://github.com/espressif/esp-idf/issues/1163
|
|
|
- #
|
|
|
- # Also possible for Windows to throw an OSError error if the data is invalid for the console
|
|
|
- # (garbage bytes, etc)
|
|
|
- pass
|
|
|
- except UnicodeDecodeError:
|
|
|
- # In case of double byte Unicode characters display '?'
|
|
|
- self.output.write('?')
|
|
|
-
|
|
|
- def write(self, data):
|
|
|
- if isinstance(data, bytes):
|
|
|
- data = bytearray(data)
|
|
|
- else:
|
|
|
- data = bytearray(data, 'utf-8')
|
|
|
- for b in data:
|
|
|
- b = bytes([b])
|
|
|
- length = len(self.matched)
|
|
|
- if b == b'\033': # ESC
|
|
|
- self.matched = b
|
|
|
- elif (length == 1 and b == b'[') or (1 < length < 7):
|
|
|
- self.matched += b
|
|
|
- if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console
|
|
|
- # Flush is required only with Python3 - switching color before it is printed would mess up the console
|
|
|
- self.flush()
|
|
|
- SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
|
|
|
- self.matched = b''
|
|
|
- elif len(self.matched) == 7: # could be an ANSI sequence
|
|
|
- m = re.match(RE_ANSI_COLOR, self.matched)
|
|
|
- if m is not None:
|
|
|
- color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
|
|
|
- if m.group(1) == b'1':
|
|
|
- color |= FOREGROUND_INTENSITY
|
|
|
- # Flush is required only with Python3 - switching color before it is printed would mess up the console
|
|
|
- self.flush()
|
|
|
- SetConsoleTextAttribute(self.handle, color)
|
|
|
- else:
|
|
|
- self._output_write(self.matched) # not an ANSI color code, display verbatim
|
|
|
- self.matched = b''
|
|
|
- else:
|
|
|
- self._output_write(b)
|
|
|
- self.matched = b''
|
|
|
-
|
|
|
- def flush(self):
|
|
|
- try:
|
|
|
- self.output.flush()
|
|
|
- except OSError:
|
|
|
- # Account for Windows Console refusing to accept garbage bytes (serial noise, etc)
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
if __name__ == '__main__':
|
|
|
main()
|