# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import logging
import os
import re
import subprocess
import sys
from typing import Any, Dict, List, Optional, TextIO, Union

import pexpect
from panic_utils import NoGdbProcessError, attach_logger, quote_string, sha256, verify_valid_gdb_subprocess
from pygdbmi.gdbcontroller import GdbController
from pytest_embedded_idf.app import IdfApp
from pytest_embedded_idf.dut import IdfDut
from pytest_embedded_idf.serial import IdfSerial


class PanicTestDut(IdfDut):
    BOOT_CMD_ADDR = 0x9000
    BOOT_CMD_SIZE = 0x1000
    DEFAULT_EXPECT_TIMEOUT = 10
    COREDUMP_UART_START = '================= CORE DUMP START ================='
    COREDUMP_UART_END = '================= CORE DUMP END ================='

    app: IdfApp
    serial: IdfSerial

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        self.gdbmi: Optional[GdbController] = None
        # record this since pygdbmi is using logging.debug to generate some single character mess
        self.log_level = logging.getLogger().level
        # pygdbmi is using logging.debug to generate some single character mess
        if self.log_level <= logging.DEBUG:
            logging.getLogger().setLevel(logging.INFO)

        self.coredump_output: Optional[TextIO] = None

    def close(self) -> None:
        if self.gdbmi:
            logging.info('Waiting for GDB to exit')
            self.gdbmi.exit()

        super().close()

    def revert_log_level(self) -> None:
        logging.getLogger().setLevel(self.log_level)

    @property
    def is_xtensa(self) -> bool:
        return self.target in self.XTENSA_TARGETS

    def run_test_func(self, test_func_name: str) -> None:
        self.expect_exact('Enter test name:')
        self.write(test_func_name)
        self.expect_exact('Got test name: ' + test_func_name)

    def expect_none(self, pattern, **kwargs) -> None:  # type: ignore
        """Like expect(), but with inverse logic: assert that the pattern does NOT appear within the timeout"""
        if 'timeout' not in kwargs:
            kwargs['timeout'] = 1
        try:
            res = self.expect(pattern, **kwargs)
            raise AssertionError(f'Unexpected: {res.group().decode("utf8")}')
        except pexpect.TIMEOUT:
            pass
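
    # Illustrative usage (not part of the original class): a hypothetical test body could use
    # expect_none() to assert that certain output is absent, e.g. that no crash report follows
    # a graceful restart. The `dut` fixture name below is an assumption for illustration only.
    #
    #     dut.expect_none('Guru Meditation Error', timeout=5)
    #     dut.expect_none('Backtrace')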

    def expect_backtrace(self) -> None:
        assert self.is_xtensa, 'Backtrace can be printed only on Xtensa'
        match = self.expect(r'Backtrace:( 0x[0-9a-fA-F]{8}:0x[0-9a-fA-F]{8})+(?P<corrupted> \|<-CORRUPTED)?')
        assert not match.group('corrupted')

    def expect_corrupted_backtrace(self) -> None:
        assert self.is_xtensa, 'Backtrace can be printed only on Xtensa'
        self.expect_exact('Backtrace:')
        self.expect_exact('CORRUPTED')

    def expect_stack_dump(self) -> None:
        assert not self.is_xtensa, 'Stack memory dump is only printed on RISC-V'
        self.expect_exact('Stack memory:')

    def expect_gme(self, reason: str) -> None:
        """Expect method for Guru Meditation Errors"""
        self.expect_exact(f"Guru Meditation Error: Core 0 panic'ed ({reason})")

    def expect_reg_dump(self, core: int = 0) -> None:
        """Expect method for the register dump"""
        self.expect(r'Core\s+%d register dump:' % core)

    def expect_cpu_reset(self) -> None:
        # no digital system reset for panic handling restarts (see IDF-7255)
        self.expect(r'.*rst:.*(RTC_SW_CPU_RST|SW_CPU_RESET|SW_CPU)')

    def expect_elf_sha256(self) -> None:
        """Expect method for ELF SHA256 line"""
        elf_sha256 = sha256(self.app.elf_file)
        elf_sha256_len = int(
            self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '9')
        )
        self.expect_exact('ELF file SHA256: ' + elf_sha256[0:elf_sha256_len])

    def expect_coredump(self, output_file_name: str, patterns: List[Union[str, re.Pattern]]) -> None:
        with open(output_file_name, 'r') as file:
            coredump = file.read()
            for pattern in patterns:
                if isinstance(pattern, str):
                    position = coredump.find(pattern)
                    assert position != -1, f"'{pattern}' not found in the coredump output"
                elif isinstance(pattern, re.Pattern):
                    match = pattern.findall(coredump)
                    assert match, f"'{pattern.pattern}' not found in the coredump output"
                else:
                    raise ValueError(f'Unsupported input type: {type(pattern).__name__}')
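
    # Illustrative usage (not part of the original class): expect_coredump() accepts a mixed
    # list of plain substrings and pre-compiled regular expressions. The output file name and
    # the expected patterns below are placeholders, not guaranteed espcoredump output.
    #
    #     dut.expect_coredump('coredump_flash_result.txt', [
    #         'Crashed task handle',
    #         re.compile(r'abort\(\) was called'),
    #     ])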

    def _call_espcoredump(
        self, extra_args: List[str], coredump_file_name: str, output_file_name: str
    ) -> None:
        # no "with" here, since we need the file to be open for later inspection by the test case
        if not self.coredump_output:
            self.coredump_output = open(output_file_name, 'w')

        espcoredump_script = os.path.join(
            os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py'
        )
        espcoredump_args = [
            sys.executable,
            espcoredump_script,
            'info_corefile',
            '--core',
            coredump_file_name,
        ]
        espcoredump_args += extra_args
        espcoredump_args.append(self.app.elf_file)
        logging.info('Running %s', ' '.join(espcoredump_args))
        logging.info('espcoredump output is written to %s', self.coredump_output.name)

        subprocess.check_call(espcoredump_args, stdout=self.coredump_output)
        self.coredump_output.flush()
        self.coredump_output.seek(0)

    def process_coredump_uart(self, expected: Optional[List[Union[str, re.Pattern]]] = None) -> None:
        """Extract the core dump from UART output of the test, run espcoredump on it"""
        self.expect(self.COREDUMP_UART_START)
        res = self.expect('(.+)' + self.COREDUMP_UART_END)
        coredump_base64 = res.group(1).decode('utf8')
        with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file:
            logging.info('Writing UART base64 core dump to %s', coredump_file.name)
            coredump_file.write(coredump_base64)

        output_file_name = os.path.join(self.logdir, 'coredump_uart_result.txt')
        self._call_espcoredump(
            ['--core-format', 'b64'], coredump_file.name, output_file_name
        )

        if expected:
            self.expect_coredump(output_file_name, expected)

    def process_coredump_flash(self, expected: Optional[List[Union[str, re.Pattern]]] = None) -> None:
        """Extract the core dump from flash, run espcoredump on it"""
        coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin')
        logging.info('Writing flash binary core dump to %s', coredump_file_name)
        self.serial.dump_flash(partition='coredump', output=coredump_file_name)

        output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt')
        self._call_espcoredump(
            ['--core-format', 'raw'], coredump_file_name, output_file_name
        )

        if expected:
            self.expect_coredump(output_file_name, expected)
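
    # Illustrative usage (not part of the original class): a hypothetical test could trigger a
    # panic, check the console output, and then verify the core dump stored in flash. The test
    # name 'test_storeprohibited' and the expected pattern are assumptions for illustration only.
    #
    #     dut.run_test_func('test_storeprohibited')
    #     dut.expect_gme('StoreProhibited')
    #     dut.expect_reg_dump(core=0)
    #     dut.process_coredump_flash(expected=[re.compile(r'Crashed task')])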

    def gdb_write(self, command: str) -> Any:
        """
        Wrapper to write to gdb with a longer timeout, as test runner
        host can be slow sometimes
        """
        assert self.gdbmi, 'This function should be called only after start_gdb'
        return self.gdbmi.write(command, timeout_sec=10)

    def start_gdb(self) -> None:
        """
        Runs GDB and connects it to the "serial" port of the DUT.
        After this, the DUT expect methods can no longer be used to capture output.
        """
        gdb_args = ['--nx', '--quiet', '--interpreter=mi2']
        if self.is_xtensa:
            gdb_path = 'xtensa-esp-elf-gdb-no-python'  # TODO: GCC-311
            gdb_args = [f'--mcpu={self.target}'] + gdb_args
        else:
            gdb_path = 'riscv32-esp-elf-gdb-no-python'  # TODO: GCC-311
        try:
            from pygdbmi.constants import GdbTimeoutError

            gdb_command = [gdb_path] + gdb_args
            self.gdbmi = GdbController(command=gdb_command)
            pygdbmi_logger = attach_logger()
        except ImportError:
            # fallback for pygdbmi<0.10.0.0.
            from pygdbmi.gdbcontroller import GdbTimeoutError

            self.gdbmi = GdbController(gdb_path=gdb_path, gdb_args=gdb_args)
            pygdbmi_logger = self.gdbmi.logger

        # pygdbmi logs to console by default, make it log to a file instead
        pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt')
        pygdbmi_logger.setLevel(logging.DEBUG)
        while pygdbmi_logger.hasHandlers():
            pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0])
        log_handler = logging.FileHandler(pygdbmi_log_file_name)
        log_handler.setFormatter(
            logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
        )
        logging.info(f'Saving pygdbmi logs to {pygdbmi_log_file_name}')
        pygdbmi_logger.addHandler(log_handler)

        try:
            gdb_command = self.gdbmi.command
        except AttributeError:
            # fallback for pygdbmi < 0.10
            gdb_command = self.gdbmi.cmd
        logging.info(f'Running command: "{" ".join(quote_string(c) for c in gdb_command)}"')

        for _ in range(10):
            try:
                # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that
                # an RPI under high load will get non-responsive during creating a lot of processes.
                if not hasattr(self.gdbmi, 'verify_valid_gdb_subprocess'):
                    # for pygdbmi >= 0.10.0.0
                    verify_valid_gdb_subprocess(self.gdbmi.gdb_process)
                resp = self.gdbmi.get_gdb_response(
                    timeout_sec=10
                )  # calls verify_valid_gdb_subprocess() internally for pygdbmi < 0.10.0.0
                # it will be interesting to look up this response if the next GDB command fails (times out)
                logging.info('GDB response: %s', resp)
                break  # success
            except GdbTimeoutError:
                logging.warning(
                    'GDB internal error: cannot get response from the subprocess'
                )
            except NoGdbProcessError:
                logging.error('GDB internal error: process is not running')
                break  # failure - TODO: create another GdbController
            except ValueError:
                logging.error(
                    'GDB internal error: select() returned an unexpected file number'
                )

        # Set up logging for GDB remote protocol
        gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt')
        self.gdb_write('-gdb-set remotelogfile ' + gdb_remotelog_file_name)

        # Load the ELF file
        self.gdb_write('-file-exec-and-symbols {}'.format(self.app.elf_file))

        # Connect GDB to UART
        self.serial.close()
        logging.info('Connecting to GDB Stub...')
        self.gdb_write('-gdb-set serial baud 115200')
        if sys.platform == 'darwin':
            assert '/dev/tty.' not in self.serial.port, \
                '/dev/tty.* ports can\'t be used with GDB on macOS. Use /dev/cu.* instead.'
        # Make sure we get the 'stopped' notification
        responses = self.gdb_write('-target-select remote ' + self.serial.port)
        stop_response = self.find_gdb_response('stopped', 'notify', responses)
        retries = 3
        while not stop_response and retries > 0:
            logging.info('Sending -exec-interrupt')
            responses = self.gdb_write('-exec-interrupt')
            stop_response = self.find_gdb_response('stopped', 'notify', responses)
            retries -= 1
        frame = stop_response['payload']['frame']
        if 'file' not in frame:
            frame['file'] = '?'
        if 'line' not in frame:
            frame['line'] = '?'
        logging.info('Stopped in {func} at {addr} ({file}:{line})'.format(**frame))

        # Drain remaining responses
        self.gdbmi.get_gdb_response(raise_error_on_timeout=False)

    def gdb_backtrace(self) -> Any:
        """
        Returns the list of stack frames for the current thread.
        Each frame is a dictionary, refer to pygdbmi docs for the format.
        """
        assert self.gdbmi

        responses = self.gdb_write('-stack-list-frames')
        return self.find_gdb_response('done', 'result', responses)['payload']['stack']

    @staticmethod
    def verify_gdb_backtrace(
        gdb_backtrace: List[Any], expected_functions_list: List[Any]
    ) -> None:
        """
        Raises an assert if the function names listed in expected_functions_list do not match the
        backtrace given by the gdb_backtrace argument. The latter is in the same format as returned
        by the gdb_backtrace() method.
        """
        actual_functions_list = [frame['func'] for frame in gdb_backtrace]
        if actual_functions_list != expected_functions_list:
            logging.error(f'Expected backtrace: {expected_functions_list}')
            logging.error(f'Actual backtrace: {actual_functions_list}')
            assert False, 'Got unexpected backtrace'
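
    # Illustrative usage (not part of the original class): after a panic with the GDB Stub
    # enabled, a hypothetical test could attach GDB and check which functions are on the
    # stack. The function names below are placeholders, not output of a real app.
    #
    #     dut.start_gdb()
    #     frames = dut.gdb_backtrace()
    #     dut.verify_gdb_backtrace(frames, ['bad_ptr_func', 'app_main', 'main_task'])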

    @staticmethod
    def find_gdb_response(
        message: str, response_type: str, responses: List[Any]
    ) -> Any:
        """
        Helper function which extracts one response from an array of GDB responses, filtering
        by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.
        """
        def match_response(response: Dict[str, Any]) -> bool:
            return response['message'] == message and response['type'] == response_type  # type: ignore

        filtered_responses = [r for r in responses if match_response(r)]
        if not filtered_responses:
            return None
        return filtered_responses[0]
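
# Illustrative usage (not part of the original module): find_gdb_response() picks the first
# GDB/MI record with the given message and type from the list returned by gdb_write(). A
# hypothetical check that a '-break-insert' command succeeded might look like this:
#
#     responses = dut.gdb_write('-break-insert app_main')
#     assert PanicTestDut.find_gdb_response('done', 'result', responses) is not None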