IDFDUT.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917
  1. # SPDX-FileCopyrightText: 2015-2021 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. """ DUT for IDF applications """
  4. import collections
  5. import functools
  6. import io
  7. import os
  8. import os.path
  9. import re
  10. import subprocess
  11. import sys
  12. import tempfile
  13. import time
  14. import pexpect
  15. import serial
  16. # python2 and python3 queue package name is different
  17. try:
  18. import Queue as _queue
  19. except ImportError:
  20. import queue as _queue # type: ignore
  21. from serial.tools import list_ports
  22. from tiny_test_fw import DUT, Utility
  23. try:
  24. import esptool
  25. except ImportError: # cheat and use IDF's copy of esptool if available
  26. idf_path = os.getenv('IDF_PATH')
  27. if not idf_path or not os.path.exists(idf_path):
  28. raise
  29. sys.path.insert(0, os.path.join(idf_path, 'components', 'esptool_py', 'esptool'))
  30. import esptool
  31. try:
  32. # esptool>=4.0
  33. detect_chip = esptool.cmds.detect_chip
  34. FatalError = esptool.util.FatalError
  35. targets = esptool.targets
  36. except (AttributeError, ModuleNotFoundError):
  37. # esptool<4.0
  38. detect_chip = esptool.ESPLoader.detect_chip
  39. FatalError = esptool.FatalError
  40. targets = esptool
  41. import espefuse
  42. import espsecure
  43. class IDFToolError(OSError):
  44. pass
  45. class IDFDUTException(RuntimeError):
  46. pass
  47. class IDFRecvThread(DUT.RecvThread):
  48. PERFORMANCE_PATTERN = re.compile(r'\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n')
  49. EXCEPTION_PATTERNS = [
  50. re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
  51. re.compile(r'(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)'),
  52. re.compile(r'(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))')
  53. ]
  54. BACKTRACE_PATTERN = re.compile(r'Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)')
  55. BACKTRACE_ADDRESS_PATTERN = re.compile(r'(0x[0-9a-f]{8}):0x[0-9a-f]{8}')
  56. def __init__(self, read, dut):
  57. super(IDFRecvThread, self).__init__(read, dut)
  58. self.exceptions = _queue.Queue()
  59. self.performance_items = _queue.Queue()
  60. def collect_performance(self, comp_data):
  61. matches = self.PERFORMANCE_PATTERN.findall(comp_data)
  62. for match in matches:
  63. Utility.console_log('[Performance][{}]: {}'.format(match[0], match[1]), color='orange')
  64. self.performance_items.put((match[0], match[1]))
  65. def detect_exception(self, comp_data):
  66. for pattern in self.EXCEPTION_PATTERNS:
  67. start = 0
  68. while True:
  69. match = pattern.search(comp_data, pos=start)
  70. if match:
  71. start = match.end()
  72. self.exceptions.put(match.group(0))
  73. Utility.console_log('[Exception]: {}'.format(match.group(0)), color='red')
  74. else:
  75. break
  76. def detect_backtrace(self, comp_data):
  77. start = 0
  78. while True:
  79. match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
  80. if match:
  81. start = match.end()
  82. Utility.console_log('[Backtrace]:{}'.format(match.group(1)), color='red')
  83. # translate backtrace
  84. addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
  85. translated_backtrace = ''
  86. for addr in addresses:
  87. ret = self.dut.lookup_pc_address(addr)
  88. if ret:
  89. translated_backtrace += ret + '\n'
  90. if translated_backtrace:
  91. Utility.console_log('Translated backtrace\n:' + translated_backtrace, color='yellow')
  92. else:
  93. Utility.console_log('Failed to translate backtrace', color='yellow')
  94. else:
  95. break
  96. CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
  97. def _uses_esptool(func):
  98. """ Suspend listener thread, connect with esptool,
  99. call target function with esptool instance,
  100. then resume listening for output
  101. """
  102. @functools.wraps(func)
  103. def handler(self, *args, **kwargs):
  104. self.stop_receive()
  105. settings = self.port_inst.get_settings()
  106. try:
  107. if not self.rom_inst:
  108. if not self.secure_boot_en:
  109. self.rom_inst = detect_chip(self.port_inst)
  110. else:
  111. self.rom_inst = self.get_rom()(self.port_inst)
  112. self.rom_inst.connect('hard_reset')
  113. if (self.secure_boot_en):
  114. esp = self.rom_inst
  115. esp.flash_spi_attach(0)
  116. else:
  117. esp = self.rom_inst.run_stub()
  118. ret = func(self, esp, *args, **kwargs)
  119. # do hard reset after use esptool
  120. esp.hard_reset()
  121. finally:
  122. # always need to restore port settings
  123. self.port_inst.apply_settings(settings)
  124. self.start_receive()
  125. return ret
  126. return handler
  127. class IDFDUT(DUT.SerialDUT):
  128. """ IDF DUT, extends serial with esptool methods
  129. (Becomes aware of IDFApp instance which holds app-specific data)
  130. """
  131. # /dev/ttyAMA0 port is listed in Raspberry Pi
  132. # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
  133. INVALID_PORT_PATTERN = re.compile(r'AMA|Bluetooth')
  134. # if need to erase NVS partition in start app
  135. ERASE_NVS = True
  136. RECV_THREAD_CLS = IDFRecvThread
  137. def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
  138. super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
  139. self.allow_dut_exception = allow_dut_exception
  140. self.exceptions = _queue.Queue()
  141. self.performance_items = _queue.Queue()
  142. self.rom_inst = None
  143. self.secure_boot_en = self.app.get_sdkconfig_config_value('CONFIG_SECURE_BOOT') and \
  144. not self.app.get_sdkconfig_config_value('CONFIG_EFUSE_VIRTUAL')
  145. @classmethod
  146. def get_rom(cls):
  147. raise NotImplementedError('This is an abstraction class, method not defined.')
  148. @classmethod
  149. def get_mac(cls, app, port):
  150. """
  151. get MAC address via esptool
  152. :param app: application instance (to get tool)
  153. :param port: serial port as string
  154. :return: MAC address or None
  155. """
  156. esp = None
  157. try:
  158. esp = cls.get_rom()(port)
  159. esp.connect()
  160. return esp.read_mac()
  161. except RuntimeError:
  162. return None
  163. finally:
  164. if esp:
  165. # do hard reset after use esptool
  166. esp.hard_reset()
  167. esp._port.close()
  168. @classmethod
  169. def confirm_dut(cls, port, **kwargs):
  170. inst = None
  171. try:
  172. expected_rom_class = cls.get_rom()
  173. except NotImplementedError:
  174. expected_rom_class = None
  175. try:
  176. # TODO: check whether 8266 works with this logic
  177. # Otherwise overwrite it in ESP8266DUT
  178. inst = detect_chip(port)
  179. if expected_rom_class and type(inst) != expected_rom_class:
  180. raise RuntimeError('Target not expected')
  181. return inst.read_mac() is not None, get_target_by_rom_class(type(inst))
  182. except (FatalError, RuntimeError):
  183. return False, None
  184. finally:
  185. if inst is not None:
  186. inst._port.close()
  187. def _try_flash(self, erase_nvs):
  188. """
  189. Called by start_app()
  190. :return: None
  191. """
  192. flash_files = []
  193. encrypt_files = []
  194. try:
  195. # Open the files here to prevents us from having to seek back to 0
  196. # each time. Before opening them, we have to organize the lists the
  197. # way esptool.write_flash needs:
  198. # If encrypt is provided, flash_files contains all the files to
  199. # flash.
  200. # Else, flash_files contains the files to be flashed as plain text
  201. # and encrypt_files contains the ones to flash encrypted.
  202. flash_files = self.app.flash_files
  203. encrypt_files = self.app.encrypt_files
  204. encrypt = self.app.flash_settings.get('encrypt', False)
  205. if encrypt:
  206. flash_files = encrypt_files
  207. encrypt_files = []
  208. else:
  209. flash_files = [entry
  210. for entry in flash_files
  211. if entry not in encrypt_files]
  212. flash_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files]
  213. encrypt_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files]
  214. if erase_nvs:
  215. address = self.app.partition_table['nvs']['offset']
  216. size = self.app.partition_table['nvs']['size']
  217. nvs_file = tempfile.TemporaryFile()
  218. nvs_file.write(b'\xff' * size)
  219. nvs_file.seek(0)
  220. if not isinstance(address, int):
  221. address = int(address, 0)
  222. # We have to check whether this file needs to be added to
  223. # flash_files list or encrypt_files.
  224. # Get the CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro
  225. # value. If it is set to True, then NVS is always encrypted.
  226. sdkconfig_dict = self.app.get_sdkconfig()
  227. macro_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict
  228. # If the macro is not enabled (plain text flash) or all files
  229. # must be encrypted, add NVS to flash_files.
  230. if not macro_encryption or encrypt:
  231. flash_files.append((address, nvs_file))
  232. else:
  233. encrypt_files.append((address, nvs_file))
  234. self.write_flash_data(flash_files, encrypt_files, False, encrypt)
  235. finally:
  236. for (_, f) in flash_files:
  237. f.close()
  238. for (_, f) in encrypt_files:
  239. f.close()
  240. @_uses_esptool
  241. def write_flash_data(self, esp, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False):
  242. """
  243. Try flashing at a particular baud rate.
  244. Structured this way so @_uses_esptool will reconnect each time
  245. :return: None
  246. """
  247. last_error = None
  248. for baud_rate in [921600, 115200]:
  249. try:
  250. # fake flasher args object, this is a hack until
  251. # esptool Python API is improved
  252. class FlashArgs(object):
  253. def __init__(self, attributes):
  254. for key, value in attributes.items():
  255. self.__setattr__(key, value)
  256. # write_flash expects the parameter encrypt_files to be None and not
  257. # an empty list, so perform the check here
  258. flash_args = FlashArgs({
  259. 'flash_size': self.app.flash_settings['flash_size'],
  260. 'flash_mode': self.app.flash_settings['flash_mode'],
  261. 'flash_freq': self.app.flash_settings['flash_freq'],
  262. 'addr_filename': flash_files or None,
  263. 'encrypt_files': encrypt_files or None,
  264. 'no_stub': self.secure_boot_en,
  265. 'compress': not self.secure_boot_en,
  266. 'verify': False,
  267. 'encrypt': encrypt,
  268. 'ignore_flash_encryption_efuse_setting': ignore_flash_encryption_efuse_setting,
  269. 'erase_all': False,
  270. 'after': 'no_reset',
  271. 'force': False,
  272. 'chip': esp.CHIP_NAME.lower().replace('-', ''),
  273. })
  274. esp.change_baud(baud_rate)
  275. esptool.detect_flash_size(esp, flash_args)
  276. esptool.write_flash(esp, flash_args)
  277. break
  278. except RuntimeError as e:
  279. last_error = e
  280. else:
  281. raise last_error
  282. def image_info(self, path_to_file):
  283. """
  284. get hash256 of app
  285. :param: path: path to file
  286. :return: sha256 appended to app
  287. """
  288. old_stdout = sys.stdout
  289. new_stdout = io.StringIO()
  290. sys.stdout = new_stdout
  291. class Args(object):
  292. def __init__(self, attributes):
  293. for key, value in attributes.items():
  294. self.__setattr__(key, value)
  295. args = Args({
  296. 'chip': self.TARGET,
  297. 'filename': path_to_file,
  298. })
  299. esptool.image_info(args)
  300. output = new_stdout.getvalue()
  301. sys.stdout = old_stdout
  302. return output
  303. def start_app(self, erase_nvs=ERASE_NVS):
  304. """
  305. download and start app.
  306. :param: erase_nvs: whether erase NVS partition during flash
  307. :return: None
  308. """
  309. self._try_flash(erase_nvs)
  310. def start_app_no_enc(self):
  311. """
  312. download and start app.
  313. :param: erase_nvs: whether erase NVS partition during flash
  314. :return: None
  315. """
  316. flash_files = self.app.flash_files + self.app.encrypt_files
  317. self.write_flash(flash_files)
  318. def write_flash(self, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False):
  319. """
  320. Flash files
  321. :return: None
  322. """
  323. flash_offs_files = []
  324. encrypt_offs_files = []
  325. try:
  326. if flash_files:
  327. flash_offs_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files]
  328. if encrypt_files:
  329. encrypt_offs_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files]
  330. self.write_flash_data(flash_offs_files, encrypt_offs_files, ignore_flash_encryption_efuse_setting, encrypt)
  331. finally:
  332. for (_, f) in flash_offs_files:
  333. f.close()
  334. for (_, f) in encrypt_offs_files:
  335. f.close()
  336. def bootloader_flash(self):
  337. """
  338. download bootloader.
  339. :return: None
  340. """
  341. bootloader_path = os.path.join(self.app.binary_path, 'bootloader', 'bootloader.bin')
  342. offs = int(self.app.get_sdkconfig()['CONFIG_BOOTLOADER_OFFSET_IN_FLASH'], 0)
  343. flash_files = [(offs, bootloader_path)]
  344. self.write_flash(flash_files)
  345. @_uses_esptool
  346. def reset(self, esp):
  347. """
  348. hard reset DUT
  349. :return: None
  350. """
  351. # decorator `_use_esptool` will do reset
  352. # so we don't need to do anything in this method
  353. pass
  354. @_uses_esptool
  355. def erase_partition(self, esp, partition):
  356. """
  357. :param partition: partition name to erase
  358. :return: None
  359. """
  360. address = self.app.partition_table[partition]['offset']
  361. size = self.app.partition_table[partition]['size']
  362. esp.erase_region(address, size)
  363. @_uses_esptool
  364. def erase_flash(self, esp):
  365. """
  366. erase the flash completely
  367. :return: None
  368. """
  369. esp.erase_flash()
  370. @_uses_esptool
  371. def dump_flash(self, esp, output_file, **kwargs):
  372. """
  373. dump flash
  374. :param output_file: output file name, if relative path, will use sdk path as base path.
  375. :keyword partition: partition name, dump the partition.
  376. ``partition`` is preferred than using ``address`` and ``size``.
  377. :keyword address: dump from address (need to be used with size)
  378. :keyword size: dump size (need to be used with address)
  379. :return: None
  380. """
  381. if os.path.isabs(output_file) is False:
  382. output_file = os.path.relpath(output_file, self.app.get_log_folder())
  383. if 'partition' in kwargs:
  384. partition = self.app.partition_table[kwargs['partition']]
  385. _address = partition['offset']
  386. _size = partition['size']
  387. elif 'address' in kwargs and 'size' in kwargs:
  388. _address = kwargs['address']
  389. _size = kwargs['size']
  390. else:
  391. raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
  392. content = esp.read_flash(_address, _size)
  393. with open(output_file, 'wb') as f:
  394. f.write(content)
  395. @staticmethod
  396. def _sort_usb_ports(ports):
  397. """
  398. Move the usb ports to the very beginning
  399. :param ports: list of ports
  400. :return: list of ports with usb ports at beginning
  401. """
  402. usb_ports = []
  403. rest_ports = []
  404. for port in ports:
  405. if 'usb' in port.lower():
  406. usb_ports.append(port)
  407. else:
  408. rest_ports.append(port)
  409. return usb_ports + rest_ports
  410. @classmethod
  411. def list_available_ports(cls):
  412. # It will return other kinds of ports as well, such as ttyS* ports.
  413. # Give the usb ports higher priority
  414. ports = cls._sort_usb_ports([x.device for x in list_ports.comports()])
  415. espport = os.getenv('ESPPORT')
  416. if not espport:
  417. # It's a little hard filter out invalid port with `serial.tools.list_ports.grep()`:
  418. # The check condition in `grep` is: `if r.search(port) or r.search(desc) or r.search(hwid)`.
  419. # This means we need to make all 3 conditions fail, to filter out the port.
  420. # So some part of the filters will not be straight forward to users.
  421. # And negative regular expression (`^((?!aa|bb|cc).)*$`) is not easy to understand.
  422. # Filter out invalid port by our own will be much simpler.
  423. return [x for x in ports if not cls.INVALID_PORT_PATTERN.search(x)]
  424. # On MacOs with python3.6: type of espport is already utf8
  425. if isinstance(espport, type(u'')):
  426. port_hint = espport
  427. else:
  428. port_hint = espport.decode('utf8')
  429. # If $ESPPORT is a valid port, make it appear first in the list
  430. if port_hint in ports:
  431. ports.remove(port_hint)
  432. return [port_hint] + ports
  433. # On macOS, user may set ESPPORT to /dev/tty.xxx while
  434. # pySerial lists only the corresponding /dev/cu.xxx port
  435. if sys.platform == 'darwin' and 'tty.' in port_hint:
  436. port_hint = port_hint.replace('tty.', 'cu.')
  437. if port_hint in ports:
  438. ports.remove(port_hint)
  439. return [port_hint] + ports
  440. return ports
  441. def lookup_pc_address(self, pc_addr):
  442. cmd = ['%saddr2line' % self.TOOLCHAIN_PREFIX,
  443. '-pfiaC', '-e', self.app.elf_file, pc_addr]
  444. ret = ''
  445. try:
  446. translation = subprocess.check_output(cmd)
  447. ret = translation.decode()
  448. except OSError:
  449. pass
  450. return ret
  451. @staticmethod
  452. def _queue_read_all(source_queue):
  453. output = []
  454. while True:
  455. try:
  456. output.append(source_queue.get(timeout=0))
  457. except _queue.Empty:
  458. break
  459. return output
  460. def _queue_copy(self, source_queue, dest_queue):
  461. data = self._queue_read_all(source_queue)
  462. for d in data:
  463. dest_queue.put(d)
  464. def _get_from_queue(self, queue_name):
  465. self_queue = getattr(self, queue_name)
  466. if self.receive_thread:
  467. recv_thread_queue = getattr(self.receive_thread, queue_name)
  468. self._queue_copy(recv_thread_queue, self_queue)
  469. return self._queue_read_all(self_queue)
  470. def stop_receive(self):
  471. if self.receive_thread:
  472. for name in ['performance_items', 'exceptions']:
  473. source_queue = getattr(self.receive_thread, name)
  474. dest_queue = getattr(self, name)
  475. self._queue_copy(source_queue, dest_queue)
  476. super(IDFDUT, self).stop_receive()
  477. def get_exceptions(self):
  478. """ Get exceptions detected by DUT receive thread. """
  479. return self._get_from_queue('exceptions')
  480. def get_performance_items(self):
  481. """
  482. DUT receive thread will automatic collect performance results with pattern ``[Performance][name]: value\n``.
  483. This method is used to get all performance results.
  484. :return: a list of performance items.
  485. """
  486. return self._get_from_queue('performance_items')
  487. def close(self):
  488. super(IDFDUT, self).close()
  489. if not self.allow_dut_exception and self.get_exceptions():
  490. raise IDFDUTException('DUT exception detected on {}'.format(self))
  491. class ESP32DUT(IDFDUT):
  492. TARGET = 'esp32'
  493. TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
  494. @classmethod
  495. def get_rom(cls):
  496. return targets.ESP32ROM
  497. class ESP32S2DUT(IDFDUT):
  498. TARGET = 'esp32s2'
  499. TOOLCHAIN_PREFIX = 'xtensa-esp32s2-elf-'
  500. @classmethod
  501. def get_rom(cls):
  502. return targets.ESP32S2ROM
  503. class ESP32S3DUT(IDFDUT):
  504. TARGET = 'esp32s3'
  505. TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-'
  506. @classmethod
  507. def get_rom(cls):
  508. return targets.ESP32S3ROM
  509. def erase_partition(self, esp, partition):
  510. raise NotImplementedError()
  511. class ESP32C2DUT(IDFDUT):
  512. TARGET = 'esp32c2'
  513. TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
  514. @classmethod
  515. def get_rom(cls):
  516. return targets.ESP32C2ROM
  517. class ESP32C3DUT(IDFDUT):
  518. TARGET = 'esp32c3'
  519. TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
  520. @classmethod
  521. def get_rom(cls):
  522. return targets.ESP32C3ROM
  523. class ESP32C6DUT(IDFDUT):
  524. TARGET = 'esp32c6'
  525. TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
  526. @classmethod
  527. def get_rom(cls):
  528. return targets.ESP32C6BETAROM
  529. class ESP32H2DUT(IDFDUT):
  530. TARGET = 'esp32h2'
  531. TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
  532. @classmethod
  533. def get_rom(cls):
  534. return targets.ESP32H2ROM
  535. class ESP8266DUT(IDFDUT):
  536. TARGET = 'esp8266'
  537. TOOLCHAIN_PREFIX = 'xtensa-lx106-elf-'
  538. @classmethod
  539. def get_rom(cls):
  540. return targets.ESP8266ROM
  541. def get_target_by_rom_class(cls):
  542. for c in [ESP32DUT, ESP32S2DUT, ESP32S3DUT, ESP32C2DUT, ESP32C3DUT, ESP32C6DUT, ESP32H2DUT, ESP8266DUT, IDFQEMUDUT]:
  543. if c.get_rom() == cls:
  544. return c.TARGET
  545. return None
  546. class IDFQEMUDUT(IDFDUT):
  547. TARGET = None
  548. TOOLCHAIN_PREFIX = None
  549. ERASE_NVS = True
  550. DEFAULT_EXPECT_TIMEOUT = 30 # longer timeout, since app startup takes more time in QEMU (due to slow SHA emulation)
  551. QEMU_SERIAL_PORT = 3334
  552. def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
  553. self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix='.bin', prefix='qemu_flash_img')
  554. self.app = app
  555. self.flash_size = 4 * 1024 * 1024
  556. self._write_flash_img()
  557. args = [
  558. 'qemu-system-xtensa',
  559. '-nographic',
  560. '-machine', self.TARGET,
  561. '-drive', 'file={},if=mtd,format=raw'.format(self.flash_image.name),
  562. '-nic', 'user,model=open_eth',
  563. '-serial', 'tcp::{},server,nowait'.format(self.QEMU_SERIAL_PORT),
  564. '-S',
  565. '-global driver=timer.esp32.timg,property=wdt_disable,value=true']
  566. # TODO(IDF-1242): generate a temporary efuse binary, pass it to QEMU
  567. if 'QEMU_BIOS_PATH' in os.environ:
  568. args += ['-L', os.environ['QEMU_BIOS_PATH']]
  569. self.qemu = pexpect.spawn(' '.join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT)
  570. self.qemu.expect_exact(b'(qemu)')
  571. super(IDFQEMUDUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
  572. def _write_flash_img(self):
  573. self.flash_image.seek(0)
  574. self.flash_image.write(b'\x00' * self.flash_size)
  575. for offs, path in self.app.flash_files:
  576. with open(path, 'rb') as flash_file:
  577. contents = flash_file.read()
  578. self.flash_image.seek(offs)
  579. self.flash_image.write(contents)
  580. self.flash_image.flush()
  581. @classmethod
  582. def get_rom(cls):
  583. return targets.ESP32ROM
  584. @classmethod
  585. def get_mac(cls, app, port):
  586. # TODO(IDF-1242): get this from QEMU/efuse binary
  587. return '11:22:33:44:55:66'
  588. @classmethod
  589. def confirm_dut(cls, port, **kwargs):
  590. return True, cls.TARGET
  591. def start_app(self, erase_nvs=ERASE_NVS):
  592. # TODO: implement erase_nvs
  593. # since the flash image is generated every time in the constructor, maybe this isn't needed...
  594. self.qemu.sendline(b'cont\n')
  595. self.qemu.expect_exact(b'(qemu)')
  596. def reset(self):
  597. self.qemu.sendline(b'system_reset\n')
  598. self.qemu.expect_exact(b'(qemu)')
  599. def erase_partition(self, partition):
  600. raise NotImplementedError('method erase_partition not implemented')
  601. def erase_flash(self):
  602. raise NotImplementedError('method erase_flash not implemented')
  603. def dump_flash(self, output_file, **kwargs):
  604. raise NotImplementedError('method dump_flash not implemented')
  605. @classmethod
  606. def list_available_ports(cls):
  607. return ['socket://localhost:{}'.format(cls.QEMU_SERIAL_PORT)]
  608. def close(self):
  609. super(IDFQEMUDUT, self).close()
  610. self.qemu.sendline(b'q\n')
  611. self.qemu.expect_exact(b'(qemu)')
  612. for _ in range(self.DEFAULT_EXPECT_TIMEOUT):
  613. if not self.qemu.isalive():
  614. break
  615. time.sleep(1)
  616. else:
  617. self.qemu.terminate(force=True)
  618. class ESP32QEMUDUT(IDFQEMUDUT):
  619. TARGET = 'esp32' # type: ignore
  620. TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-' # type: ignore
  621. class IDFFPGADUT(IDFDUT):
  622. TARGET = None # type: str
  623. TOOLCHAIN_PREFIX = None # type: str
  624. ERASE_NVS = True
  625. FLASH_ENCRYPT_SCHEME = None # type: str
  626. FLASH_ENCRYPT_CNT_KEY = None # type: str
  627. FLASH_ENCRYPT_CNT_VAL = 0
  628. FLASH_ENCRYPT_PURPOSE = None # type: str
  629. SECURE_BOOT_EN_KEY = None # type: str
  630. SECURE_BOOT_EN_VAL = 0
  631. FLASH_SECTOR_SIZE = 4096
  632. def __init__(self, name, port, log_file, app, allow_dut_exception=False, efuse_reset_port=None, **kwargs):
  633. super(IDFFPGADUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
  634. self.esp = self.get_rom()(port)
  635. self.efuses = None
  636. self.efuse_operations = None
  637. self.efuse_reset_port = efuse_reset_port
  638. @classmethod
  639. def get_rom(cls):
  640. raise NotImplementedError('This is an abstraction class, method not defined.')
  641. def erase_partition(self, esp, partition):
  642. raise NotImplementedError()
  643. def enable_efuses(self):
  644. # We use an extra COM port to reset the efuses on FPGA.
  645. # Connect DTR pin of the COM port to the efuse reset pin on daughter board
  646. # Set EFUSEPORT env variable to the extra COM port
  647. if not self.efuse_reset_port:
  648. raise RuntimeError('EFUSEPORT not specified')
  649. # Stop any previous serial port operation
  650. self.stop_receive()
  651. if self.secure_boot_en:
  652. self.esp.connect()
  653. self.efuses, self.efuse_operations = espefuse.get_efuses(self.esp, False, False, True)
  654. def burn_efuse(self, field, val):
  655. if not self.efuse_operations:
  656. self.enable_efuses()
  657. BurnEfuseArgs = collections.namedtuple('burn_efuse_args', ['name_value_pairs'])
  658. args = BurnEfuseArgs({field: val})
  659. self.efuse_operations.burn_efuse(self.esp, self.efuses, args)
  660. def burn_efuse_key(self, key, purpose, block):
  661. if not self.efuse_operations:
  662. self.enable_efuses()
  663. BurnKeyArgs = collections.namedtuple('burn_key_args',
  664. ['keyfile', 'keypurpose', 'block',
  665. 'force_write_always', 'no_write_protect', 'no_read_protect'])
  666. args = BurnKeyArgs([key],
  667. [purpose],
  668. [block],
  669. False, False, False)
  670. self.efuse_operations.burn_key(self.esp, self.efuses, args)
  671. def burn_efuse_key_digest(self, key, purpose, block):
  672. if not self.efuse_operations:
  673. self.enable_efuses()
  674. BurnDigestArgs = collections.namedtuple('burn_key_digest_args',
  675. ['keyfile', 'keypurpose', 'block',
  676. 'force_write_always', 'no_write_protect', 'no_read_protect'])
  677. args = BurnDigestArgs([open(key, 'rb')],
  678. [purpose],
  679. [block],
  680. False, False, True)
  681. self.efuse_operations.burn_key_digest(self.esp, self.efuses, args)
  682. def reset_efuses(self):
  683. if not self.efuse_reset_port:
  684. raise RuntimeError('EFUSEPORT not specified')
  685. with serial.Serial(self.efuse_reset_port) as efuseport:
  686. print('Resetting efuses')
  687. efuseport.dtr = 0
  688. self.port_inst.setRTS(1)
  689. self.port_inst.setRTS(0)
  690. time.sleep(1)
  691. efuseport.dtr = 1
  692. self.efuse_operations = None
  693. self.efuses = None
  694. def sign_data(self, data_file, key_files, version, append_signature=0):
  695. SignDataArgs = collections.namedtuple('sign_data_args',
  696. ['datafile','keyfile','output', 'version', 'append_signatures'])
  697. outfile = tempfile.NamedTemporaryFile()
  698. args = SignDataArgs(data_file, key_files, outfile.name, str(version), append_signature)
  699. espsecure.sign_data(args)
  700. outfile.seek(0)
  701. return outfile.read()
  702. class ESP32C3FPGADUT(IDFFPGADUT):
  703. TARGET = 'esp32c3'
  704. TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
  705. FLASH_ENCRYPT_SCHEME = 'AES-XTS'
  706. FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT'
  707. FLASH_ENCRYPT_CNT_VAL = 1
  708. FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY'
  709. SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN'
  710. SECURE_BOOT_EN_VAL = 1
  711. @classmethod
  712. def get_rom(cls):
  713. return targets.ESP32C3ROM
  714. def erase_partition(self, esp, partition):
  715. raise NotImplementedError()
  716. def flash_encrypt_burn_cnt(self):
  717. self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL)
  718. def flash_encrypt_burn_key(self, key, block=0):
  719. self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block)
  720. def flash_encrypt_get_scheme(self):
  721. return self.FLASH_ENCRYPT_SCHEME
  722. def secure_boot_burn_en_bit(self):
  723. self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL)
  724. def secure_boot_burn_digest(self, digest, key_index=0, block=0):
  725. self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block)
  726. @classmethod
  727. def confirm_dut(cls, port, **kwargs):
  728. return True, cls.TARGET
  729. class ESP32S3FPGADUT(IDFFPGADUT):
  730. TARGET = 'esp32s3'
  731. TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-'
  732. FLASH_ENCRYPT_SCHEME = 'AES-XTS'
  733. FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT'
  734. FLASH_ENCRYPT_CNT_VAL = 1
  735. FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY'
  736. SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN'
  737. SECURE_BOOT_EN_VAL = 1
  738. @classmethod
  739. def get_rom(cls):
  740. return targets.ESP32S3ROM
  741. def erase_partition(self, esp, partition):
  742. raise NotImplementedError()
  743. def flash_encrypt_burn_cnt(self):
  744. self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL)
  745. def flash_encrypt_burn_key(self, key, block=0):
  746. self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block)
  747. def flash_encrypt_get_scheme(self):
  748. return self.FLASH_ENCRYPT_SCHEME
  749. def secure_boot_burn_en_bit(self):
  750. self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL)
  751. def secure_boot_burn_digest(self, digest, key_index=0, block=0):
  752. self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block)
  753. @classmethod
  754. def confirm_dut(cls, port, **kwargs):
  755. return True, cls.TARGET