# conftest.py
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. # pylint: disable=W0621 # redefined-outer-name
  4. import logging
  5. import os
  6. import subprocess
  7. import sys
  8. from typing import Any, Dict, List, TextIO
  9. import pexpect
  10. import pytest
  11. from _pytest.fixtures import FixtureRequest
  12. from _pytest.monkeypatch import MonkeyPatch
  13. from panic_utils import NoGdbProcessError, attach_logger, quote_string, sha256, verify_valid_gdb_subprocess
  14. from pygdbmi.gdbcontroller import GdbController
  15. from pytest_embedded_idf.app import IdfApp
  16. from pytest_embedded_idf.dut import IdfDut
  17. from pytest_embedded_idf.serial import IdfSerial
  18. class PanicTestDut(IdfDut):
  19. BOOT_CMD_ADDR = 0x9000
  20. BOOT_CMD_SIZE = 0x1000
  21. DEFAULT_EXPECT_TIMEOUT = 10
  22. COREDUMP_UART_START = '================= CORE DUMP START ================='
  23. COREDUMP_UART_END = '================= CORE DUMP END ================='
  24. app: IdfApp
  25. serial: IdfSerial
  26. def __init__(self, *args, **kwargs) -> None: # type: ignore
  27. super().__init__(*args, **kwargs)
  28. self.gdb: GdbController = None # type: ignore
  29. # record this since pygdbmi is using logging.debug to generate some single character mess
  30. self.log_level = logging.getLogger().level
  31. # pygdbmi is using logging.debug to generate some single character mess
  32. if self.log_level <= logging.DEBUG:
  33. logging.getLogger().setLevel(logging.INFO)
  34. self.coredump_output: TextIO = None # type: ignore
  35. def close(self) -> None:
  36. if self.gdb:
  37. self.gdb.exit()
  38. super().close()
  39. def revert_log_level(self) -> None:
  40. logging.getLogger().setLevel(self.log_level)
  41. def expect_test_func_name(self, test_func_name: str) -> None:
  42. self.expect_exact('Enter test name:')
  43. self.write(test_func_name)
  44. self.expect_exact('Got test name: ' + test_func_name)
  45. def expect_none(self, pattern, **kwargs) -> None: # type: ignore
  46. """like dut.expect_all, but with an inverse logic"""
  47. if 'timeout' not in kwargs:
  48. kwargs['timeout'] = 1
  49. try:
  50. res = self.expect(pattern, **kwargs)
  51. raise AssertionError(f'Unexpected: {res.group().decode("utf8")}')
  52. except pexpect.TIMEOUT:
  53. pass
  54. def expect_backtrace(self) -> None:
  55. self.expect_exact('Backtrace:')
  56. self.expect_none('CORRUPTED')
  57. def expect_gme(self, reason: str) -> None:
  58. """Expect method for Guru Meditation Errors"""
  59. self.expect_exact(f"Guru Meditation Error: Core 0 panic'ed ({reason})")
  60. def expect_reg_dump(self, core: int = 0) -> None:
  61. """Expect method for the register dump"""
  62. self.expect(r'Core\s+%d register dump:' % core)
  63. def expect_elf_sha256(self) -> None:
  64. """Expect method for ELF SHA256 line"""
  65. elf_sha256 = sha256(self.app.elf_file)
  66. elf_sha256_len = int(
  67. self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '16')
  68. )
  69. self.expect_exact('ELF file SHA256: ' + elf_sha256[0:elf_sha256_len])
  70. def _call_espcoredump(
  71. self, extra_args: List[str], coredump_file_name: str, output_file_name: str
  72. ) -> None:
  73. # no "with" here, since we need the file to be open for later inspection by the test case
  74. if not self.coredump_output:
  75. self.coredump_output = open(output_file_name, 'w')
  76. espcoredump_script = os.path.join(
  77. os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py'
  78. )
  79. espcoredump_args = [
  80. sys.executable,
  81. espcoredump_script,
  82. 'info_corefile',
  83. '--core',
  84. coredump_file_name,
  85. ]
  86. espcoredump_args += extra_args
  87. espcoredump_args.append(self.app.elf_file)
  88. logging.info('Running %s', ' '.join(espcoredump_args))
  89. logging.info('espcoredump output is written to %s', self.coredump_output.name)
  90. subprocess.check_call(espcoredump_args, stdout=self.coredump_output)
  91. self.coredump_output.flush()
  92. self.coredump_output.seek(0)
  93. def process_coredump_uart(self) -> None:
  94. """Extract the core dump from UART output of the test, run espcoredump on it"""
  95. self.expect(self.COREDUMP_UART_START)
  96. res = self.expect('(.+)' + self.COREDUMP_UART_END)
  97. coredump_base64 = res.group(1).decode('utf8')
  98. with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file:
  99. logging.info('Writing UART base64 core dump to %s', coredump_file.name)
  100. coredump_file.write(coredump_base64)
  101. output_file_name = os.path.join(self.logdir, 'coredump_uart_result.txt')
  102. self._call_espcoredump(
  103. ['--core-format', 'b64'], coredump_file.name, output_file_name
  104. )
  105. def process_coredump_flash(self) -> None:
  106. """Extract the core dump from flash, run espcoredump on it"""
  107. coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin')
  108. logging.info('Writing flash binary core dump to %s', coredump_file_name)
  109. self.serial.dump_flash(partition='coredump', output=coredump_file_name)
  110. output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt')
  111. self._call_espcoredump(
  112. ['--core-format', 'raw'], coredump_file_name, output_file_name
  113. )
  114. def gdb_write(self, command: str) -> Any:
  115. """
  116. Wrapper to write to gdb with a longer timeout, as test runner
  117. host can be slow sometimes
  118. """
  119. return self.gdb.write(command, timeout_sec=10)
    def start_gdb(self) -> None:
        """
        Runs GDB and connects it to the "serial" port of the DUT.
        After this, the DUT expect methods can no longer be used to capture output.
        """
        gdb_path = self.toolchain_prefix + 'gdb'
        try:
            from pygdbmi.constants import GdbTimeoutError
            default_gdb_args = ['--nx', '--quiet', '--interpreter=mi2']
            gdb_command = [gdb_path] + default_gdb_args
            self.gdb = GdbController(command=gdb_command)
            pygdbmi_logger = attach_logger()
        except ImportError:
            # fallback for pygdbmi<0.10.0.0: the constants module does not
            # exist there and GdbController takes gdb_path instead of command
            from pygdbmi.gdbcontroller import GdbTimeoutError
            self.gdb = GdbController(gdb_path=gdb_path)
            pygdbmi_logger = self.gdb.logger
        # pygdbmi logs to console by default, make it log to a file instead
        pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt')
        pygdbmi_logger.setLevel(logging.DEBUG)
        # drop any pre-installed handlers before attaching the file handler
        while pygdbmi_logger.hasHandlers():
            pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0])
        log_handler = logging.FileHandler(pygdbmi_log_file_name)
        log_handler.setFormatter(
            logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
        )
        pygdbmi_logger.addHandler(log_handler)
        try:
            gdb_command = self.gdb.command
        except AttributeError:
            # fallback for pygdbmi < 0.10, which stores the command as .cmd
            gdb_command = self.gdb.cmd
        logging.info(f'Running command: "{" ".join(quote_string(c) for c in gdb_command)}"')
        # Retry loop: wait until the freshly spawned GDB answers over MI.
        for _ in range(10):
            try:
                # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that
                # an RPI under high load will get non-responsive during creating a lot of processes.
                if not hasattr(self.gdb, 'verify_valid_gdb_subprocess'):
                    # for pygdbmi >= 0.10.0.0
                    verify_valid_gdb_subprocess(self.gdb.gdb_process)
                resp = self.gdb.get_gdb_response(
                    timeout_sec=10
                )  # calls verify_valid_gdb_subprocess() internally for pygdbmi < 0.10.0.0
                # it will be interesting to look up this response if the next GDB command fails (times out)
                logging.info('GDB response: %s', resp)
                break  # success
            except GdbTimeoutError:
                # transient: keep retrying
                logging.warning(
                    'GDB internal error: cannot get response from the subprocess'
                )
            except NoGdbProcessError:
                logging.error('GDB internal error: process is not running')
                break  # failure - TODO: create another GdbController
            except ValueError:
                logging.error(
                    'GDB internal error: select() returned an unexpected file number'
                )
        # Set up logging for GDB remote protocol
        gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt')
        self.gdb_write('-gdb-set remotelogfile ' + gdb_remotelog_file_name)
        # Load the ELF file
        self.gdb_write('-file-exec-and-symbols {}'.format(self.app.elf_file))
        # Connect GDB to UART: release the serial port first so GDB can open it
        self.serial.proc.close()
        logging.info('Connecting to GDB Stub...')
        self.gdb_write('-gdb-set serial baud 115200')
        responses = self.gdb_write('-target-select remote ' + self.serial.port)
        # Make sure we get the 'stopped' notification
        stop_response = self.find_gdb_response('stopped', 'notify', responses)
        if not stop_response:
            # target didn't report a stop on connect; interrupt it explicitly
            responses = self.gdb_write('-exec-interrupt')
            stop_response = self.find_gdb_response('stopped', 'notify', responses)
        assert stop_response
        frame = stop_response['payload']['frame']
        if 'file' not in frame:
            frame['file'] = '?'
        if 'line' not in frame:
            frame['line'] = '?'
        logging.info('Stopped in {func} at {addr} ({file}:{line})'.format(**frame))
        # Drain remaining responses
        self.gdb.get_gdb_response(raise_error_on_timeout=False)
  201. def gdb_backtrace(self) -> Any:
  202. """
  203. Returns the list of stack frames for the current thread.
  204. Each frame is a dictionary, refer to pygdbmi docs for the format.
  205. """
  206. assert self.gdb
  207. responses = self.gdb_write('-stack-list-frames')
  208. return self.find_gdb_response('done', 'result', responses)['payload']['stack']
  209. @staticmethod
  210. def match_backtrace(
  211. gdb_backtrace: List[Any], expected_functions_list: List[Any]
  212. ) -> bool:
  213. """
  214. Returns True if the function names listed in expected_functions_list match the backtrace
  215. given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace()
  216. function.
  217. """
  218. return all(
  219. [
  220. frame['func'] == expected_functions_list[i]
  221. for i, frame in enumerate(gdb_backtrace)
  222. ]
  223. )
  224. @staticmethod
  225. def find_gdb_response(
  226. message: str, response_type: str, responses: List[Any]
  227. ) -> Any:
  228. """
  229. Helper function which extracts one response from an array of GDB responses, filtering
  230. by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.
  231. """
  232. def match_response(response: Dict[str, Any]) -> bool:
  233. return response['message'] == message and response['type'] == response_type # type: ignore
  234. filtered_responses = [r for r in responses if match_response(r)]
  235. if not filtered_responses:
  236. return None
  237. return filtered_responses[0]
  238. @pytest.fixture(scope='module')
  239. def monkeypatch_module(request: FixtureRequest) -> MonkeyPatch:
  240. mp = MonkeyPatch()
  241. request.addfinalizer(mp.undo)
  242. return mp
  243. @pytest.fixture(scope='module', autouse=True)
  244. def replace_dut_class(monkeypatch_module: MonkeyPatch) -> None:
  245. monkeypatch_module.setattr('pytest_embedded_idf.dut.IdfDut', PanicTestDut)