serial_handler.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. # SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import hashlib
  4. import os
  5. import queue # noqa: F401
  6. import re
  7. import subprocess
  8. import time
  9. from typing import Callable, Optional
  10. import serial # noqa: F401
  11. from serial.tools import miniterm # noqa: F401
  12. from .chip_specific_config import get_chip_config
  13. from .console_parser import ConsoleParser, prompt_next_action # noqa: F401
  14. from .console_reader import ConsoleReader # noqa: F401
  15. from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP,
  16. CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE,
  17. PANIC_READING, PANIC_STACK_DUMP, PANIC_START)
  18. from .coredump import CoreDump
  19. from .exceptions import SerialStopException
  20. from .gdbhelper import GDBHelper
  21. from .line_matcher import LineMatcher
  22. from .logger import Logger
  23. from .output_helpers import yellow_print
  24. from .serial_reader import Reader
  25. def get_sha256(filename, block_size=65536): # type: (str, int) -> str
  26. sha256 = hashlib.sha256()
  27. with open(filename, 'rb') as f:
  28. for block in iter(lambda: f.read(block_size), b''):
  29. sha256.update(block)
  30. return sha256.hexdigest()
  31. def run_make(target, make, console, console_parser, event_queue, cmd_queue, logger):
  32. # type: (str, str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue, Logger) -> None
  33. if isinstance(make, list):
  34. popen_args = make + [target]
  35. else:
  36. popen_args = [make, target]
  37. yellow_print('Running %s...' % ' '.join(popen_args))
  38. p = subprocess.Popen(popen_args, env=os.environ)
  39. try:
  40. p.wait()
  41. except KeyboardInterrupt:
  42. p.wait()
  43. if p.returncode != 0:
  44. prompt_next_action('Build failed', console, console_parser, event_queue, cmd_queue)
  45. else:
  46. logger.output_enabled = True
  47. class SerialHandler:
  48. """
  49. The class is responsible for buffering serial input and performing corresponding commands.
  50. """
  51. def __init__(self, last_line_part, serial_check_exit, logger, decode_panic, reading_panic, panic_buffer, target,
  52. force_line_print, start_cmd_sent, serial_instance, encrypted, reset, elf_file):
  53. # type: (bytes, bool, Logger, str, int, bytes,str, bool, bool, serial.Serial, bool, bool, str) -> None
  54. self._last_line_part = last_line_part
  55. self._serial_check_exit = serial_check_exit
  56. self.logger = logger
  57. self._decode_panic = decode_panic
  58. self._reading_panic = reading_panic
  59. self._panic_buffer = panic_buffer
  60. self.target = target
  61. self._force_line_print = force_line_print
  62. self.start_cmd_sent = start_cmd_sent
  63. self.serial_instance = serial_instance
  64. self.encrypted = encrypted
  65. self.reset = reset
  66. self.elf_file = elf_file
  67. def handle_serial_input(self, data, console_parser, coredump, gdb_helper, line_matcher,
  68. check_gdb_stub_and_run, finalize_line=False):
  69. # type: (bytes, ConsoleParser, CoreDump, Optional[GDBHelper], LineMatcher, Callable, bool) -> None
  70. # Remove "+" after Continue command
  71. if self.start_cmd_sent:
  72. self.start_cmd_sent = False
  73. pos = data.find(b'+')
  74. if pos != -1:
  75. data = data[(pos + 1):]
  76. sp = data.split(b'\n')
  77. if self._last_line_part != b'':
  78. # add unprocessed part from previous "data" to the first line
  79. sp[0] = self._last_line_part + sp[0]
  80. self._last_line_part = b''
  81. if sp[-1] != b'':
  82. # last part is not a full line
  83. self._last_line_part = sp.pop()
  84. for line in sp:
  85. if line == b'':
  86. continue
  87. if self._serial_check_exit and line == console_parser.exit_key.encode('latin-1'):
  88. raise SerialStopException()
  89. if gdb_helper:
  90. self.check_panic_decode_trigger(line, gdb_helper)
  91. with coredump.check(line):
  92. if self._force_line_print or line_matcher.match(line.decode(errors='ignore')):
  93. self.logger.print(line + b'\n')
  94. self.compare_elf_sha256(line.decode(errors='ignore'))
  95. self.logger.handle_possible_pc_address_in_line(line)
  96. check_gdb_stub_and_run(line)
  97. self._force_line_print = False
  98. # Now we have the last part (incomplete line) in _last_line_part. By
  99. # default we don't touch it and just wait for the arrival of the rest
  100. # of the line. But after some time when we didn't received it we need
  101. # to make a decision.
  102. force_print_or_matched = any((
  103. self._force_line_print,
  104. (finalize_line and line_matcher.match(self._last_line_part.decode(errors='ignore')))
  105. ))
  106. if self._last_line_part != b'' and force_print_or_matched:
  107. self._force_line_print = True
  108. self.logger.print(self._last_line_part)
  109. self.logger.handle_possible_pc_address_in_line(self._last_line_part)
  110. check_gdb_stub_and_run(self._last_line_part)
  111. # It is possible that the incomplete line cuts in half the PC
  112. # address. A small buffer is kept and will be used the next time
  113. # handle_possible_pc_address_in_line is invoked to avoid this problem.
  114. # MATCH_PCADDR matches 10 character long addresses. Therefore, we
  115. # keep the last 9 characters.
  116. self.logger.pc_address_buffer = self._last_line_part[-9:]
  117. # GDB sequence can be cut in half also. GDB sequence is 7
  118. # characters long, therefore, we save the last 6 characters.
  119. if gdb_helper:
  120. gdb_helper.gdb_buffer = self._last_line_part[-6:]
  121. self._last_line_part = b''
  122. # else: keeping _last_line_part and it will be processed the next time
  123. # handle_serial_input is invoked
  124. def check_panic_decode_trigger(self, line, gdb_helper): # type: (bytes, GDBHelper) -> None
  125. if self._decode_panic == PANIC_DECODE_DISABLE:
  126. return
  127. if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')):
  128. self._reading_panic = PANIC_READING
  129. yellow_print('Stack dump detected')
  130. if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line:
  131. self.logger.output_enabled = False
  132. if self._reading_panic == PANIC_READING:
  133. self._panic_buffer += line.replace(b'\r', b'') + b'\n'
  134. if self._reading_panic == PANIC_READING and PANIC_END in line:
  135. self._reading_panic = PANIC_IDLE
  136. self.logger.output_enabled = True
  137. gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target)
  138. self._panic_buffer = b''
  139. def compare_elf_sha256(self, line): # type: (str) -> None
  140. elf_sha256_matcher = re.compile(
  141. r'ELF file SHA256:\s+(?P<sha256_flashed>[a-z0-9]+)'
  142. )
  143. file_sha256_flashed_match = re.search(elf_sha256_matcher, line)
  144. if not file_sha256_flashed_match:
  145. return
  146. file_sha256_flashed = file_sha256_flashed_match.group('sha256_flashed')
  147. if not os.path.exists(self.elf_file):
  148. yellow_print(f'ELF file not found. '
  149. f"You need to build & flash the project before running 'monitor', "
  150. f'and the binary on the device must match the one in the build directory exactly. ')
  151. else:
  152. file_sha256_build = get_sha256(self.elf_file)
  153. if file_sha256_flashed not in f'{file_sha256_build}':
  154. yellow_print(f'Warning: checksum mismatch between flashed and built applications. '
  155. f'Checksum of built application is {file_sha256_build}')
  156. def handle_commands(self, cmd, chip, run_make_func, console_reader, serial_reader):
  157. # type: (int, str, Callable, ConsoleReader, Reader) -> None
  158. config = get_chip_config(chip)
  159. reset_delay = config['reset']
  160. enter_boot_set = config['enter_boot_set']
  161. enter_boot_unset = config['enter_boot_unset']
  162. high = False
  163. low = True
  164. if chip == 'linux':
  165. if cmd in [CMD_RESET,
  166. CMD_MAKE,
  167. CMD_APP_FLASH,
  168. CMD_ENTER_BOOT]:
  169. yellow_print('linux target does not support this command')
  170. return
  171. if cmd == CMD_STOP:
  172. console_reader.stop()
  173. serial_reader.stop()
  174. elif cmd == CMD_RESET:
  175. self.serial_instance.setRTS(low) # EN=LOW, chip in reset
  176. self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
  177. time.sleep(reset_delay)
  178. self.serial_instance.setRTS(high) # EN=HIGH, chip out of reset
  179. self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
  180. self.logger.output_enabled = True
  181. elif cmd == CMD_MAKE:
  182. run_make_func('encrypted-flash' if self.encrypted else 'flash')
  183. elif cmd == CMD_APP_FLASH:
  184. run_make_func('encrypted-app-flash' if self.encrypted else 'app-flash')
  185. elif cmd == CMD_OUTPUT_TOGGLE:
  186. self.logger.output_toggle()
  187. elif cmd == CMD_TOGGLE_LOGGING:
  188. self.logger.toggle_logging()
  189. elif cmd == CMD_TOGGLE_TIMESTAMPS:
  190. self.logger.toggle_timestamps()
  191. elif cmd == CMD_ENTER_BOOT:
  192. yellow_print('Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart')
  193. self.serial_instance.setDTR(high) # IO0=HIGH
  194. self.serial_instance.setRTS(low) # EN=LOW, chip in reset
  195. self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
  196. time.sleep(enter_boot_set) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
  197. self.serial_instance.setDTR(low) # IO0=LOW
  198. self.serial_instance.setRTS(high) # EN=HIGH, chip out of reset
  199. self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
  200. time.sleep(enter_boot_unset) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
  201. self.serial_instance.setDTR(high) # IO0=HIGH, done
  202. else:
  203. raise RuntimeError('Bad command data %d' % cmd) # type: ignore
  204. class SerialHandlerNoElf(SerialHandler):
  205. # This method avoids using 'gdb_helper,' 'coredump' and 'handle_possible_pc_address_in_line'
  206. # where the elf file is required to be passed
  207. def handle_serial_input(self, data, console_parser, coredump, gdb_helper, line_matcher,
  208. check_gdb_stub_and_run, finalize_line=False):
  209. # type: (bytes, ConsoleParser, CoreDump, Optional[GDBHelper], LineMatcher, Callable, bool) -> None
  210. if self.start_cmd_sent:
  211. self.start_cmd_sent = False
  212. pos = data.find(b'+')
  213. if pos != -1:
  214. data = data[(pos + 1):]
  215. sp = data.split(b'\n')
  216. if self._last_line_part != b'':
  217. # add unprocessed part from previous "data" to the first line
  218. sp[0] = self._last_line_part + sp[0]
  219. self._last_line_part = b''
  220. if sp[-1] != b'':
  221. # last part is not a full line
  222. self._last_line_part = sp.pop()
  223. for line in sp:
  224. if line == b'':
  225. continue
  226. if self._serial_check_exit and line == console_parser.exit_key.encode('latin-1'):
  227. raise SerialStopException()
  228. self.logger.print(line + b'\n')
  229. self.compare_elf_sha256(line.decode(errors='ignore'))
  230. self._force_line_print = False
  231. force_print_or_matched = any((
  232. self._force_line_print,
  233. (finalize_line and line_matcher.match(self._last_line_part.decode(errors='ignore')))
  234. ))
  235. if self._last_line_part != b'' and force_print_or_matched:
  236. self._force_line_print = True
  237. self.logger.print(self._last_line_part)