IDFDUT.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. # Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """ DUT for IDF applications """
  15. import os
  16. import os.path
  17. import sys
  18. import re
  19. import functools
  20. import tempfile
  21. import subprocess
  22. import time
  23. import pexpect
  24. # python2 and python3 queue package name is different
  25. try:
  26. import Queue as _queue
  27. except ImportError:
  28. import queue as _queue
  29. from serial.tools import list_ports
  30. from tiny_test_fw import DUT, Utility
  31. try:
  32. import esptool
  33. except ImportError: # cheat and use IDF's copy of esptool if available
  34. idf_path = os.getenv("IDF_PATH")
  35. if not idf_path or not os.path.exists(idf_path):
  36. raise
  37. sys.path.insert(0, os.path.join(idf_path, "components", "esptool_py", "esptool"))
  38. import esptool
  39. class IDFToolError(OSError):
  40. pass
  41. class IDFDUTException(RuntimeError):
  42. pass
  43. class IDFRecvThread(DUT.RecvThread):
  44. PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
  45. EXCEPTION_PATTERNS = [
  46. re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
  47. re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
  48. re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
  49. ]
  50. BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
  51. BACKTRACE_ADDRESS_PATTERN = re.compile(r"(0x[0-9a-f]{8}):0x[0-9a-f]{8}")
  52. def __init__(self, read, dut):
  53. super(IDFRecvThread, self).__init__(read, dut)
  54. self.exceptions = _queue.Queue()
  55. self.performance_items = _queue.Queue()
  56. def collect_performance(self, comp_data):
  57. matches = self.PERFORMANCE_PATTERN.findall(comp_data)
  58. for match in matches:
  59. Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
  60. color="orange")
  61. self.performance_items.put((match[0], match[1]))
  62. def detect_exception(self, comp_data):
  63. for pattern in self.EXCEPTION_PATTERNS:
  64. start = 0
  65. while True:
  66. match = pattern.search(comp_data, pos=start)
  67. if match:
  68. start = match.end()
  69. self.exceptions.put(match.group(0))
  70. Utility.console_log("[Exception]: {}".format(match.group(0)), color="red")
  71. else:
  72. break
  73. def detect_backtrace(self, comp_data):
  74. start = 0
  75. while True:
  76. match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
  77. if match:
  78. start = match.end()
  79. Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
  80. # translate backtrace
  81. addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
  82. translated_backtrace = ""
  83. for addr in addresses:
  84. ret = self.dut.lookup_pc_address(addr)
  85. if ret:
  86. translated_backtrace += ret + "\n"
  87. if translated_backtrace:
  88. Utility.console_log("Translated backtrace\n:" + translated_backtrace, color="yellow")
  89. else:
  90. Utility.console_log("Failed to translate backtrace", color="yellow")
  91. else:
  92. break
  93. CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
  94. def _uses_esptool(func):
  95. """ Suspend listener thread, connect with esptool,
  96. call target function with esptool instance,
  97. then resume listening for output
  98. """
  99. @functools.wraps(func)
  100. def handler(self, *args, **kwargs):
  101. self.stop_receive()
  102. settings = self.port_inst.get_settings()
  103. try:
  104. if not self._rom_inst:
  105. self._rom_inst = esptool.ESPLoader.detect_chip(self.port_inst)
  106. self._rom_inst.connect('hard_reset')
  107. esp = self._rom_inst.run_stub()
  108. ret = func(self, esp, *args, **kwargs)
  109. # do hard reset after use esptool
  110. esp.hard_reset()
  111. finally:
  112. # always need to restore port settings
  113. self.port_inst.apply_settings(settings)
  114. self.start_receive()
  115. return ret
  116. return handler
  117. class IDFDUT(DUT.SerialDUT):
  118. """ IDF DUT, extends serial with esptool methods
  119. (Becomes aware of IDFApp instance which holds app-specific data)
  120. """
  121. # /dev/ttyAMA0 port is listed in Raspberry Pi
  122. # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
  123. INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
  124. # if need to erase NVS partition in start app
  125. ERASE_NVS = True
  126. RECV_THREAD_CLS = IDFRecvThread
  127. def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
  128. super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
  129. self.allow_dut_exception = allow_dut_exception
  130. self.exceptions = _queue.Queue()
  131. self.performance_items = _queue.Queue()
  132. self._rom_inst = None
  133. @classmethod
  134. def _get_rom(cls):
  135. raise NotImplementedError("This is an abstraction class, method not defined.")
  136. @classmethod
  137. def get_mac(cls, app, port):
  138. """
  139. get MAC address via esptool
  140. :param app: application instance (to get tool)
  141. :param port: serial port as string
  142. :return: MAC address or None
  143. """
  144. esp = None
  145. try:
  146. esp = cls._get_rom()(port)
  147. esp.connect()
  148. return esp.read_mac()
  149. except RuntimeError:
  150. return None
  151. finally:
  152. if esp:
  153. # do hard reset after use esptool
  154. esp.hard_reset()
  155. esp._port.close()
  156. @classmethod
  157. def confirm_dut(cls, port, **kwargs):
  158. inst = None
  159. try:
  160. expected_rom_class = cls._get_rom()
  161. except NotImplementedError:
  162. expected_rom_class = None
  163. try:
  164. # TODO: check whether 8266 works with this logic
  165. # Otherwise overwrite it in ESP8266DUT
  166. inst = esptool.ESPLoader.detect_chip(port)
  167. if expected_rom_class and type(inst) != expected_rom_class:
  168. raise RuntimeError("Target not expected")
  169. return inst.read_mac() is not None, get_target_by_rom_class(type(inst))
  170. except(esptool.FatalError, RuntimeError):
  171. return False, None
  172. finally:
  173. if inst is not None:
  174. inst._port.close()
  175. @_uses_esptool
  176. def _try_flash(self, esp, erase_nvs, baud_rate):
  177. """
  178. Called by start_app() to try flashing at a particular baud rate.
  179. Structured this way so @_uses_esptool will reconnect each time
  180. """
  181. flash_files = []
  182. try:
  183. # note: opening here prevents us from having to seek back to 0 each time
  184. flash_files = [(offs, open(path, "rb")) for (offs, path) in self.app.flash_files]
  185. if erase_nvs:
  186. address = self.app.partition_table["nvs"]["offset"]
  187. size = self.app.partition_table["nvs"]["size"]
  188. nvs_file = tempfile.TemporaryFile()
  189. nvs_file.write(b'\xff' * size)
  190. nvs_file.seek(0)
  191. if not isinstance(address, int):
  192. address = int(address, 0)
  193. flash_files.append((address, nvs_file))
  194. # fake flasher args object, this is a hack until
  195. # esptool Python API is improved
  196. class FlashArgs(object):
  197. def __init__(self, attributes):
  198. for key, value in attributes.items():
  199. self.__setattr__(key, value)
  200. flash_args = FlashArgs({
  201. 'flash_size': self.app.flash_settings["flash_size"],
  202. 'flash_mode': self.app.flash_settings["flash_mode"],
  203. 'flash_freq': self.app.flash_settings["flash_freq"],
  204. 'addr_filename': flash_files,
  205. 'no_stub': False,
  206. 'compress': True,
  207. 'verify': False,
  208. 'encrypt': self.app.flash_settings.get("encrypt", False),
  209. 'erase_all': False,
  210. })
  211. esp.change_baud(baud_rate)
  212. esptool.detect_flash_size(esp, flash_args)
  213. esptool.write_flash(esp, flash_args)
  214. finally:
  215. for (_, f) in flash_files:
  216. f.close()
  217. def start_app(self, erase_nvs=ERASE_NVS):
  218. """
  219. download and start app.
  220. :param: erase_nvs: whether erase NVS partition during flash
  221. :return: None
  222. """
  223. for baud_rate in [921600, 115200]:
  224. try:
  225. self._try_flash(erase_nvs, baud_rate)
  226. break
  227. except RuntimeError:
  228. continue
  229. else:
  230. raise IDFToolError()
  231. @_uses_esptool
  232. def reset(self, esp):
  233. """
  234. hard reset DUT
  235. :return: None
  236. """
  237. # decorator `_use_esptool` will do reset
  238. # so we don't need to do anything in this method
  239. pass
  240. @_uses_esptool
  241. def erase_partition(self, esp, partition):
  242. """
  243. :param partition: partition name to erase
  244. :return: None
  245. """
  246. raise NotImplementedError() # TODO: implement this
  247. # address = self.app.partition_table[partition]["offset"]
  248. size = self.app.partition_table[partition]["size"]
  249. # TODO can use esp.erase_region() instead of this, I think
  250. with open(".erase_partition.tmp", "wb") as f:
  251. f.write(chr(0xFF) * size)
  252. @_uses_esptool
  253. def dump_flush(self, esp, output_file, **kwargs):
  254. """
  255. dump flush
  256. :param output_file: output file name, if relative path, will use sdk path as base path.
  257. :keyword partition: partition name, dump the partition.
  258. ``partition`` is preferred than using ``address`` and ``size``.
  259. :keyword address: dump from address (need to be used with size)
  260. :keyword size: dump size (need to be used with address)
  261. :return: None
  262. """
  263. if os.path.isabs(output_file) is False:
  264. output_file = os.path.relpath(output_file, self.app.get_log_folder())
  265. if "partition" in kwargs:
  266. partition = self.app.partition_table[kwargs["partition"]]
  267. _address = partition["offset"]
  268. _size = partition["size"]
  269. elif "address" in kwargs and "size" in kwargs:
  270. _address = kwargs["address"]
  271. _size = kwargs["size"]
  272. else:
  273. raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
  274. content = esp.read_flash(_address, _size)
  275. with open(output_file, "wb") as f:
  276. f.write(content)
  277. @classmethod
  278. def list_available_ports(cls):
  279. ports = [x.device for x in list_ports.comports()]
  280. espport = os.getenv('ESPPORT')
  281. if not espport:
  282. # It's a little hard filter out invalid port with `serial.tools.list_ports.grep()`:
  283. # The check condition in `grep` is: `if r.search(port) or r.search(desc) or r.search(hwid)`.
  284. # This means we need to make all 3 conditions fail, to filter out the port.
  285. # So some part of the filters will not be straight forward to users.
  286. # And negative regular expression (`^((?!aa|bb|cc).)*$`) is not easy to understand.
  287. # Filter out invalid port by our own will be much simpler.
  288. return [x for x in ports if not cls.INVALID_PORT_PATTERN.search(x)]
  289. # On MacOs with python3.6: type of espport is already utf8
  290. if isinstance(espport, type(u'')):
  291. port_hint = espport
  292. else:
  293. port_hint = espport.decode('utf8')
  294. # If $ESPPORT is a valid port, make it appear first in the list
  295. if port_hint in ports:
  296. ports.remove(port_hint)
  297. return [port_hint] + ports
  298. # On macOS, user may set ESPPORT to /dev/tty.xxx while
  299. # pySerial lists only the corresponding /dev/cu.xxx port
  300. if sys.platform == 'darwin' and 'tty.' in port_hint:
  301. port_hint = port_hint.replace('tty.', 'cu.')
  302. if port_hint in ports:
  303. ports.remove(port_hint)
  304. return [port_hint] + ports
  305. return ports
  306. def lookup_pc_address(self, pc_addr):
  307. cmd = ["%saddr2line" % self.TOOLCHAIN_PREFIX,
  308. "-pfiaC", "-e", self.app.elf_file, pc_addr]
  309. ret = ""
  310. try:
  311. translation = subprocess.check_output(cmd)
  312. ret = translation.decode()
  313. except OSError:
  314. pass
  315. return ret
  316. @staticmethod
  317. def _queue_read_all(source_queue):
  318. output = []
  319. while True:
  320. try:
  321. output.append(source_queue.get(timeout=0))
  322. except _queue.Empty:
  323. break
  324. return output
  325. def _queue_copy(self, source_queue, dest_queue):
  326. data = self._queue_read_all(source_queue)
  327. for d in data:
  328. dest_queue.put(d)
  329. def _get_from_queue(self, queue_name):
  330. self_queue = getattr(self, queue_name)
  331. if self.receive_thread:
  332. recv_thread_queue = getattr(self.receive_thread, queue_name)
  333. self._queue_copy(recv_thread_queue, self_queue)
  334. return self._queue_read_all(self_queue)
  335. def stop_receive(self):
  336. if self.receive_thread:
  337. for name in ["performance_items", "exceptions"]:
  338. source_queue = getattr(self.receive_thread, name)
  339. dest_queue = getattr(self, name)
  340. self._queue_copy(source_queue, dest_queue)
  341. super(IDFDUT, self).stop_receive()
  342. def get_exceptions(self):
  343. """ Get exceptions detected by DUT receive thread. """
  344. return self._get_from_queue("exceptions")
  345. def get_performance_items(self):
  346. """
  347. DUT receive thread will automatic collect performance results with pattern ``[Performance][name]: value\n``.
  348. This method is used to get all performance results.
  349. :return: a list of performance items.
  350. """
  351. return self._get_from_queue("performance_items")
  352. def close(self):
  353. super(IDFDUT, self).close()
  354. if not self.allow_dut_exception and self.get_exceptions():
  355. Utility.console_log("DUT exception detected on {}".format(self), color="red")
  356. raise IDFDUTException()
  357. class ESP32DUT(IDFDUT):
  358. TARGET = "esp32"
  359. TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
  360. @classmethod
  361. def _get_rom(cls):
  362. return esptool.ESP32ROM
  363. class ESP32S2DUT(IDFDUT):
  364. TARGET = "esp32s2"
  365. TOOLCHAIN_PREFIX = "xtensa-esp32s2-elf-"
  366. @classmethod
  367. def _get_rom(cls):
  368. return esptool.ESP32S2ROM
  369. class ESP8266DUT(IDFDUT):
  370. TARGET = "esp8266"
  371. TOOLCHAIN_PREFIX = "xtensa-lx106-elf-"
  372. @classmethod
  373. def _get_rom(cls):
  374. return esptool.ESP8266ROM
  375. def get_target_by_rom_class(cls):
  376. for c in [ESP32DUT, ESP32S2DUT, ESP8266DUT, IDFQEMUDUT]:
  377. if c._get_rom() == cls:
  378. return c.TARGET
  379. return None
  380. class IDFQEMUDUT(IDFDUT):
  381. TARGET = None
  382. TOOLCHAIN_PREFIX = None
  383. ERASE_NVS = True
  384. DEFAULT_EXPECT_TIMEOUT = 30 # longer timeout, since app startup takes more time in QEMU (due to slow SHA emulation)
  385. QEMU_SERIAL_PORT = 3334
  386. def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
  387. self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix=".bin", prefix="qemu_flash_img")
  388. self.app = app
  389. self.flash_size = 4 * 1024 * 1024
  390. self._write_flash_img()
  391. args = [
  392. "qemu-system-xtensa",
  393. "-nographic",
  394. "-machine", self.TARGET,
  395. "-drive", "file={},if=mtd,format=raw".format(self.flash_image.name),
  396. "-nic", "user,model=open_eth",
  397. "-serial", "tcp::{},server,nowait".format(self.QEMU_SERIAL_PORT),
  398. "-S",
  399. "-global driver=timer.esp32.timg,property=wdt_disable,value=true"]
  400. # TODO(IDF-1242): generate a temporary efuse binary, pass it to QEMU
  401. if "QEMU_BIOS_PATH" in os.environ:
  402. args += ["-L", os.environ["QEMU_BIOS_PATH"]]
  403. self.qemu = pexpect.spawn(" ".join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT)
  404. self.qemu.expect_exact(b"(qemu)")
  405. super(IDFQEMUDUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
  406. def _write_flash_img(self):
  407. self.flash_image.seek(0)
  408. self.flash_image.write(b'\x00' * self.flash_size)
  409. for offs, path in self.app.flash_files:
  410. with open(path, "rb") as flash_file:
  411. contents = flash_file.read()
  412. self.flash_image.seek(offs)
  413. self.flash_image.write(contents)
  414. self.flash_image.flush()
  415. @classmethod
  416. def _get_rom(cls):
  417. return esptool.ESP32ROM
  418. @classmethod
  419. def get_mac(cls, app, port):
  420. # TODO(IDF-1242): get this from QEMU/efuse binary
  421. return "11:22:33:44:55:66"
  422. @classmethod
  423. def confirm_dut(cls, port, **kwargs):
  424. return True, cls.TARGET
  425. def start_app(self, erase_nvs=ERASE_NVS):
  426. # TODO: implement erase_nvs
  427. # since the flash image is generated every time in the constructor, maybe this isn't needed...
  428. self.qemu.sendline(b"cont\n")
  429. self.qemu.expect_exact(b"(qemu)")
  430. def reset(self):
  431. self.qemu.sendline(b"system_reset\n")
  432. self.qemu.expect_exact(b"(qemu)")
  433. def erase_partition(self, partition):
  434. raise NotImplementedError("method not erase_partition not implemented")
  435. def dump_flush(self, output_file, **kwargs):
  436. raise NotImplementedError("method not dump_flush not implemented")
  437. @classmethod
  438. def list_available_ports(cls):
  439. return ["socket://localhost:{}".format(cls.QEMU_SERIAL_PORT)]
  440. def close(self):
  441. super(IDFQEMUDUT, self).close()
  442. self.qemu.sendline(b"q\n")
  443. self.qemu.expect_exact(b"(qemu)")
  444. for _ in range(self.DEFAULT_EXPECT_TIMEOUT):
  445. if not self.qemu.isalive():
  446. break
  447. time.sleep(1)
  448. else:
  449. self.qemu.terminate(force=True)
  450. class ESP32QEMUDUT(IDFQEMUDUT):
  451. TARGET = "esp32"
  452. TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"