IDFDUT.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. # Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http:#www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """ DUT for IDF applications """
  15. import os
  16. import os.path
  17. import sys
  18. import re
  19. import functools
  20. import tempfile
  21. import subprocess
  22. # python2 and python3 queue package name is different
  23. try:
  24. import Queue as _queue
  25. except ImportError:
  26. import queue as _queue
  27. from serial.tools import list_ports
  28. from tiny_test_fw import DUT, Utility
  29. try:
  30. import esptool
  31. except ImportError: # cheat and use IDF's copy of esptool if available
  32. idf_path = os.getenv("IDF_PATH")
  33. if not idf_path or not os.path.exists(idf_path):
  34. raise
  35. sys.path.insert(0, os.path.join(idf_path, "components", "esptool_py", "esptool"))
  36. import esptool
  37. class IDFToolError(OSError):
  38. pass
  39. class IDFDUTException(RuntimeError):
  40. pass
  41. class IDFRecvThread(DUT.RecvThread):
  42. PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
  43. EXCEPTION_PATTERNS = [
  44. re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
  45. re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
  46. re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
  47. ]
  48. BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
  49. BACKTRACE_ADDRESS_PATTERN = re.compile(r"(0x[0-9a-f]{8}):0x[0-9a-f]{8}")
  50. def __init__(self, read, dut):
  51. super(IDFRecvThread, self).__init__(read, dut)
  52. self.exceptions = _queue.Queue()
  53. self.performance_items = _queue.Queue()
  54. def collect_performance(self, comp_data):
  55. matches = self.PERFORMANCE_PATTERN.findall(comp_data)
  56. for match in matches:
  57. Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
  58. color="orange")
  59. self.performance_items.put((match[0], match[1]))
  60. def detect_exception(self, comp_data):
  61. for pattern in self.EXCEPTION_PATTERNS:
  62. start = 0
  63. while True:
  64. match = pattern.search(comp_data, pos=start)
  65. if match:
  66. start = match.end()
  67. self.exceptions.put(match.group(0))
  68. Utility.console_log("[Exception]: {}".format(match.group(0)), color="red")
  69. else:
  70. break
  71. def detect_backtrace(self, comp_data):
  72. start = 0
  73. while True:
  74. match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
  75. if match:
  76. start = match.end()
  77. Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
  78. # translate backtrace
  79. addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
  80. translated_backtrace = ""
  81. for addr in addresses:
  82. ret = self.dut.lookup_pc_address(addr)
  83. if ret:
  84. translated_backtrace += ret + "\n"
  85. if translated_backtrace:
  86. Utility.console_log("Translated backtrace\n:" + translated_backtrace, color="yellow")
  87. else:
  88. Utility.console_log("Failed to translate backtrace", color="yellow")
  89. else:
  90. break
  91. CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
  92. def _uses_esptool(func):
  93. """ Suspend listener thread, connect with esptool,
  94. call target function with esptool instance,
  95. then resume listening for output
  96. """
  97. @functools.wraps(func)
  98. def handler(self, *args, **kwargs):
  99. self.stop_receive()
  100. settings = self.port_inst.get_settings()
  101. try:
  102. rom = esptool.ESP32ROM(self.port_inst)
  103. rom.connect('hard_reset')
  104. esp = rom.run_stub()
  105. ret = func(self, esp, *args, **kwargs)
  106. # do hard reset after use esptool
  107. esp.hard_reset()
  108. finally:
  109. # always need to restore port settings
  110. self.port_inst.apply_settings(settings)
  111. self.start_receive()
  112. return ret
  113. return handler
  114. class IDFDUT(DUT.SerialDUT):
  115. """ IDF DUT, extends serial with esptool methods
  116. (Becomes aware of IDFApp instance which holds app-specific data)
  117. """
  118. # /dev/ttyAMA0 port is listed in Raspberry Pi
  119. # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
  120. INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
  121. # if need to erase NVS partition in start app
  122. ERASE_NVS = True
  123. RECV_THREAD_CLS = IDFRecvThread
  124. TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
  125. def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
  126. super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
  127. self.allow_dut_exception = allow_dut_exception
  128. self.exceptions = _queue.Queue()
  129. self.performance_items = _queue.Queue()
  130. @classmethod
  131. def get_mac(cls, port):
  132. """
  133. get MAC address via esptool
  134. :param port: serial port as string
  135. :return: MAC address or None
  136. """
  137. esp = None
  138. try:
  139. esp = esptool.ESP32ROM(port)
  140. esp.connect()
  141. return esp.read_mac()
  142. except RuntimeError:
  143. return None
  144. finally:
  145. if esp:
  146. # do hard reset after use esptool
  147. esp.hard_reset()
  148. esp._port.close()
  149. @classmethod
  150. def confirm_dut(cls, port, **kwargs):
  151. return cls.get_mac(port) is not None
  152. @_uses_esptool
  153. def _try_flash(self, esp, erase_nvs, baud_rate):
  154. """
  155. Called by start_app() to try flashing at a particular baud rate.
  156. Structured this way so @_uses_esptool will reconnect each time
  157. """
  158. try:
  159. # note: opening here prevents us from having to seek back to 0 each time
  160. flash_files = [(offs, open(path, "rb")) for (offs, path) in self.app.flash_files]
  161. if erase_nvs:
  162. address = self.app.partition_table["nvs"]["offset"]
  163. size = self.app.partition_table["nvs"]["size"]
  164. nvs_file = tempfile.TemporaryFile()
  165. nvs_file.write(b'\xff' * size)
  166. nvs_file.seek(0)
  167. flash_files.append((int(address, 0), nvs_file))
  168. # fake flasher args object, this is a hack until
  169. # esptool Python API is improved
  170. class FlashArgs(object):
  171. def __init__(self, attributes):
  172. for key, value in attributes.items():
  173. self.__setattr__(key, value)
  174. flash_args = FlashArgs({
  175. 'flash_size': self.app.flash_settings["flash_size"],
  176. 'flash_mode': self.app.flash_settings["flash_mode"],
  177. 'flash_freq': self.app.flash_settings["flash_freq"],
  178. 'addr_filename': flash_files,
  179. 'no_stub': False,
  180. 'compress': True,
  181. 'verify': False,
  182. 'encrypt': self.app.flash_settings.get("encrypt", False),
  183. 'erase_all': False,
  184. })
  185. esp.change_baud(baud_rate)
  186. esptool.detect_flash_size(esp, flash_args)
  187. esptool.write_flash(esp, flash_args)
  188. finally:
  189. for (_, f) in flash_files:
  190. f.close()
  191. def start_app(self, erase_nvs=ERASE_NVS):
  192. """
  193. download and start app.
  194. :param: erase_nvs: whether erase NVS partition during flash
  195. :return: None
  196. """
  197. for baud_rate in [921600, 115200]:
  198. try:
  199. self._try_flash(erase_nvs, baud_rate)
  200. break
  201. except RuntimeError:
  202. continue
  203. else:
  204. raise IDFToolError()
  205. @_uses_esptool
  206. def reset(self, esp):
  207. """
  208. hard reset DUT
  209. :return: None
  210. """
  211. # decorator `_use_esptool` will do reset
  212. # so we don't need to do anything in this method
  213. pass
  214. @_uses_esptool
  215. def erase_partition(self, esp, partition):
  216. """
  217. :param partition: partition name to erase
  218. :return: None
  219. """
  220. raise NotImplementedError() # TODO: implement this
  221. # address = self.app.partition_table[partition]["offset"]
  222. size = self.app.partition_table[partition]["size"]
  223. # TODO can use esp.erase_region() instead of this, I think
  224. with open(".erase_partition.tmp", "wb") as f:
  225. f.write(chr(0xFF) * size)
  226. @_uses_esptool
  227. def dump_flush(self, esp, output_file, **kwargs):
  228. """
  229. dump flush
  230. :param output_file: output file name, if relative path, will use sdk path as base path.
  231. :keyword partition: partition name, dump the partition.
  232. ``partition`` is preferred than using ``address`` and ``size``.
  233. :keyword address: dump from address (need to be used with size)
  234. :keyword size: dump size (need to be used with address)
  235. :return: None
  236. """
  237. if os.path.isabs(output_file) is False:
  238. output_file = os.path.relpath(output_file, self.app.get_log_folder())
  239. if "partition" in kwargs:
  240. partition = self.app.partition_table[kwargs["partition"]]
  241. _address = partition["offset"]
  242. _size = partition["size"]
  243. elif "address" in kwargs and "size" in kwargs:
  244. _address = kwargs["address"]
  245. _size = kwargs["size"]
  246. else:
  247. raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
  248. content = esp.read_flash(_address, _size)
  249. with open(output_file, "wb") as f:
  250. f.write(content)
  251. @classmethod
  252. def list_available_ports(cls):
  253. ports = [x.device for x in list_ports.comports()]
  254. espport = os.getenv('ESPPORT')
  255. if not espport:
  256. # It's a little hard filter out invalid port with `serial.tools.list_ports.grep()`:
  257. # The check condition in `grep` is: `if r.search(port) or r.search(desc) or r.search(hwid)`.
  258. # This means we need to make all 3 conditions fail, to filter out the port.
  259. # So some part of the filters will not be straight forward to users.
  260. # And negative regular expression (`^((?!aa|bb|cc).)*$`) is not easy to understand.
  261. # Filter out invalid port by our own will be much simpler.
  262. return [x for x in ports if not cls.INVALID_PORT_PATTERN.search(x)]
  263. # On MacOs with python3.6: type of espport is already utf8
  264. if isinstance(espport, type(u'')):
  265. port_hint = espport
  266. else:
  267. port_hint = espport.decode('utf8')
  268. # If $ESPPORT is a valid port, make it appear first in the list
  269. if port_hint in ports:
  270. ports.remove(port_hint)
  271. return [port_hint] + ports
  272. # On macOS, user may set ESPPORT to /dev/tty.xxx while
  273. # pySerial lists only the corresponding /dev/cu.xxx port
  274. if sys.platform == 'darwin' and 'tty.' in port_hint:
  275. port_hint = port_hint.replace('tty.', 'cu.')
  276. if port_hint in ports:
  277. ports.remove(port_hint)
  278. return [port_hint] + ports
  279. return ports
  280. def lookup_pc_address(self, pc_addr):
  281. cmd = ["%saddr2line" % self.TOOLCHAIN_PREFIX,
  282. "-pfiaC", "-e", self.app.elf_file, pc_addr]
  283. ret = ""
  284. try:
  285. translation = subprocess.check_output(cmd)
  286. ret = translation.decode()
  287. except OSError:
  288. pass
  289. return ret
  290. @staticmethod
  291. def _queue_read_all(source_queue):
  292. output = []
  293. while True:
  294. try:
  295. output.append(source_queue.get(timeout=0))
  296. except _queue.Empty:
  297. break
  298. return output
  299. def _queue_copy(self, source_queue, dest_queue):
  300. data = self._queue_read_all(source_queue)
  301. for d in data:
  302. dest_queue.put(d)
  303. def _get_from_queue(self, queue_name):
  304. self_queue = getattr(self, queue_name)
  305. if self.receive_thread:
  306. recv_thread_queue = getattr(self.receive_thread, queue_name)
  307. self._queue_copy(recv_thread_queue, self_queue)
  308. return self._queue_read_all(self_queue)
  309. def stop_receive(self):
  310. if self.receive_thread:
  311. for name in ["performance_items", "exceptions"]:
  312. source_queue = getattr(self.receive_thread, name)
  313. dest_queue = getattr(self, name)
  314. self._queue_copy(source_queue, dest_queue)
  315. super(IDFDUT, self).stop_receive()
  316. def get_exceptions(self):
  317. """ Get exceptions detected by DUT receive thread. """
  318. return self._get_from_queue("exceptions")
  319. def get_performance_items(self):
  320. """
  321. DUT receive thread will automatic collect performance results with pattern ``[Performance][name]: value\n``.
  322. This method is used to get all performance results.
  323. :return: a list of performance items.
  324. """
  325. return self._get_from_queue("performance_items")
  326. def close(self):
  327. super(IDFDUT, self).close()
  328. if not self.allow_dut_exception and self.get_exceptions():
  329. Utility.console_log("DUT exception detected on {}".format(self), color="red")
  330. raise IDFDUTException()