conftest.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. # pylint: disable=W0621 # redefined-outer-name
  4. import hashlib
  5. import logging
  6. import os
  7. import subprocess
  8. import sys
  9. from typing import Any, Dict, List, TextIO
  10. import pexpect
  11. import pytest
  12. from _pytest.fixtures import FixtureRequest
  13. from _pytest.monkeypatch import MonkeyPatch
  14. from pygdbmi.gdbcontroller import GdbController, GdbTimeoutError, NoGdbProcessError
  15. from pytest_embedded_idf.app import IdfApp
  16. from pytest_embedded_idf.dut import IdfDut
  17. from pytest_embedded_idf.serial import IdfSerial
  18. def sha256(file: str) -> str:
  19. res = hashlib.sha256()
  20. with open(file, 'rb') as fr:
  21. res.update(fr.read())
  22. return res.hexdigest()
  23. class PanicTestDut(IdfDut):
  24. BOOT_CMD_ADDR = 0x9000
  25. BOOT_CMD_SIZE = 0x1000
  26. DEFAULT_EXPECT_TIMEOUT = 10
  27. COREDUMP_UART_START = '================= CORE DUMP START ================='
  28. COREDUMP_UART_END = '================= CORE DUMP END ================='
  29. app: IdfApp
  30. serial: IdfSerial
  31. def __init__(self, *args, **kwargs) -> None: # type: ignore
  32. super().__init__(*args, **kwargs)
  33. self.gdb: GdbController = None # type: ignore
  34. # record this since pygdbmi is using logging.debug to generate some single character mess
  35. self.log_level = logging.getLogger().level
  36. # pygdbmi is using logging.debug to generate some single character mess
  37. if self.log_level <= logging.DEBUG:
  38. logging.getLogger().setLevel(logging.INFO)
  39. self.coredump_output: TextIO = None # type: ignore
  40. def close(self) -> None:
  41. if self.gdb:
  42. self.gdb.exit()
  43. super().close()
  44. def revert_log_level(self) -> None:
  45. logging.getLogger().setLevel(self.log_level)
  46. def expect_test_func_name(self, test_func_name: str) -> None:
  47. self.expect_exact('Enter test name:')
  48. self.write(test_func_name)
  49. self.expect_exact('Got test name: ' + test_func_name)
  50. def expect_none(self, pattern, **kwargs) -> None: # type: ignore
  51. """like dut.expect_all, but with an inverse logic"""
  52. if 'timeout' not in kwargs:
  53. kwargs['timeout'] = 1
  54. try:
  55. res = self.expect(pattern, **kwargs)
  56. raise AssertionError(f'Unexpected: {res.group().decode("utf8")}')
  57. except pexpect.TIMEOUT:
  58. pass
  59. def expect_backtrace(self) -> None:
  60. self.expect_exact('Backtrace:')
  61. self.expect_none('CORRUPTED')
  62. def expect_gme(self, reason: str) -> None:
  63. """Expect method for Guru Meditation Errors"""
  64. self.expect_exact(f"Guru Meditation Error: Core 0 panic'ed ({reason})")
  65. def expect_reg_dump(self, core: int = 0) -> None:
  66. """Expect method for the register dump"""
  67. self.expect(r'Core\s+%d register dump:' % core)
  68. def expect_elf_sha256(self) -> None:
  69. """Expect method for ELF SHA256 line"""
  70. elf_sha256 = sha256(self.app.elf_file)
  71. elf_sha256_len = int(
  72. self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '16')
  73. )
  74. self.expect_exact('ELF file SHA256: ' + elf_sha256[0:elf_sha256_len])
  75. def _call_espcoredump(
  76. self, extra_args: List[str], coredump_file_name: str, output_file_name: str
  77. ) -> None:
  78. # no "with" here, since we need the file to be open for later inspection by the test case
  79. if not self.coredump_output:
  80. self.coredump_output = open(output_file_name, 'w')
  81. espcoredump_script = os.path.join(
  82. os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py'
  83. )
  84. espcoredump_args = [
  85. sys.executable,
  86. espcoredump_script,
  87. 'info_corefile',
  88. '--core',
  89. coredump_file_name,
  90. ]
  91. espcoredump_args += extra_args
  92. espcoredump_args.append(self.app.elf_file)
  93. logging.info('Running %s', ' '.join(espcoredump_args))
  94. logging.info('espcoredump output is written to %s', self.coredump_output.name)
  95. subprocess.check_call(espcoredump_args, stdout=self.coredump_output)
  96. self.coredump_output.flush()
  97. self.coredump_output.seek(0)
  98. def process_coredump_uart(self) -> None:
  99. """Extract the core dump from UART output of the test, run espcoredump on it"""
  100. self.expect(self.COREDUMP_UART_START)
  101. res = self.expect('(.+)' + self.COREDUMP_UART_END)
  102. coredump_base64 = res.group(1).decode('utf8')
  103. with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file:
  104. logging.info('Writing UART base64 core dump to %s', coredump_file.name)
  105. coredump_file.write(coredump_base64)
  106. output_file_name = os.path.join(self.logdir, 'coredump_uart_result.txt')
  107. self._call_espcoredump(
  108. ['--core-format', 'b64'], coredump_file.name, output_file_name
  109. )
  110. def process_coredump_flash(self) -> None:
  111. """Extract the core dump from flash, run espcoredump on it"""
  112. coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin')
  113. logging.info('Writing flash binary core dump to %s', coredump_file_name)
  114. self.serial.dump_flash(partition='coredump', output=coredump_file_name)
  115. output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt')
  116. self._call_espcoredump(
  117. ['--core-format', 'raw'], coredump_file_name, output_file_name
  118. )
  119. def gdb_write(self, command: str) -> Any:
  120. """
  121. Wrapper to write to gdb with a longer timeout, as test runner
  122. host can be slow sometimes
  123. """
  124. return self.gdb.write(command, timeout_sec=10)
  125. def start_gdb(self) -> None:
  126. """
  127. Runs GDB and connects it to the "serial" port of the DUT.
  128. After this, the DUT expect methods can no longer be used to capture output.
  129. """
  130. self.gdb = GdbController(gdb_path=self.toolchain_prefix + 'gdb')
  131. # pygdbmi logs to console by default, make it log to a file instead
  132. pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt')
  133. pygdbmi_logger = self.gdb.logger
  134. pygdbmi_logger.setLevel(logging.DEBUG)
  135. while pygdbmi_logger.hasHandlers():
  136. pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0])
  137. log_handler = logging.FileHandler(pygdbmi_log_file_name)
  138. log_handler.setFormatter(
  139. logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
  140. )
  141. pygdbmi_logger.addHandler(log_handler)
  142. logging.info('Running command: %s', self.gdb.get_subprocess_cmd())
  143. for _ in range(10):
  144. try:
  145. # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that
  146. # an RPI under high load will get non-responsive during creating a lot of processes.
  147. resp = self.gdb.get_gdb_response(
  148. timeout_sec=10
  149. ) # calls verify_valid_gdb_subprocess() internally
  150. # it will be interesting to look up this response if the next GDB command fails (times out)
  151. logging.info('GDB response: %s', resp)
  152. break # success
  153. except GdbTimeoutError:
  154. logging.warning(
  155. 'GDB internal error: cannot get response from the subprocess'
  156. )
  157. except NoGdbProcessError:
  158. logging.error('GDB internal error: process is not running')
  159. break # failure - TODO: create another GdbController
  160. except ValueError:
  161. logging.error(
  162. 'GDB internal error: select() returned an unexpected file number'
  163. )
  164. # Set up logging for GDB remote protocol
  165. gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt')
  166. self.gdb_write('-gdb-set remotelogfile ' + gdb_remotelog_file_name)
  167. # Load the ELF file
  168. self.gdb_write('-file-exec-and-symbols {}'.format(self.app.elf_file))
  169. # Connect GDB to UART
  170. self.serial.proc.close()
  171. logging.info('Connecting to GDB Stub...')
  172. self.gdb_write('-gdb-set serial baud 115200')
  173. responses = self.gdb_write('-target-select remote ' + self.serial.port)
  174. # Make sure we get the 'stopped' notification
  175. stop_response = self.find_gdb_response('stopped', 'notify', responses)
  176. if not stop_response:
  177. responses = self.gdb_write('-exec-interrupt')
  178. stop_response = self.find_gdb_response('stopped', 'notify', responses)
  179. assert stop_response
  180. frame = stop_response['payload']['frame']
  181. if 'file' not in frame:
  182. frame['file'] = '?'
  183. if 'line' not in frame:
  184. frame['line'] = '?'
  185. logging.info('Stopped in {func} at {addr} ({file}:{line})'.format(**frame))
  186. # Drain remaining responses
  187. self.gdb.get_gdb_response(raise_error_on_timeout=False)
  188. def gdb_backtrace(self) -> Any:
  189. """
  190. Returns the list of stack frames for the current thread.
  191. Each frame is a dictionary, refer to pygdbmi docs for the format.
  192. """
  193. assert self.gdb
  194. responses = self.gdb_write('-stack-list-frames')
  195. return self.find_gdb_response('done', 'result', responses)['payload']['stack']
  196. @staticmethod
  197. def match_backtrace(
  198. gdb_backtrace: List[Any], expected_functions_list: List[Any]
  199. ) -> bool:
  200. """
  201. Returns True if the function names listed in expected_functions_list match the backtrace
  202. given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace()
  203. function.
  204. """
  205. return all(
  206. [
  207. frame['func'] == expected_functions_list[i]
  208. for i, frame in enumerate(gdb_backtrace)
  209. ]
  210. )
  211. @staticmethod
  212. def find_gdb_response(
  213. message: str, response_type: str, responses: List[Any]
  214. ) -> Any:
  215. """
  216. Helper function which extracts one response from an array of GDB responses, filtering
  217. by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.
  218. """
  219. def match_response(response: Dict[str, Any]) -> bool:
  220. return response['message'] == message and response['type'] == response_type # type: ignore
  221. filtered_responses = [r for r in responses if match_response(r)]
  222. if not filtered_responses:
  223. return None
  224. return filtered_responses[0]
  225. @pytest.fixture(scope='module')
  226. def monkeypatch_module(request: FixtureRequest) -> MonkeyPatch:
  227. mp = MonkeyPatch()
  228. request.addfinalizer(mp.undo)
  229. return mp
  230. @pytest.fixture(scope='module', autouse=True)
  231. def replace_dut_class(monkeypatch_module: MonkeyPatch) -> None:
  232. monkeypatch_module.setattr('pytest_embedded_idf.dut.IdfDut', PanicTestDut)