# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import logging
import os
import subprocess
import sys
from typing import Any, Dict, List, TextIO

import pexpect
from panic_utils import NoGdbProcessError, attach_logger, quote_string, sha256, verify_valid_gdb_subprocess
from pygdbmi.gdbcontroller import GdbController
from pytest_embedded_idf.app import IdfApp
from pytest_embedded_idf.dut import IdfDut
from pytest_embedded_idf.serial import IdfSerial
class PanicTestDut(IdfDut):
    """DUT wrapper for the panic-handler test cases.

    Extends ``IdfDut`` with helpers to:

    * expect panic output on the console (Guru Meditation reason,
      backtrace, register dump, ELF SHA256 line),
    * extract a core dump from the UART log or from flash and run
      ``espcoredump.py`` on it,
    * attach GDB (via pygdbmi) to the GDB stub over the serial port and
      inspect the stopped target.
    """

    # NOTE(review): BOOT_CMD_ADDR/SIZE are not referenced inside this class —
    # presumably used by the test cases themselves; confirm against callers.
    BOOT_CMD_ADDR = 0x9000
    BOOT_CMD_SIZE = 0x1000
    DEFAULT_EXPECT_TIMEOUT = 10
    # Markers framing the base64-encoded core dump in the UART output
    COREDUMP_UART_START = '================= CORE DUMP START ================='
    COREDUMP_UART_END = '================= CORE DUMP END ================='

    app: IdfApp
    serial: IdfSerial

    def __init__(self, *args, **kwargs) -> None:  # type: ignore
        super().__init__(*args, **kwargs)
        # GDB controller; created lazily by start_gdb()
        self.gdb: GdbController = None  # type: ignore
        # record this since pygdbmi is using logging.debug to generate some single character mess
        self.log_level = logging.getLogger().level
        # pygdbmi is using logging.debug to generate some single character mess
        if self.log_level <= logging.DEBUG:
            logging.getLogger().setLevel(logging.INFO)
        # espcoredump report file; opened on first _call_espcoredump() call
        # and deliberately kept open for later inspection by the test case
        self.coredump_output: TextIO = None  # type: ignore

    def close(self) -> None:
        """Terminate the GDB subprocess (if started), then close the DUT."""
        if self.gdb:
            self.gdb.exit()
        super().close()

    def revert_log_level(self) -> None:
        """Restore the root logger level recorded in __init__."""
        logging.getLogger().setLevel(self.log_level)

    def expect_test_func_name(self, test_func_name: str) -> None:
        """Answer the test app's name prompt and wait for it to echo the name."""
        self.expect_exact('Enter test name:')
        self.write(test_func_name)
        self.expect_exact('Got test name: ' + test_func_name)

    def expect_none(self, pattern, **kwargs) -> None:  # type: ignore
        """like dut.expect_all, but with an inverse logic

        Raises AssertionError if ``pattern`` shows up within the timeout
        (default 1 second); passes when the expect times out instead.
        """
        if 'timeout' not in kwargs:
            kwargs['timeout'] = 1
        try:
            res = self.expect(pattern, **kwargs)
            raise AssertionError(f'Unexpected: {res.group().decode("utf8")}')
        except pexpect.TIMEOUT:
            pass

    def expect_backtrace(self) -> None:
        """Expect a 'Backtrace:' line that is not flagged as CORRUPTED."""
        self.expect_exact('Backtrace:')
        self.expect_none('CORRUPTED')

    def expect_gme(self, reason: str) -> None:
        """Expect method for Guru Meditation Errors"""
        self.expect_exact(f"Guru Meditation Error: Core 0 panic'ed ({reason})")

    def expect_reg_dump(self, core: int = 0) -> None:
        """Expect method for the register dump"""
        self.expect(r'Core\s+%d register dump:' % core)

    def expect_elf_sha256(self) -> None:
        """Expect method for ELF SHA256 line"""
        elf_sha256 = sha256(self.app.elf_file)
        # The app prints only a prefix of the hash; its length is set by
        # CONFIG_APP_RETRIEVE_LEN_ELF_SHA (16 characters when unset)
        elf_sha256_len = int(
            self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '16')
        )
        self.expect_exact('ELF file SHA256: ' + elf_sha256[0:elf_sha256_len])

    def _call_espcoredump(
        self, extra_args: List[str], coredump_file_name: str, output_file_name: str
    ) -> None:
        """Run ``espcoredump.py info_corefile`` on a core dump file.

        The report is written to ``output_file_name`` and left open (and
        rewound) in ``self.coredump_output`` so the test case can read it.

        :param extra_args: extra espcoredump arguments (e.g. core format)
        :param coredump_file_name: path of the core dump to analyze
        :param output_file_name: path to write the espcoredump report to
        :raises subprocess.CalledProcessError: if espcoredump exits non-zero
        """
        # no "with" here, since we need the file to be open for later inspection by the test case
        if not self.coredump_output:
            self.coredump_output = open(output_file_name, 'w')
        espcoredump_script = os.path.join(
            os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py'
        )
        espcoredump_args = [
            sys.executable,
            espcoredump_script,
            'info_corefile',
            '--core',
            coredump_file_name,
        ]
        espcoredump_args += extra_args
        espcoredump_args.append(self.app.elf_file)
        logging.info('Running %s', ' '.join(espcoredump_args))
        logging.info('espcoredump output is written to %s', self.coredump_output.name)
        subprocess.check_call(espcoredump_args, stdout=self.coredump_output)
        # rewind so the test case can read the report from the beginning
        self.coredump_output.flush()
        self.coredump_output.seek(0)

    def process_coredump_uart(self) -> None:
        """Extract the core dump from UART output of the test, run espcoredump on it"""
        self.expect(self.COREDUMP_UART_START)
        # everything between the START and END markers is the base64 payload
        res = self.expect('(.+)' + self.COREDUMP_UART_END)
        coredump_base64 = res.group(1).decode('utf8')
        with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file:
            logging.info('Writing UART base64 core dump to %s', coredump_file.name)
            coredump_file.write(coredump_base64)
        output_file_name = os.path.join(self.logdir, 'coredump_uart_result.txt')
        self._call_espcoredump(
            ['--core-format', 'b64'], coredump_file.name, output_file_name
        )

    def process_coredump_flash(self) -> None:
        """Extract the core dump from flash, run espcoredump on it"""
        coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin')
        logging.info('Writing flash binary core dump to %s', coredump_file_name)
        self.serial.dump_flash(partition='coredump', output=coredump_file_name)
        output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt')
        self._call_espcoredump(
            ['--core-format', 'raw'], coredump_file_name, output_file_name
        )

    def gdb_write(self, command: str) -> Any:
        """
        Wrapper to write to gdb with a longer timeout, as test runner
        host can be slow sometimes
        """
        return self.gdb.write(command, timeout_sec=10)

    def start_gdb(self) -> None:
        """
        Runs GDB and connects it to the "serial" port of the DUT.
        After this, the DUT expect methods can no longer be used to capture output.
        """
        gdb_path = self.toolchain_prefix + 'gdb'
        try:
            # pygdbmi >= 0.10.0.0: GdbTimeoutError lives in pygdbmi.constants
            # and GdbController takes the full command list
            from pygdbmi.constants import GdbTimeoutError
            default_gdb_args = ['--nx', '--quiet', '--interpreter=mi2']
            gdb_command = [gdb_path] + default_gdb_args
            self.gdb = GdbController(command=gdb_command)
            pygdbmi_logger = attach_logger()
        except ImportError:
            # fallback for pygdbmi<0.10.0.0.
            from pygdbmi.gdbcontroller import GdbTimeoutError
            self.gdb = GdbController(gdb_path=gdb_path)
            pygdbmi_logger = self.gdb.logger
        # pygdbmi logs to console by default, make it log to a file instead
        pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt')
        pygdbmi_logger.setLevel(logging.DEBUG)
        while pygdbmi_logger.hasHandlers():
            pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0])
        log_handler = logging.FileHandler(pygdbmi_log_file_name)
        log_handler.setFormatter(
            logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
        )
        pygdbmi_logger.addHandler(log_handler)
        try:
            gdb_command = self.gdb.command
        except AttributeError:
            # fallback for pygdbmi < 0.10
            gdb_command = self.gdb.cmd
        logging.info(f'Running command: "{" ".join(quote_string(c) for c in gdb_command)}"')
        for _ in range(10):
            try:
                # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that
                # an RPI under high load will get non-responsive during creating a lot of processes.
                if not hasattr(self.gdb, 'verify_valid_gdb_subprocess'):
                    # for pygdbmi >= 0.10.0.0
                    verify_valid_gdb_subprocess(self.gdb.gdb_process)
                resp = self.gdb.get_gdb_response(
                    timeout_sec=10
                )  # calls verify_valid_gdb_subprocess() internally for pygdbmi < 0.10.0.0
                # it will be interesting to look up this response if the next GDB command fails (times out)
                logging.info('GDB response: %s', resp)
                break  # success
            except GdbTimeoutError:
                logging.warning(
                    'GDB internal error: cannot get response from the subprocess'
                )
            except NoGdbProcessError:
                logging.error('GDB internal error: process is not running')
                break  # failure - TODO: create another GdbController
            except ValueError:
                logging.error(
                    'GDB internal error: select() returned an unexpected file number'
                )
        # Set up logging for GDB remote protocol
        gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt')
        self.gdb_write('-gdb-set remotelogfile ' + gdb_remotelog_file_name)
        # Load the ELF file
        self.gdb_write('-file-exec-and-symbols {}'.format(self.app.elf_file))
        # Connect GDB to UART
        # (close the DUT's own serial process first so GDB can open the port)
        self.serial.proc.close()
        logging.info('Connecting to GDB Stub...')
        self.gdb_write('-gdb-set serial baud 115200')
        responses = self.gdb_write('-target-select remote ' + self.serial.port)
        # Make sure we get the 'stopped' notification
        stop_response = self.find_gdb_response('stopped', 'notify', responses)
        if not stop_response:
            responses = self.gdb_write('-exec-interrupt')
            stop_response = self.find_gdb_response('stopped', 'notify', responses)
        assert stop_response
        frame = stop_response['payload']['frame']
        # GDB may omit file/line for frames without debug info
        if 'file' not in frame:
            frame['file'] = '?'
        if 'line' not in frame:
            frame['line'] = '?'
        logging.info('Stopped in {func} at {addr} ({file}:{line})'.format(**frame))
        # Drain remaining responses
        self.gdb.get_gdb_response(raise_error_on_timeout=False)

    def gdb_backtrace(self) -> Any:
        """
        Returns the list of stack frames for the current thread.
        Each frame is a dictionary, refer to pygdbmi docs for the format.
        """
        assert self.gdb
        responses = self.gdb_write('-stack-list-frames')
        return self.find_gdb_response('done', 'result', responses)['payload']['stack']

    @staticmethod
    def match_backtrace(
        gdb_backtrace: List[Any], expected_functions_list: List[Any]
    ) -> bool:
        """
        Returns True if the function names listed in expected_functions_list match the backtrace
        given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace()
        function.

        NOTE(review): raises IndexError when gdb_backtrace contains more
        frames than expected_functions_list, and returns True when it
        contains fewer — presumably callers always pass a complete
        expected list; confirm against the test cases.
        """
        return all(
            [
                frame['func'] == expected_functions_list[i]
                for i, frame in enumerate(gdb_backtrace)
            ]
        )

    @staticmethod
    def find_gdb_response(
        message: str, response_type: str, responses: List[Any]
    ) -> Any:
        """
        Helper function which extracts one response from an array of GDB responses, filtering
        by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.
        Returns None if no response matches.
        """
        def match_response(response: Dict[str, Any]) -> bool:
            # each pygdbmi response dict carries 'message' and 'type' keys
            return response['message'] == message and response['type'] == response_type  # type: ignore

        filtered_responses = [r for r in responses if match_response(r)]
        if not filtered_responses:
            return None
        return filtered_responses[0]