# coredump.py 5.4 KB
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import io
import os
import queue
import tempfile
from contextlib import contextmanager, redirect_stdout
from typing import Generator

from .constants import TAG_KEY
from .logger import Logger
from .output_helpers import yellow_print
from .web_socket_client import WebSocketClient
  13. # coredump related messages
  14. COREDUMP_UART_START = b'================= CORE DUMP START ================='
  15. COREDUMP_UART_END = b'================= CORE DUMP END ================='
  16. COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...'
  17. # coredump states
  18. COREDUMP_IDLE = 0
  19. COREDUMP_READING = 1
  20. COREDUMP_DONE = 2
  21. # coredump decoding options
  22. COREDUMP_DECODE_DISABLE = 'disable'
  23. COREDUMP_DECODE_INFO = 'info'
  24. class CoreDump:
  25. def __init__(self, decode_coredumps, event_queue, logger, websocket_client, elf_file):
  26. # type: (str, queue.Queue, Logger, WebSocketClient, str) -> None
  27. self._coredump_buffer = b''
  28. self._decode_coredumps = decode_coredumps
  29. self.event_queue = event_queue
  30. self._reading_coredump = COREDUMP_IDLE
  31. self.logger = logger
  32. self.websocket_client = websocket_client
  33. self.elf_file = elf_file
  34. @property
  35. def in_progress(self) -> bool:
  36. return bool(self._coredump_buffer)
  37. def _process_coredump(self): # type: () -> None
  38. if self._decode_coredumps != COREDUMP_DECODE_INFO:
  39. raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps)
  40. coredump_file = None
  41. # On Windows, the temporary file can't be read unless it is closed.
  42. # Set delete=False and delete the file manually later.
  43. with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
  44. coredump_file.write(self._coredump_buffer)
  45. coredump_file.flush()
  46. if self.websocket_client:
  47. self.logger.output_enabled = True
  48. yellow_print('Communicating through WebSocket')
  49. self.websocket_client.send({'event': 'coredump',
  50. 'file': coredump_file.name,
  51. 'prog': self.elf_file})
  52. yellow_print('Waiting for debug finished event')
  53. self.websocket_client.wait([('event', 'debug_finished')])
  54. yellow_print('Communications through WebSocket is finished')
  55. else:
  56. try:
  57. import esp_coredump
  58. except ImportError as e:
  59. yellow_print('Failed to parse core dump info: '
  60. 'Module {} is not installed \n\n'.format(e.name))
  61. self.logger.output_enabled = True
  62. self.logger.print(COREDUMP_UART_START + b'\n')
  63. self.logger.print(self._coredump_buffer)
  64. # end line will be printed in handle_serial_input
  65. else:
  66. coredump = esp_coredump.CoreDump(core=coredump_file.name, core_format='b64', prog=self.elf_file)
  67. f = io.StringIO()
  68. with redirect_stdout(f):
  69. coredump.info_corefile()
  70. output = f.getvalue()
  71. self.logger.output_enabled = True
  72. self.logger.print(output.encode('utf-8'))
  73. self.logger.output_enabled = False # Will be reenabled in check_coredump_trigger_after_print
  74. if coredump_file is not None:
  75. try:
  76. os.unlink(coredump_file.name)
  77. except OSError as e:
  78. yellow_print('Couldn\'t remote temporary core dump file ({})'.format(e))
  79. def _check_coredump_trigger_before_print(self, line): # type: (bytes) -> None
  80. if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
  81. return
  82. if COREDUMP_UART_PROMPT in line:
  83. yellow_print('Initiating core dump!')
  84. self.event_queue.put((TAG_KEY, '\n'))
  85. return
  86. if COREDUMP_UART_START in line:
  87. yellow_print('Core dump started (further output muted)')
  88. self._reading_coredump = COREDUMP_READING
  89. self._coredump_buffer = b''
  90. self.logger.output_enabled = False
  91. return
  92. if COREDUMP_UART_END in line:
  93. self._reading_coredump = COREDUMP_DONE
  94. yellow_print('\nCore dump finished!')
  95. self._process_coredump()
  96. return
  97. if self._reading_coredump == COREDUMP_READING:
  98. kb = 1024
  99. buffer_len_kb = len(self._coredump_buffer) // kb
  100. self._coredump_buffer += line.replace(b'\r', b'') + b'\n'
  101. new_buffer_len_kb = len(self._coredump_buffer) // kb
  102. if new_buffer_len_kb > buffer_len_kb:
  103. yellow_print('Received %3d kB...' % new_buffer_len_kb, newline='\r')
  104. def _check_coredump_trigger_after_print(self): # type: () -> None
  105. if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
  106. return
  107. # Re-enable output after the last line of core dump has been consumed
  108. if not self.logger.output_enabled and self._reading_coredump == COREDUMP_DONE:
  109. self._reading_coredump = COREDUMP_IDLE
  110. self.logger.output_enabled = True
  111. self._coredump_buffer = b''
  112. @contextmanager
  113. def check(self, line): # type: (bytes) -> Generator
  114. self._check_coredump_trigger_before_print(line)
  115. try:
  116. yield
  117. finally:
  118. self._check_coredump_trigger_after_print()