# loader.py
  1. #
  2. # Copyright 2021 Espressif Systems (Shanghai) CO., LTD
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import base64
  17. import binascii
  18. import hashlib
  19. import logging
  20. import os
  21. import subprocess
  22. import sys
  23. import tempfile
  24. from construct import AlignedStruct, Bytes, GreedyRange, Int32ul, Padding, Struct, abs_, this
  25. from . import ESPCoreDumpLoaderError
  26. from .elf import (TASK_STATUS_CORRECT, TASK_STATUS_TCB_CORRUPTED, ElfFile, ElfSegment, ESPCoreDumpElfFile,
  27. EspTaskStatus, NoteSection)
  28. from .riscv import Esp32c3Methods
  29. from .xtensa import Esp32Methods, Esp32S2Methods
  30. try:
  31. from typing import Optional, Tuple
  32. except ImportError:
  33. pass
  34. IDF_PATH = os.getenv('IDF_PATH', '')
  35. PARTTOOL_PY = os.path.join(IDF_PATH, 'components', 'partition_table', 'parttool.py')
  36. ESPTOOL_PY = os.path.join(IDF_PATH, 'components', 'esptool_py', 'esptool', 'esptool.py')
  37. # Following structs are based on source code
  38. # components/espcoredump/include_core_dump/esp_core_dump_priv.h
  39. EspCoreDumpV1Header = Struct(
  40. 'tot_len' / Int32ul,
  41. 'ver' / Int32ul,
  42. 'task_num' / Int32ul,
  43. 'tcbsz' / Int32ul,
  44. )
  45. EspCoreDumpV2Header = Struct(
  46. 'tot_len' / Int32ul,
  47. 'ver' / Int32ul,
  48. 'task_num' / Int32ul,
  49. 'tcbsz' / Int32ul,
  50. 'segs_num' / Int32ul,
  51. )
  52. CRC = Int32ul
  53. SHA256 = Bytes(32)
  54. TaskHeader = Struct(
  55. 'tcb_addr' / Int32ul,
  56. 'stack_top' / Int32ul,
  57. 'stack_end' / Int32ul,
  58. )
  59. MemSegmentHeader = Struct(
  60. 'mem_start' / Int32ul,
  61. 'mem_sz' / Int32ul,
  62. 'data' / Bytes(this.mem_sz),
  63. )
  64. class EspCoreDumpVersion(object):
  65. """Core dump version class
  66. """
  67. # This class contains all version-dependent params
  68. ESP32 = 0
  69. ESP32S2 = 2
  70. XTENSA_CHIPS = [ESP32, ESP32S2]
  71. ESP32C3 = 5
  72. RISCV_CHIPS = [ESP32C3]
  73. COREDUMP_SUPPORTED_TARGETS = XTENSA_CHIPS + RISCV_CHIPS
  74. def __init__(self, version=None): # type: (int) -> None
  75. """Constructor for core dump version
  76. """
  77. super(EspCoreDumpVersion, self).__init__()
  78. if version is None:
  79. self.version = 0
  80. else:
  81. self.set_version(version)
  82. @staticmethod
  83. def make_dump_ver(major, minor): # type: (int, int) -> int
  84. return ((major & 0xFF) << 8) | ((minor & 0xFF) << 0)
  85. def set_version(self, version): # type: (int) -> None
  86. self.version = version
  87. @property
  88. def chip_ver(self): # type: () -> int
  89. return (self.version & 0xFFFF0000) >> 16
  90. @property
  91. def dump_ver(self): # type: () -> int
  92. return self.version & 0x0000FFFF
  93. @property
  94. def major(self): # type: () -> int
  95. return (self.version & 0x0000FF00) >> 8
  96. @property
  97. def minor(self): # type: () -> int
  98. return self.version & 0x000000FF
  99. class EspCoreDumpLoader(EspCoreDumpVersion):
  100. # "legacy" stands for core dumps v0.1 (before IDF v4.1)
  101. BIN_V1 = EspCoreDumpVersion.make_dump_ver(0, 1)
  102. BIN_V2 = EspCoreDumpVersion.make_dump_ver(0, 2)
  103. ELF_CRC32 = EspCoreDumpVersion.make_dump_ver(1, 0)
  104. ELF_SHA256 = EspCoreDumpVersion.make_dump_ver(1, 1)
  105. def __init__(self): # type: () -> None
  106. super(EspCoreDumpLoader, self).__init__()
  107. self.core_src_file = None # type: Optional[str]
  108. self.core_src_struct = None
  109. self.core_src = None
  110. self.core_elf_file = None # type: Optional[str]
  111. self.header = None
  112. self.header_struct = EspCoreDumpV1Header
  113. self.checksum_struct = CRC
  114. # target classes will be assigned in ``_reload_coredump``
  115. self.target_methods = Esp32Methods()
  116. self.temp_files = [] # type: list[str]
  117. def _create_temp_file(self): # type: () -> str
  118. t = tempfile.NamedTemporaryFile('wb', delete=False)
  119. # Here we close this at first to make sure the read/write is wrapped in context manager
  120. # Otherwise the result will be wrong if you read while open in another session
  121. t.close()
  122. self.temp_files.append(t.name)
  123. return t.name
  124. def _load_core_src(self): # type: () -> str
  125. """
  126. Write core elf into ``self.core_src``,
  127. Return the target str by reading core elf
  128. """
  129. with open(self.core_src_file, 'rb') as fr: # type: ignore
  130. coredump_bytes = fr.read()
  131. _header = EspCoreDumpV1Header.parse(coredump_bytes) # first we use V1 format to get version
  132. self.set_version(_header.ver)
  133. if self.dump_ver == self.ELF_CRC32:
  134. self.checksum_struct = CRC
  135. self.header_struct = EspCoreDumpV2Header
  136. elif self.dump_ver == self.ELF_SHA256:
  137. self.checksum_struct = SHA256
  138. self.header_struct = EspCoreDumpV2Header
  139. elif self.dump_ver == self.BIN_V1:
  140. self.checksum_struct = CRC
  141. self.header_struct = EspCoreDumpV1Header
  142. elif self.dump_ver == self.BIN_V2:
  143. self.checksum_struct = CRC
  144. self.header_struct = EspCoreDumpV2Header
  145. else:
  146. raise ESPCoreDumpLoaderError('Core dump version "0x%x" is not supported!' % self.dump_ver)
  147. self.core_src_struct = Struct(
  148. 'header' / self.header_struct,
  149. 'data' / Bytes(this.header.tot_len - self.header_struct.sizeof() - self.checksum_struct.sizeof()),
  150. 'checksum' / self.checksum_struct,
  151. )
  152. self.core_src = self.core_src_struct.parse(coredump_bytes) # type: ignore
  153. # Reload header if header struct changes after parsing
  154. if self.header_struct != EspCoreDumpV1Header:
  155. self.header = EspCoreDumpV2Header.parse(coredump_bytes)
  156. if self.chip_ver in self.COREDUMP_SUPPORTED_TARGETS:
  157. if self.chip_ver == self.ESP32:
  158. self.target_methods = Esp32Methods() # type: ignore
  159. elif self.chip_ver == self.ESP32S2:
  160. self.target_methods = Esp32S2Methods() # type: ignore
  161. elif self.chip_ver == self.ESP32C3:
  162. self.target_methods = Esp32c3Methods() # type: ignore
  163. else:
  164. raise NotImplementedError
  165. else:
  166. raise ESPCoreDumpLoaderError('Core dump chip "0x%x" is not supported!' % self.chip_ver)
  167. return self.target_methods.TARGET # type: ignore
  168. def _validate_dump_file(self): # type: () -> None
  169. if self.chip_ver not in self.COREDUMP_SUPPORTED_TARGETS:
  170. raise ESPCoreDumpLoaderError('Invalid core dump chip version: "{}", should be <= "0x{:X}"'
  171. .format(self.chip_ver, self.ESP32S2))
  172. if self.checksum_struct == CRC:
  173. self._crc_validate()
  174. elif self.checksum_struct == SHA256:
  175. self._sha256_validate()
  176. def _crc_validate(self): # type: () -> None
  177. data_crc = binascii.crc32(
  178. EspCoreDumpV2Header.build(self.core_src.header) + self.core_src.data) & 0xffffffff # type: ignore
  179. if data_crc != self.core_src.checksum: # type: ignore
  180. raise ESPCoreDumpLoaderError(
  181. 'Invalid core dump CRC %x, should be %x' % (data_crc, self.core_src.crc)) # type: ignore
  182. def _sha256_validate(self): # type: () -> None
  183. data_sha256 = hashlib.sha256(
  184. EspCoreDumpV2Header.build(self.core_src.header) + self.core_src.data) # type: ignore
  185. data_sha256_str = data_sha256.hexdigest()
  186. sha256_str = binascii.hexlify(self.core_src.checksum).decode('ascii') # type: ignore
  187. if data_sha256_str != sha256_str:
  188. raise ESPCoreDumpLoaderError('Invalid core dump SHA256 "{}", should be "{}"'
  189. .format(data_sha256_str, sha256_str))
  190. def create_corefile(self, exe_name=None, e_machine=ESPCoreDumpElfFile.EM_XTENSA):
  191. # type: (Optional[str], int) -> None
  192. """
  193. Creates core dump ELF file
  194. """
  195. self._validate_dump_file()
  196. self.core_elf_file = self._create_temp_file()
  197. if self.dump_ver in [self.ELF_CRC32,
  198. self.ELF_SHA256]:
  199. self._extract_elf_corefile(exe_name, e_machine)
  200. elif self.dump_ver in [self.BIN_V1,
  201. self.BIN_V2]:
  202. self._extract_bin_corefile(e_machine)
  203. else:
  204. raise NotImplementedError
  205. def _extract_elf_corefile(self, exe_name=None, e_machine=ESPCoreDumpElfFile.EM_XTENSA): # type: (str, int) -> None
  206. """
  207. Reads the ELF formatted core dump image and parse it
  208. """
  209. with open(self.core_elf_file, 'wb') as fw: # type: ignore
  210. fw.write(self.core_src.data) # type: ignore
  211. core_elf = ESPCoreDumpElfFile(self.core_elf_file, e_machine=e_machine) # type: ignore
  212. # Read note segments from core file which are belong to tasks (TCB or stack)
  213. for seg in core_elf.note_segments:
  214. for note_sec in seg.note_secs:
  215. # Check for version info note
  216. if note_sec.name == 'ESP_CORE_DUMP_INFO' \
  217. and note_sec.type == ESPCoreDumpElfFile.PT_INFO \
  218. and exe_name:
  219. exe_elf = ElfFile(exe_name)
  220. app_sha256 = binascii.hexlify(exe_elf.sha256)
  221. coredump_sha256_struct = Struct(
  222. 'ver' / Int32ul,
  223. 'sha256' / Bytes(64) # SHA256 as hex string
  224. )
  225. coredump_sha256 = coredump_sha256_struct.parse(note_sec.desc[:coredump_sha256_struct.sizeof()])
  226. if coredump_sha256.sha256 != app_sha256:
  227. raise ESPCoreDumpLoaderError(
  228. 'Invalid application image for coredump: coredump SHA256({!r}) != app SHA256({!r}).'
  229. .format(coredump_sha256, app_sha256))
  230. if coredump_sha256.ver != self.version:
  231. raise ESPCoreDumpLoaderError(
  232. 'Invalid application image for coredump: coredump SHA256 version({}) != app SHA256 version({}).'
  233. .format(coredump_sha256.ver, self.version))
  234. @staticmethod
  235. def _get_aligned_size(size, align_with=4): # type: (int, int) -> int
  236. if size % align_with:
  237. return align_with * (size // align_with + 1)
  238. return size
  239. @staticmethod
  240. def _build_note_section(name, sec_type, desc): # type: (str, int, str) -> bytes
  241. b_name = bytearray(name, encoding='ascii') + b'\0'
  242. return NoteSection.build({ # type: ignore
  243. 'namesz': len(b_name),
  244. 'descsz': len(desc),
  245. 'type': sec_type,
  246. 'name': b_name,
  247. 'desc': desc,
  248. })
  249. def _extract_bin_corefile(self, e_machine=ESPCoreDumpElfFile.EM_XTENSA): # type: (int) -> None
  250. """
  251. Creates core dump ELF file
  252. """
  253. coredump_data_struct = Struct(
  254. 'tasks' / GreedyRange(
  255. AlignedStruct(
  256. 4,
  257. 'task_header' / TaskHeader,
  258. 'tcb' / Bytes(self.header.tcbsz), # type: ignore
  259. 'stack' / Bytes(abs_(this.task_header.stack_top - this.task_header.stack_end)), # type: ignore
  260. )
  261. ),
  262. 'mem_seg_headers' / MemSegmentHeader[self.core_src.header.segs_num] # type: ignore
  263. )
  264. core_elf = ESPCoreDumpElfFile(e_machine=e_machine)
  265. notes = b''
  266. core_dump_info_notes = b''
  267. task_info_notes = b''
  268. coredump_data = coredump_data_struct.parse(self.core_src.data) # type: ignore
  269. for i, task in enumerate(coredump_data.tasks):
  270. stack_len_aligned = self._get_aligned_size(abs(task.task_header.stack_top - task.task_header.stack_end))
  271. task_status_kwargs = {
  272. 'task_index': i,
  273. 'task_flags': TASK_STATUS_CORRECT,
  274. 'task_tcb_addr': task.task_header.tcb_addr,
  275. 'task_stack_start': min(task.task_header.stack_top, task.task_header.stack_end),
  276. 'task_stack_end': max(task.task_header.stack_top, task.task_header.stack_end),
  277. 'task_stack_len': stack_len_aligned,
  278. 'task_name': Padding(16).build({}) # currently we don't have task_name, keep it as padding
  279. }
  280. # Write TCB
  281. try:
  282. if self.target_methods.tcb_is_sane(task.task_header.tcb_addr, self.header.tcbsz): # type: ignore
  283. core_elf.add_segment(task.task_header.tcb_addr,
  284. task.tcb,
  285. ElfFile.PT_LOAD,
  286. ElfSegment.PF_R | ElfSegment.PF_W)
  287. elif task.task_header.tcb_addr and self.target_methods.addr_is_fake(task.task_header.tcb_addr):
  288. task_status_kwargs['task_flags'] |= TASK_STATUS_TCB_CORRUPTED
  289. except ESPCoreDumpLoaderError as e:
  290. logging.warning('Skip TCB {} bytes @ 0x{:x}. (Reason: {})'
  291. .format(self.header.tcbsz, task.task_header.tcb_addr, e)) # type: ignore
  292. # Write stack
  293. try:
  294. if self.target_methods.stack_is_sane(task_status_kwargs['task_stack_start'],
  295. task_status_kwargs['task_stack_end']):
  296. core_elf.add_segment(task_status_kwargs['task_stack_start'],
  297. task.stack,
  298. ElfFile.PT_LOAD,
  299. ElfSegment.PF_R | ElfSegment.PF_W)
  300. elif (task_status_kwargs['task_stack_start']
  301. and self.target_methods.addr_is_fake(task_status_kwargs['task_stack_start'])):
  302. task_status_kwargs['task_flags'] |= TASK_STATUS_TCB_CORRUPTED
  303. core_elf.add_segment(task_status_kwargs['task_stack_start'],
  304. task.stack,
  305. ElfFile.PT_LOAD,
  306. ElfSegment.PF_R | ElfSegment.PF_W)
  307. except ESPCoreDumpLoaderError as e:
  308. logging.warning('Skip task\'s ({:x}) stack {} bytes @ 0x{:x}. (Reason: {})'
  309. .format(task_status_kwargs['tcb_addr'],
  310. task_status_kwargs['stack_len_aligned'],
  311. task_status_kwargs['stack_base'],
  312. e))
  313. try:
  314. logging.debug('Stack start_end: 0x{:x} @ 0x{:x}'
  315. .format(task.task_header.stack_top, task.task_header.stack_end))
  316. task_regs, extra_regs = self.target_methods.get_registers_from_stack(
  317. task.stack,
  318. task.task_header.stack_end > task.task_header.stack_top
  319. )
  320. except Exception as e:
  321. raise ESPCoreDumpLoaderError(str(e))
  322. task_info_notes += self._build_note_section('TASK_INFO',
  323. ESPCoreDumpElfFile.PT_TASK_INFO,
  324. EspTaskStatus.build(task_status_kwargs))
  325. notes += self._build_note_section('CORE',
  326. ElfFile.PT_LOAD,
  327. self.target_methods.build_prstatus_data(task.task_header.tcb_addr,
  328. task_regs))
  329. if len(core_dump_info_notes) == 0: # the first task is the crashed task
  330. core_dump_info_notes += self._build_note_section('ESP_CORE_DUMP_INFO',
  331. ESPCoreDumpElfFile.PT_INFO,
  332. Int32ul.build(self.header.ver)) # type: ignore
  333. _regs = [task.task_header.tcb_addr]
  334. # For xtensa, we need to put the exception registers into the extra info as well
  335. if e_machine == ESPCoreDumpElfFile.EM_XTENSA and extra_regs:
  336. for reg_id in extra_regs:
  337. _regs.extend([reg_id, extra_regs[reg_id]])
  338. core_dump_info_notes += self._build_note_section(
  339. 'EXTRA_INFO',
  340. ESPCoreDumpElfFile.PT_EXTRA_INFO,
  341. Int32ul[len(_regs)].build(_regs)
  342. )
  343. if self.dump_ver == self.BIN_V2:
  344. for header in coredump_data.mem_seg_headers:
  345. logging.debug('Read memory segment {} bytes @ 0x{:x}'.format(header.mem_sz, header.mem_start))
  346. core_elf.add_segment(header.mem_start, header.data, ElfFile.PT_LOAD, ElfSegment.PF_R | ElfSegment.PF_W)
  347. # add notes
  348. try:
  349. core_elf.add_segment(0, notes, ElfFile.PT_NOTE, 0)
  350. except ESPCoreDumpLoaderError as e:
  351. logging.warning('Skip NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})'.format(len(notes), 0, e))
  352. # add core dump info notes
  353. try:
  354. core_elf.add_segment(0, core_dump_info_notes, ElfFile.PT_NOTE, 0)
  355. except ESPCoreDumpLoaderError as e:
  356. logging.warning('Skip core dump info NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})'
  357. .format(len(core_dump_info_notes), 0, e))
  358. try:
  359. core_elf.add_segment(0, task_info_notes, ElfFile.PT_NOTE, 0)
  360. except ESPCoreDumpLoaderError as e:
  361. logging.warning('Skip failed tasks info NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})'
  362. .format(len(task_info_notes), 0, e))
  363. # dump core ELF
  364. core_elf.e_type = ElfFile.ET_CORE
  365. core_elf.dump(self.core_elf_file) # type: ignore
  366. class ESPCoreDumpFlashLoader(EspCoreDumpLoader):
  367. ESP_COREDUMP_PART_TABLE_OFF = 0x8000
  368. def __init__(self, offset, target=None, port=None, baud=None):
  369. # type: (int, Optional[str], Optional[str], Optional[int]) -> None
  370. super(ESPCoreDumpFlashLoader, self).__init__()
  371. self.port = port
  372. self.baud = baud
  373. self._get_core_src(offset, target)
  374. self.target = self._load_core_src()
  375. def _get_core_src(self, off, target=None): # type: (int, Optional[str]) -> None
  376. """
  377. Loads core dump from flash using parttool or elftool (if offset is set)
  378. """
  379. try:
  380. if off:
  381. logging.info('Invoke esptool to read image.')
  382. self._invoke_esptool(off=off, target=target)
  383. else:
  384. logging.info('Invoke parttool to read image.')
  385. self._invoke_parttool()
  386. except subprocess.CalledProcessError as e:
  387. if e.output:
  388. logging.info(e.output)
  389. logging.error('Error during the subprocess execution')
  390. def _invoke_esptool(self, off=None, target=None): # type: (Optional[int], Optional[str]) -> None
  391. """
  392. Loads core dump from flash using elftool
  393. """
  394. if target is None:
  395. target = 'auto'
  396. tool_args = [sys.executable, ESPTOOL_PY, '-c', target]
  397. if self.port:
  398. tool_args.extend(['-p', self.port])
  399. if self.baud:
  400. tool_args.extend(['-b', str(self.baud)])
  401. self.core_src_file = self._create_temp_file()
  402. try:
  403. (part_offset, part_size) = self._get_core_dump_partition_info()
  404. if not off:
  405. off = part_offset # set default offset if not specified
  406. logging.warning('The core dump image offset is not specified. Use partition offset: %d.', part_offset)
  407. if part_offset != off:
  408. logging.warning('Predefined image offset: %d does not match core dump partition offset: %d', off,
  409. part_offset)
  410. # Here we use V1 format to locate the size
  411. tool_args.extend(['read_flash', str(off), str(EspCoreDumpV1Header.sizeof())])
  412. tool_args.append(self.core_src_file) # type: ignore
  413. # read core dump length
  414. et_out = subprocess.check_output(tool_args)
  415. if et_out:
  416. logging.info(et_out.decode('utf-8'))
  417. header = EspCoreDumpV1Header.parse(open(self.core_src_file, 'rb').read()) # type: ignore
  418. if not header or not 0 < header.tot_len <= part_size:
  419. logging.error('Incorrect size of core dump image: {}, use partition size instead: {}'
  420. .format(header.tot_len, part_size))
  421. coredump_len = part_size
  422. else:
  423. coredump_len = header.tot_len
  424. # set actual size of core dump image and read it from flash
  425. tool_args[-2] = str(coredump_len)
  426. et_out = subprocess.check_output(tool_args)
  427. if et_out:
  428. logging.info(et_out.decode('utf-8'))
  429. except subprocess.CalledProcessError as e:
  430. logging.error('esptool script execution failed with err %d', e.returncode)
  431. logging.debug("Command ran: '%s'", e.cmd)
  432. logging.debug('Command out:')
  433. logging.debug(e.output)
  434. raise e
  435. def _invoke_parttool(self): # type: () -> None
  436. """
  437. Loads core dump from flash using parttool
  438. """
  439. tool_args = [sys.executable, PARTTOOL_PY]
  440. if self.port:
  441. tool_args.extend(['--port', self.port])
  442. tool_args.extend(['read_partition', '--partition-type', 'data', '--partition-subtype', 'coredump', '--output'])
  443. self.core_src_file = self._create_temp_file()
  444. try:
  445. tool_args.append(self.core_src_file) # type: ignore
  446. # read core dump partition
  447. et_out = subprocess.check_output(tool_args)
  448. if et_out:
  449. logging.info(et_out.decode('utf-8'))
  450. except subprocess.CalledProcessError as e:
  451. logging.error('parttool script execution failed with err %d', e.returncode)
  452. logging.debug("Command ran: '%s'", e.cmd)
  453. logging.debug('Command out:')
  454. logging.debug(e.output)
  455. raise e
  456. def _get_core_dump_partition_info(self, part_off=None): # type: (Optional[int]) -> Tuple[int, int]
  457. """
  458. Get core dump partition info using parttool
  459. """
  460. logging.info('Retrieving core dump partition offset and size...')
  461. if not part_off:
  462. part_off = self.ESP_COREDUMP_PART_TABLE_OFF
  463. try:
  464. tool_args = [sys.executable, PARTTOOL_PY, '-q', '--partition-table-offset', str(part_off)]
  465. if self.port:
  466. tool_args.extend(['--port', self.port])
  467. invoke_args = tool_args + ['get_partition_info', '--partition-type', 'data',
  468. '--partition-subtype', 'coredump',
  469. '--info', 'offset', 'size']
  470. res = subprocess.check_output(invoke_args).strip()
  471. (offset_str, size_str) = res.rsplit(b'\n')[-1].split(b' ')
  472. size = int(size_str, 16)
  473. offset = int(offset_str, 16)
  474. logging.info('Core dump partition offset=%d, size=%d', offset, size)
  475. except subprocess.CalledProcessError as e:
  476. logging.error('parttool get partition info failed with err %d', e.returncode)
  477. logging.debug("Command ran: '%s'", e.cmd)
  478. logging.debug('Command out:')
  479. logging.debug(e.output)
  480. logging.error('Check if the coredump partition exists in partition table.')
  481. raise e
  482. return offset, size
  483. class ESPCoreDumpFileLoader(EspCoreDumpLoader):
  484. def __init__(self, path, is_b64=False): # type: (str, bool) -> None
  485. super(ESPCoreDumpFileLoader, self).__init__()
  486. self.is_b64 = is_b64
  487. self._get_core_src(path)
  488. self.target = self._load_core_src()
  489. def _get_core_src(self, path): # type: (str) -> None
  490. """
  491. Loads core dump from (raw binary or base64-encoded) file
  492. """
  493. logging.debug('Load core dump from "%s", %s format', path, 'b64' if self.is_b64 else 'raw')
  494. if not self.is_b64:
  495. self.core_src_file = path
  496. else:
  497. self.core_src_file = self._create_temp_file()
  498. with open(self.core_src_file, 'wb') as fw:
  499. with open(path, 'rb') as fb64:
  500. while True:
  501. line = fb64.readline()
  502. if len(line) == 0:
  503. break
  504. data = base64.standard_b64decode(line.rstrip(b'\r\n'))
  505. fw.write(data) # type: ignore