IDFDUT.py 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888
  1. # Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """ DUT for IDF applications """
  15. import collections
  16. import functools
  17. import io
  18. import os
  19. import os.path
  20. import re
  21. import subprocess
  22. import sys
  23. import tempfile
  24. import time
  25. import pexpect
  26. import serial
  27. # python2 and python3 queue package name is different
  28. try:
  29. import Queue as _queue
  30. except ImportError:
  31. import queue as _queue # type: ignore
  32. from serial.tools import list_ports
  33. from tiny_test_fw import DUT, Utility
  34. try:
  35. import esptool
  36. except ImportError: # cheat and use IDF's copy of esptool if available
  37. idf_path = os.getenv('IDF_PATH')
  38. if not idf_path or not os.path.exists(idf_path):
  39. raise
  40. sys.path.insert(0, os.path.join(idf_path, 'components', 'esptool_py', 'esptool'))
  41. import esptool
  42. import espefuse
  43. import espsecure
  44. class IDFToolError(OSError):
  45. pass
  46. class IDFDUTException(RuntimeError):
  47. pass
  48. class IDFRecvThread(DUT.RecvThread):
  49. PERFORMANCE_PATTERN = re.compile(r'\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n')
  50. EXCEPTION_PATTERNS = [
  51. re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
  52. re.compile(r'(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)'),
  53. re.compile(r'(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))')
  54. ]
  55. BACKTRACE_PATTERN = re.compile(r'Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)')
  56. BACKTRACE_ADDRESS_PATTERN = re.compile(r'(0x[0-9a-f]{8}):0x[0-9a-f]{8}')
  57. def __init__(self, read, dut):
  58. super(IDFRecvThread, self).__init__(read, dut)
  59. self.exceptions = _queue.Queue()
  60. self.performance_items = _queue.Queue()
  61. def collect_performance(self, comp_data):
  62. matches = self.PERFORMANCE_PATTERN.findall(comp_data)
  63. for match in matches:
  64. Utility.console_log('[Performance][{}]: {}'.format(match[0], match[1]), color='orange')
  65. self.performance_items.put((match[0], match[1]))
  66. def detect_exception(self, comp_data):
  67. for pattern in self.EXCEPTION_PATTERNS:
  68. start = 0
  69. while True:
  70. match = pattern.search(comp_data, pos=start)
  71. if match:
  72. start = match.end()
  73. self.exceptions.put(match.group(0))
  74. Utility.console_log('[Exception]: {}'.format(match.group(0)), color='red')
  75. else:
  76. break
  77. def detect_backtrace(self, comp_data):
  78. start = 0
  79. while True:
  80. match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
  81. if match:
  82. start = match.end()
  83. Utility.console_log('[Backtrace]:{}'.format(match.group(1)), color='red')
  84. # translate backtrace
  85. addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
  86. translated_backtrace = ''
  87. for addr in addresses:
  88. ret = self.dut.lookup_pc_address(addr)
  89. if ret:
  90. translated_backtrace += ret + '\n'
  91. if translated_backtrace:
  92. Utility.console_log('Translated backtrace\n:' + translated_backtrace, color='yellow')
  93. else:
  94. Utility.console_log('Failed to translate backtrace', color='yellow')
  95. else:
  96. break
  97. CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
  98. def _uses_esptool(func):
  99. """ Suspend listener thread, connect with esptool,
  100. call target function with esptool instance,
  101. then resume listening for output
  102. """
  103. @functools.wraps(func)
  104. def handler(self, *args, **kwargs):
  105. self.stop_receive()
  106. settings = self.port_inst.get_settings()
  107. try:
  108. if not self.rom_inst:
  109. if not self.secure_boot_en:
  110. self.rom_inst = esptool.ESPLoader.detect_chip(self.port_inst)
  111. else:
  112. self.rom_inst = self.get_rom()(self.port_inst)
  113. self.rom_inst.connect('hard_reset')
  114. if (self.secure_boot_en):
  115. esp = self.rom_inst
  116. esp.flash_spi_attach(0)
  117. else:
  118. esp = self.rom_inst.run_stub()
  119. ret = func(self, esp, *args, **kwargs)
  120. # do hard reset after use esptool
  121. esp.hard_reset()
  122. finally:
  123. # always need to restore port settings
  124. self.port_inst.apply_settings(settings)
  125. self.start_receive()
  126. return ret
  127. return handler
  128. class IDFDUT(DUT.SerialDUT):
  129. """ IDF DUT, extends serial with esptool methods
  130. (Becomes aware of IDFApp instance which holds app-specific data)
  131. """
  132. # /dev/ttyAMA0 port is listed in Raspberry Pi
  133. # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
  134. INVALID_PORT_PATTERN = re.compile(r'AMA|Bluetooth')
  135. # if need to erase NVS partition in start app
  136. ERASE_NVS = True
  137. RECV_THREAD_CLS = IDFRecvThread
  138. def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
  139. super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
  140. self.allow_dut_exception = allow_dut_exception
  141. self.exceptions = _queue.Queue()
  142. self.performance_items = _queue.Queue()
  143. self.rom_inst = None
  144. self.secure_boot_en = self.app.get_sdkconfig_config_value('CONFIG_SECURE_BOOT') and \
  145. not self.app.get_sdkconfig_config_value('CONFIG_EFUSE_VIRTUAL')
  146. @classmethod
  147. def get_rom(cls):
  148. raise NotImplementedError('This is an abstraction class, method not defined.')
  149. @classmethod
  150. def get_mac(cls, app, port):
  151. """
  152. get MAC address via esptool
  153. :param app: application instance (to get tool)
  154. :param port: serial port as string
  155. :return: MAC address or None
  156. """
  157. esp = None
  158. try:
  159. esp = cls.get_rom()(port)
  160. esp.connect()
  161. return esp.read_mac()
  162. except RuntimeError:
  163. return None
  164. finally:
  165. if esp:
  166. # do hard reset after use esptool
  167. esp.hard_reset()
  168. esp._port.close()
  169. @classmethod
  170. def confirm_dut(cls, port, **kwargs):
  171. inst = None
  172. try:
  173. expected_rom_class = cls.get_rom()
  174. except NotImplementedError:
  175. expected_rom_class = None
  176. try:
  177. # TODO: check whether 8266 works with this logic
  178. # Otherwise overwrite it in ESP8266DUT
  179. inst = esptool.ESPLoader.detect_chip(port)
  180. if expected_rom_class and type(inst) != expected_rom_class:
  181. raise RuntimeError('Target not expected')
  182. return inst.read_mac() is not None, get_target_by_rom_class(type(inst))
  183. except(esptool.FatalError, RuntimeError):
  184. return False, None
  185. finally:
  186. if inst is not None:
  187. inst._port.close()
  188. def _try_flash(self, erase_nvs):
  189. """
  190. Called by start_app()
  191. :return: None
  192. """
  193. flash_files = []
  194. encrypt_files = []
  195. try:
  196. # Open the files here to prevents us from having to seek back to 0
  197. # each time. Before opening them, we have to organize the lists the
  198. # way esptool.write_flash needs:
  199. # If encrypt is provided, flash_files contains all the files to
  200. # flash.
  201. # Else, flash_files contains the files to be flashed as plain text
  202. # and encrypt_files contains the ones to flash encrypted.
  203. flash_files = self.app.flash_files
  204. encrypt_files = self.app.encrypt_files
  205. encrypt = self.app.flash_settings.get('encrypt', False)
  206. if encrypt:
  207. flash_files = encrypt_files
  208. encrypt_files = []
  209. else:
  210. flash_files = [entry
  211. for entry in flash_files
  212. if entry not in encrypt_files]
  213. flash_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files]
  214. encrypt_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files]
  215. if erase_nvs:
  216. address = self.app.partition_table['nvs']['offset']
  217. size = self.app.partition_table['nvs']['size']
  218. nvs_file = tempfile.TemporaryFile()
  219. nvs_file.write(b'\xff' * size)
  220. nvs_file.seek(0)
  221. if not isinstance(address, int):
  222. address = int(address, 0)
  223. # We have to check whether this file needs to be added to
  224. # flash_files list or encrypt_files.
  225. # Get the CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro
  226. # value. If it is set to True, then NVS is always encrypted.
  227. sdkconfig_dict = self.app.get_sdkconfig()
  228. macro_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict
  229. # If the macro is not enabled (plain text flash) or all files
  230. # must be encrypted, add NVS to flash_files.
  231. if not macro_encryption or encrypt:
  232. flash_files.append((address, nvs_file))
  233. else:
  234. encrypt_files.append((address, nvs_file))
  235. self.write_flash_data(flash_files, encrypt_files, False, encrypt)
  236. finally:
  237. for (_, f) in flash_files:
  238. f.close()
  239. for (_, f) in encrypt_files:
  240. f.close()
  241. @_uses_esptool
  242. def write_flash_data(self, esp, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False):
  243. """
  244. Try flashing at a particular baud rate.
  245. Structured this way so @_uses_esptool will reconnect each time
  246. :return: None
  247. """
  248. last_error = None
  249. for baud_rate in [921600, 115200]:
  250. try:
  251. # fake flasher args object, this is a hack until
  252. # esptool Python API is improved
  253. class FlashArgs(object):
  254. def __init__(self, attributes):
  255. for key, value in attributes.items():
  256. self.__setattr__(key, value)
  257. # write_flash expects the parameter encrypt_files to be None and not
  258. # an empty list, so perform the check here
  259. flash_args = FlashArgs({
  260. 'flash_size': self.app.flash_settings['flash_size'],
  261. 'flash_mode': self.app.flash_settings['flash_mode'],
  262. 'flash_freq': self.app.flash_settings['flash_freq'],
  263. 'addr_filename': flash_files or None,
  264. 'encrypt_files': encrypt_files or None,
  265. 'no_stub': self.secure_boot_en,
  266. 'compress': not self.secure_boot_en,
  267. 'verify': False,
  268. 'encrypt': encrypt,
  269. 'ignore_flash_encryption_efuse_setting': ignore_flash_encryption_efuse_setting,
  270. 'erase_all': False,
  271. 'after': 'no_reset',
  272. })
  273. esp.change_baud(baud_rate)
  274. esptool.detect_flash_size(esp, flash_args)
  275. esptool.write_flash(esp, flash_args)
  276. break
  277. except RuntimeError as e:
  278. last_error = e
  279. else:
  280. raise last_error
  281. def image_info(self, path_to_file):
  282. """
  283. get hash256 of app
  284. :param: path: path to file
  285. :return: sha256 appended to app
  286. """
  287. old_stdout = sys.stdout
  288. new_stdout = io.StringIO()
  289. sys.stdout = new_stdout
  290. class Args(object):
  291. def __init__(self, attributes):
  292. for key, value in attributes.items():
  293. self.__setattr__(key, value)
  294. args = Args({
  295. 'chip': self.TARGET,
  296. 'filename': path_to_file,
  297. })
  298. esptool.image_info(args)
  299. output = new_stdout.getvalue()
  300. sys.stdout = old_stdout
  301. return output
  302. def start_app(self, erase_nvs=ERASE_NVS):
  303. """
  304. download and start app.
  305. :param: erase_nvs: whether erase NVS partition during flash
  306. :return: None
  307. """
  308. self._try_flash(erase_nvs)
  309. def start_app_no_enc(self):
  310. """
  311. download and start app.
  312. :param: erase_nvs: whether erase NVS partition during flash
  313. :return: None
  314. """
  315. flash_files = self.app.flash_files + self.app.encrypt_files
  316. self.write_flash(flash_files)
  317. def write_flash(self, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False):
  318. """
  319. Flash files
  320. :return: None
  321. """
  322. flash_offs_files = []
  323. encrypt_offs_files = []
  324. try:
  325. if flash_files:
  326. flash_offs_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files]
  327. if encrypt_files:
  328. encrypt_offs_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files]
  329. self.write_flash_data(flash_offs_files, encrypt_offs_files, ignore_flash_encryption_efuse_setting, encrypt)
  330. finally:
  331. for (_, f) in flash_offs_files:
  332. f.close()
  333. for (_, f) in encrypt_offs_files:
  334. f.close()
  335. def bootloader_flash(self):
  336. """
  337. download bootloader.
  338. :return: None
  339. """
  340. bootloader_path = os.path.join(self.app.binary_path, 'bootloader', 'bootloader.bin')
  341. offs = int(self.app.get_sdkconfig()['CONFIG_BOOTLOADER_OFFSET_IN_FLASH'], 0)
  342. flash_files = [(offs, bootloader_path)]
  343. self.write_flash(flash_files)
  344. @_uses_esptool
  345. def reset(self, esp):
  346. """
  347. hard reset DUT
  348. :return: None
  349. """
  350. # decorator `_use_esptool` will do reset
  351. # so we don't need to do anything in this method
  352. pass
  353. @_uses_esptool
  354. def erase_partition(self, esp, partition):
  355. """
  356. :param partition: partition name to erase
  357. :return: None
  358. """
  359. address = self.app.partition_table[partition]['offset']
  360. size = self.app.partition_table[partition]['size']
  361. esp.erase_region(address, size)
  362. @_uses_esptool
  363. def erase_flash(self, esp):
  364. """
  365. erase the flash completely
  366. :return: None
  367. """
  368. esp.erase_flash()
  369. @_uses_esptool
  370. def dump_flash(self, esp, output_file, **kwargs):
  371. """
  372. dump flash
  373. :param output_file: output file name, if relative path, will use sdk path as base path.
  374. :keyword partition: partition name, dump the partition.
  375. ``partition`` is preferred than using ``address`` and ``size``.
  376. :keyword address: dump from address (need to be used with size)
  377. :keyword size: dump size (need to be used with address)
  378. :return: None
  379. """
  380. if os.path.isabs(output_file) is False:
  381. output_file = os.path.relpath(output_file, self.app.get_log_folder())
  382. if 'partition' in kwargs:
  383. partition = self.app.partition_table[kwargs['partition']]
  384. _address = partition['offset']
  385. _size = partition['size']
  386. elif 'address' in kwargs and 'size' in kwargs:
  387. _address = kwargs['address']
  388. _size = kwargs['size']
  389. else:
  390. raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
  391. content = esp.read_flash(_address, _size)
  392. with open(output_file, 'wb') as f:
  393. f.write(content)
  394. @staticmethod
  395. def _sort_usb_ports(ports):
  396. """
  397. Move the usb ports to the very beginning
  398. :param ports: list of ports
  399. :return: list of ports with usb ports at beginning
  400. """
  401. usb_ports = []
  402. rest_ports = []
  403. for port in ports:
  404. if 'usb' in port.lower():
  405. usb_ports.append(port)
  406. else:
  407. rest_ports.append(port)
  408. return usb_ports + rest_ports
  409. @classmethod
  410. def list_available_ports(cls):
  411. # It will return other kinds of ports as well, such as ttyS* ports.
  412. # Give the usb ports higher priority
  413. ports = cls._sort_usb_ports([x.device for x in list_ports.comports()])
  414. espport = os.getenv('ESPPORT')
  415. if not espport:
  416. # It's a little hard filter out invalid port with `serial.tools.list_ports.grep()`:
  417. # The check condition in `grep` is: `if r.search(port) or r.search(desc) or r.search(hwid)`.
  418. # This means we need to make all 3 conditions fail, to filter out the port.
  419. # So some part of the filters will not be straight forward to users.
  420. # And negative regular expression (`^((?!aa|bb|cc).)*$`) is not easy to understand.
  421. # Filter out invalid port by our own will be much simpler.
  422. return [x for x in ports if not cls.INVALID_PORT_PATTERN.search(x)]
  423. # On MacOs with python3.6: type of espport is already utf8
  424. if isinstance(espport, type(u'')):
  425. port_hint = espport
  426. else:
  427. port_hint = espport.decode('utf8')
  428. # If $ESPPORT is a valid port, make it appear first in the list
  429. if port_hint in ports:
  430. ports.remove(port_hint)
  431. return [port_hint] + ports
  432. # On macOS, user may set ESPPORT to /dev/tty.xxx while
  433. # pySerial lists only the corresponding /dev/cu.xxx port
  434. if sys.platform == 'darwin' and 'tty.' in port_hint:
  435. port_hint = port_hint.replace('tty.', 'cu.')
  436. if port_hint in ports:
  437. ports.remove(port_hint)
  438. return [port_hint] + ports
  439. return ports
  440. def lookup_pc_address(self, pc_addr):
  441. cmd = ['%saddr2line' % self.TOOLCHAIN_PREFIX,
  442. '-pfiaC', '-e', self.app.elf_file, pc_addr]
  443. ret = ''
  444. try:
  445. translation = subprocess.check_output(cmd)
  446. ret = translation.decode()
  447. except OSError:
  448. pass
  449. return ret
  450. @staticmethod
  451. def _queue_read_all(source_queue):
  452. output = []
  453. while True:
  454. try:
  455. output.append(source_queue.get(timeout=0))
  456. except _queue.Empty:
  457. break
  458. return output
  459. def _queue_copy(self, source_queue, dest_queue):
  460. data = self._queue_read_all(source_queue)
  461. for d in data:
  462. dest_queue.put(d)
  463. def _get_from_queue(self, queue_name):
  464. self_queue = getattr(self, queue_name)
  465. if self.receive_thread:
  466. recv_thread_queue = getattr(self.receive_thread, queue_name)
  467. self._queue_copy(recv_thread_queue, self_queue)
  468. return self._queue_read_all(self_queue)
  469. def stop_receive(self):
  470. if self.receive_thread:
  471. for name in ['performance_items', 'exceptions']:
  472. source_queue = getattr(self.receive_thread, name)
  473. dest_queue = getattr(self, name)
  474. self._queue_copy(source_queue, dest_queue)
  475. super(IDFDUT, self).stop_receive()
  476. def get_exceptions(self):
  477. """ Get exceptions detected by DUT receive thread. """
  478. return self._get_from_queue('exceptions')
  479. def get_performance_items(self):
  480. """
  481. DUT receive thread will automatic collect performance results with pattern ``[Performance][name]: value\n``.
  482. This method is used to get all performance results.
  483. :return: a list of performance items.
  484. """
  485. return self._get_from_queue('performance_items')
  486. def close(self):
  487. super(IDFDUT, self).close()
  488. if not self.allow_dut_exception and self.get_exceptions():
  489. raise IDFDUTException('DUT exception detected on {}'.format(self))
  490. class ESP32DUT(IDFDUT):
  491. TARGET = 'esp32'
  492. TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
  493. @classmethod
  494. def get_rom(cls):
  495. return esptool.ESP32ROM
  496. class ESP32S2DUT(IDFDUT):
  497. TARGET = 'esp32s2'
  498. TOOLCHAIN_PREFIX = 'xtensa-esp32s2-elf-'
  499. @classmethod
  500. def get_rom(cls):
  501. return esptool.ESP32S2ROM
  502. class ESP32S3DUT(IDFDUT):
  503. TARGET = 'esp32s3'
  504. TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-'
  505. @classmethod
  506. def get_rom(cls):
  507. return esptool.ESP32S3ROM
  508. def erase_partition(self, esp, partition):
  509. raise NotImplementedError()
  510. class ESP32C3DUT(IDFDUT):
  511. TARGET = 'esp32c3'
  512. TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
  513. @classmethod
  514. def get_rom(cls):
  515. return esptool.ESP32C3ROM
  516. class ESP8266DUT(IDFDUT):
  517. TARGET = 'esp8266'
  518. TOOLCHAIN_PREFIX = 'xtensa-lx106-elf-'
  519. @classmethod
  520. def get_rom(cls):
  521. return esptool.ESP8266ROM
  522. def get_target_by_rom_class(cls):
  523. for c in [ESP32DUT, ESP32S2DUT, ESP32S3DUT, ESP32C3DUT, ESP8266DUT, IDFQEMUDUT]:
  524. if c.get_rom() == cls:
  525. return c.TARGET
  526. return None
  527. class IDFQEMUDUT(IDFDUT):
  528. TARGET = None
  529. TOOLCHAIN_PREFIX = None
  530. ERASE_NVS = True
  531. DEFAULT_EXPECT_TIMEOUT = 30 # longer timeout, since app startup takes more time in QEMU (due to slow SHA emulation)
  532. QEMU_SERIAL_PORT = 3334
  533. def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
  534. self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix='.bin', prefix='qemu_flash_img')
  535. self.app = app
  536. self.flash_size = 4 * 1024 * 1024
  537. self._write_flash_img()
  538. args = [
  539. 'qemu-system-xtensa',
  540. '-nographic',
  541. '-machine', self.TARGET,
  542. '-drive', 'file={},if=mtd,format=raw'.format(self.flash_image.name),
  543. '-nic', 'user,model=open_eth',
  544. '-serial', 'tcp::{},server,nowait'.format(self.QEMU_SERIAL_PORT),
  545. '-S',
  546. '-global driver=timer.esp32.timg,property=wdt_disable,value=true']
  547. # TODO(IDF-1242): generate a temporary efuse binary, pass it to QEMU
  548. if 'QEMU_BIOS_PATH' in os.environ:
  549. args += ['-L', os.environ['QEMU_BIOS_PATH']]
  550. self.qemu = pexpect.spawn(' '.join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT)
  551. self.qemu.expect_exact(b'(qemu)')
  552. super(IDFQEMUDUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
  553. def _write_flash_img(self):
  554. self.flash_image.seek(0)
  555. self.flash_image.write(b'\x00' * self.flash_size)
  556. for offs, path in self.app.flash_files:
  557. with open(path, 'rb') as flash_file:
  558. contents = flash_file.read()
  559. self.flash_image.seek(offs)
  560. self.flash_image.write(contents)
  561. self.flash_image.flush()
  562. @classmethod
  563. def get_rom(cls):
  564. return esptool.ESP32ROM
  565. @classmethod
  566. def get_mac(cls, app, port):
  567. # TODO(IDF-1242): get this from QEMU/efuse binary
  568. return '11:22:33:44:55:66'
  569. @classmethod
  570. def confirm_dut(cls, port, **kwargs):
  571. return True, cls.TARGET
  572. def start_app(self, erase_nvs=ERASE_NVS):
  573. # TODO: implement erase_nvs
  574. # since the flash image is generated every time in the constructor, maybe this isn't needed...
  575. self.qemu.sendline(b'cont\n')
  576. self.qemu.expect_exact(b'(qemu)')
  577. def reset(self):
  578. self.qemu.sendline(b'system_reset\n')
  579. self.qemu.expect_exact(b'(qemu)')
  580. def erase_partition(self, partition):
  581. raise NotImplementedError('method erase_partition not implemented')
  582. def erase_flash(self):
  583. raise NotImplementedError('method erase_flash not implemented')
  584. def dump_flash(self, output_file, **kwargs):
  585. raise NotImplementedError('method dump_flash not implemented')
  586. @classmethod
  587. def list_available_ports(cls):
  588. return ['socket://localhost:{}'.format(cls.QEMU_SERIAL_PORT)]
  589. def close(self):
  590. super(IDFQEMUDUT, self).close()
  591. self.qemu.sendline(b'q\n')
  592. self.qemu.expect_exact(b'(qemu)')
  593. for _ in range(self.DEFAULT_EXPECT_TIMEOUT):
  594. if not self.qemu.isalive():
  595. break
  596. time.sleep(1)
  597. else:
  598. self.qemu.terminate(force=True)
  599. class ESP32QEMUDUT(IDFQEMUDUT):
  600. TARGET = 'esp32' # type: ignore
  601. TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-' # type: ignore
  602. class IDFFPGADUT(IDFDUT):
  603. TARGET = None # type: str
  604. TOOLCHAIN_PREFIX = None # type: str
  605. ERASE_NVS = True
  606. FLASH_ENCRYPT_SCHEME = None # type: str
  607. FLASH_ENCRYPT_CNT_KEY = None # type: str
  608. FLASH_ENCRYPT_CNT_VAL = 0
  609. FLASH_ENCRYPT_PURPOSE = None # type: str
  610. SECURE_BOOT_EN_KEY = None # type: str
  611. SECURE_BOOT_EN_VAL = 0
  612. FLASH_SECTOR_SIZE = 4096
  613. def __init__(self, name, port, log_file, app, allow_dut_exception=False, efuse_reset_port=None, **kwargs):
  614. super(IDFFPGADUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
  615. self.esp = self.get_rom()(port)
  616. self.efuses = None
  617. self.efuse_operations = None
  618. self.efuse_reset_port = efuse_reset_port
  619. @classmethod
  620. def get_rom(cls):
  621. raise NotImplementedError('This is an abstraction class, method not defined.')
  622. def erase_partition(self, esp, partition):
  623. raise NotImplementedError()
  624. def enable_efuses(self):
  625. # We use an extra COM port to reset the efuses on FPGA.
  626. # Connect DTR pin of the COM port to the efuse reset pin on daughter board
  627. # Set EFUSEPORT env variable to the extra COM port
  628. if not self.efuse_reset_port:
  629. raise RuntimeError('EFUSEPORT not specified')
  630. # Stop any previous serial port operation
  631. self.stop_receive()
  632. if self.secure_boot_en:
  633. self.esp.connect()
  634. self.efuses, self.efuse_operations = espefuse.get_efuses(self.esp, False, False, True)
  635. def burn_efuse(self, field, val):
  636. if not self.efuse_operations:
  637. self.enable_efuses()
  638. BurnEfuseArgs = collections.namedtuple('burn_efuse_args', ['name_value_pairs', 'only_burn_at_end'])
  639. args = BurnEfuseArgs({field: val}, False)
  640. self.efuse_operations.burn_efuse(self.esp, self.efuses, args)
  641. def burn_efuse_key(self, key, purpose, block):
  642. if not self.efuse_operations:
  643. self.enable_efuses()
  644. BurnKeyArgs = collections.namedtuple('burn_key_args',
  645. ['keyfile', 'keypurpose', 'block',
  646. 'force_write_always', 'no_write_protect', 'no_read_protect', 'only_burn_at_end'])
  647. args = BurnKeyArgs([key],
  648. [purpose],
  649. [block],
  650. False, False, False, False)
  651. self.efuse_operations.burn_key(self.esp, self.efuses, args)
  652. def burn_efuse_key_digest(self, key, purpose, block):
  653. if not self.efuse_operations:
  654. self.enable_efuses()
  655. BurnDigestArgs = collections.namedtuple('burn_key_digest_args',
  656. ['keyfile', 'keypurpose', 'block',
  657. 'force_write_always', 'no_write_protect', 'no_read_protect', 'only_burn_at_end'])
  658. args = BurnDigestArgs([open(key, 'rb')],
  659. [purpose],
  660. [block],
  661. False, False, True, False)
  662. self.efuse_operations.burn_key_digest(self.esp, self.efuses, args)
  663. def reset_efuses(self):
  664. if not self.efuse_reset_port:
  665. raise RuntimeError('EFUSEPORT not specified')
  666. with serial.Serial(self.efuse_reset_port) as efuseport:
  667. print('Resetting efuses')
  668. efuseport.dtr = 0
  669. self.port_inst.setRTS(1)
  670. self.port_inst.setRTS(0)
  671. time.sleep(1)
  672. efuseport.dtr = 1
  673. self.efuse_operations = None
  674. self.efuses = None
  675. def sign_data(self, data_file, key_files, version, append_signature=0):
  676. SignDataArgs = collections.namedtuple('sign_data_args',
  677. ['datafile','keyfile','output', 'version', 'append_signatures'])
  678. outfile = tempfile.NamedTemporaryFile()
  679. args = SignDataArgs(data_file, key_files, outfile.name, str(version), append_signature)
  680. espsecure.sign_data(args)
  681. outfile.seek(0)
  682. return outfile.read()
  683. class ESP32C3FPGADUT(IDFFPGADUT):
  684. TARGET = 'esp32c3'
  685. TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
  686. FLASH_ENCRYPT_SCHEME = 'AES-XTS'
  687. FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT'
  688. FLASH_ENCRYPT_CNT_VAL = 1
  689. FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY'
  690. SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN'
  691. SECURE_BOOT_EN_VAL = 1
  692. @classmethod
  693. def get_rom(cls):
  694. return esptool.ESP32C3ROM
  695. def erase_partition(self, esp, partition):
  696. raise NotImplementedError()
  697. def flash_encrypt_burn_cnt(self):
  698. self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL)
  699. def flash_encrypt_burn_key(self, key, block=0):
  700. self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block)
  701. def flash_encrypt_get_scheme(self):
  702. return self.FLASH_ENCRYPT_SCHEME
  703. def secure_boot_burn_en_bit(self):
  704. self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL)
  705. def secure_boot_burn_digest(self, digest, key_index=0, block=0):
  706. self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block)
  707. @classmethod
  708. def confirm_dut(cls, port, **kwargs):
  709. return True, cls.TARGET
  710. class ESP32S3FPGADUT(IDFFPGADUT):
  711. TARGET = 'esp32s3'
  712. TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-'
  713. FLASH_ENCRYPT_SCHEME = 'AES-XTS'
  714. FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT'
  715. FLASH_ENCRYPT_CNT_VAL = 1
  716. FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY'
  717. SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN'
  718. SECURE_BOOT_EN_VAL = 1
  719. @classmethod
  720. def get_rom(cls):
  721. return esptool.ESP32S3ROM
  722. def erase_partition(self, esp, partition):
  723. raise NotImplementedError()
  724. def flash_encrypt_burn_cnt(self):
  725. self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL)
  726. def flash_encrypt_burn_key(self, key, block=0):
  727. self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block)
  728. def flash_encrypt_get_scheme(self):
  729. return self.FLASH_ENCRYPT_SCHEME
  730. def secure_boot_burn_en_bit(self):
  731. self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL)
  732. def secure_boot_burn_digest(self, digest, key_index=0, block=0):
  733. self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block)
  734. @classmethod
  735. def confirm_dut(cls, port, **kwargs):
  736. return True, cls.TARGET