IDFDUT.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http:#www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """ DUT for IDF applications """
  15. import os
  16. import sys
  17. import re
  18. import subprocess
  19. import functools
  20. import random
  21. import tempfile
  22. from serial.tools import list_ports
  23. import DUT
  24. class IDFToolError(OSError):
  25. pass
  26. def _tool_method(func):
  27. """ close port, execute tool method and then reopen port """
  28. @functools.wraps(func)
  29. def handler(self, *args, **kwargs):
  30. self.close()
  31. ret = func(self, *args, **kwargs)
  32. self.open()
  33. return ret
  34. return handler
  35. class IDFDUT(DUT.SerialDUT):
  36. """ IDF DUT, extends serial with ESPTool methods """
  37. CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
  38. # /dev/ttyAMA0 port is listed in Raspberry Pi
  39. # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
  40. INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
  41. # if need to erase NVS partition in start app
  42. ERASE_NVS = True
  43. def __init__(self, name, port, log_file, app, **kwargs):
  44. self.download_config, self.partition_table = app.process_app_info()
  45. super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
  46. @classmethod
  47. def get_chip(cls, app, port):
  48. """
  49. get chip id via esptool
  50. :param app: application instance (to get tool)
  51. :param port: comport
  52. :return: chip ID or None
  53. """
  54. try:
  55. output = subprocess.check_output(["python", app.esptool, "--port", port, "chip_id"])
  56. except subprocess.CalledProcessError:
  57. output = bytes()
  58. if isinstance(output, bytes):
  59. output = output.decode()
  60. chip_type = cls.CHIP_TYPE_PATTERN.search(output)
  61. return chip_type.group(1) if chip_type else None
  62. @classmethod
  63. def confirm_dut(cls, port, app, **kwargs):
  64. return cls.get_chip(app, port) is not None
  65. @_tool_method
  66. def start_app(self, erase_nvs=ERASE_NVS):
  67. """
  68. download and start app.
  69. :param: erase_nvs: whether erase NVS partition during flash
  70. :return: None
  71. """
  72. if erase_nvs:
  73. address = self.partition_table["nvs"]["offset"]
  74. size = self.partition_table["nvs"]["size"]
  75. nvs_file = tempfile.NamedTemporaryFile()
  76. nvs_file.write(chr(0xFF) * size)
  77. nvs_file.flush()
  78. download_config = self.download_config + [address, nvs_file.name]
  79. else:
  80. download_config = self.download_config
  81. retry_baud_rates = ["921600", "115200"]
  82. error = IDFToolError()
  83. try:
  84. for baud_rate in retry_baud_rates:
  85. try:
  86. subprocess.check_output(["python", self.app.esptool,
  87. "--port", self.port, "--baud", baud_rate]
  88. + download_config)
  89. break
  90. except subprocess.CalledProcessError as error:
  91. continue
  92. else:
  93. raise error
  94. finally:
  95. if erase_nvs:
  96. nvs_file.close()
  97. @_tool_method
  98. def reset(self):
  99. """
  100. reset DUT with esptool
  101. :return: None
  102. """
  103. subprocess.check_output(["python", self.app.esptool, "--port", self.port, "run"])
  104. @_tool_method
  105. def erase_partition(self, partition):
  106. """
  107. :param partition: partition name to erase
  108. :return: None
  109. """
  110. address = self.partition_table[partition]["offset"]
  111. size = self.partition_table[partition]["size"]
  112. with open(".erase_partition.tmp", "wb") as f:
  113. f.write(chr(0xFF) * size)
  114. @_tool_method
  115. def dump_flush(self, output_file, **kwargs):
  116. """
  117. dump flush
  118. :param output_file: output file name, if relative path, will use sdk path as base path.
  119. :keyword partition: partition name, dump the partition.
  120. ``partition`` is preferred than using ``address`` and ``size``.
  121. :keyword address: dump from address (need to be used with size)
  122. :keyword size: dump size (need to be used with address)
  123. :return: None
  124. """
  125. if os.path.isabs(output_file) is False:
  126. output_file = os.path.relpath(output_file, self.app.get_log_folder())
  127. if "partition" in kwargs:
  128. partition = self.partition_table[kwargs["partition"]]
  129. _address = partition["offset"]
  130. _size = partition["size"]
  131. elif "address" in kwargs and "size" in kwargs:
  132. _address = kwargs["address"]
  133. _size = kwargs["size"]
  134. else:
  135. raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
  136. subprocess.check_output(
  137. ["python", self.app.esptool, "--port", self.port, "--baud", "921600",
  138. "--before", "default_reset", "--after", "hard_reset", "read_flash",
  139. _address, _size, output_file]
  140. )
  141. @classmethod
  142. def list_available_ports(cls):
  143. ports = [x.device for x in list_ports.comports()]
  144. espport = os.getenv('ESPPORT')
  145. if not espport:
  146. # It's a little hard filter out invalid port with `serial.tools.list_ports.grep()`:
  147. # The check condition in `grep` is: `if r.search(port) or r.search(desc) or r.search(hwid)`.
  148. # This means we need to make all 3 conditions fail, to filter out the port.
  149. # So some part of the filters will not be straight forward to users.
  150. # And negative regular expression (`^((?!aa|bb|cc).)*$`) is not easy to understand.
  151. # Filter out invalid port by our own will be much simpler.
  152. return [x for x in ports if not cls.INVALID_PORT_PATTERN.search(x)]
  153. port_hint = espport.decode('utf8')
  154. # If $ESPPORT is a valid port, make it appear first in the list
  155. if port_hint in ports:
  156. ports.remove(port_hint)
  157. return [port_hint] + ports
  158. # On macOS, user may set ESPPORT to /dev/tty.xxx while
  159. # pySerial lists only the corresponding /dev/cu.xxx port
  160. if sys.platform == 'darwin' and 'tty.' in port_hint:
  161. port_hint = port_hint.replace('tty.', 'cu.')
  162. if port_hint in ports:
  163. ports.remove(port_hint)
  164. return [port_hint] + ports
  165. return ports