otatool.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. #!/usr/bin/env python
  2. #
  3. # otatool is used to perform ota-level operations: flashing an ota partition,
  4. # erasing an ota partition, and switching the ota partition
  5. #
  6. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  7. #
  8. # Licensed under the Apache License, Version 2.0 (the "License");
  9. # you may not use this file except in compliance with the License.
  10. # You may obtain a copy of the License at
  11. #
  12. # http://www.apache.org/licenses/LICENSE-2.0
  13. #
  14. # Unless required by applicable law or agreed to in writing, software
  15. # distributed under the License is distributed on an "AS IS" BASIS,
  16. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17. # See the License for the specific language governing permissions and
  18. # limitations under the License.
  19. from __future__ import print_function, division
  20. import argparse
  21. import os
  22. import sys
  23. import binascii
  24. import tempfile
  25. import collections
  26. import struct
  27. try:
  28. from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET
  29. except ImportError:
  30. COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
  31. PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, "partition_table")
  32. sys.path.append(PARTTOOL_DIR)
  33. from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET
  34. __version__ = '2.0'
  35. SPI_FLASH_SEC_SIZE = 0x2000
  36. quiet = False
  37. def status(msg):
  38. if not quiet:
  39. print(msg)
  40. class OtatoolTarget():
  41. OTADATA_PARTITION = PartitionType("data", "ota")
  42. def __init__(self, port=None, baud=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None,
  43. spi_flash_sec_size=SPI_FLASH_SEC_SIZE, esptool_args=[], esptool_write_args=[],
  44. esptool_read_args=[], esptool_erase_args=[]):
  45. self.target = ParttoolTarget(port, baud, partition_table_offset, partition_table_file, esptool_args,
  46. esptool_write_args, esptool_read_args, esptool_erase_args)
  47. self.spi_flash_sec_size = spi_flash_sec_size
  48. temp_file = tempfile.NamedTemporaryFile(delete=False)
  49. temp_file.close()
  50. try:
  51. self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
  52. with open(temp_file.name, "rb") as f:
  53. self.otadata = f.read()
  54. except Exception:
  55. self.otadata = None
  56. finally:
  57. os.unlink(temp_file.name)
  58. def _check_otadata_partition(self):
  59. if not self.otadata:
  60. raise Exception("No otadata partition found")
  61. def erase_otadata(self):
  62. self._check_otadata_partition()
  63. self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)
  64. def _get_otadata_info(self):
  65. info = []
  66. otadata_info = collections.namedtuple("otadata_info", "seq crc")
  67. for i in range(2):
  68. start = i * (self.spi_flash_sec_size >> 1)
  69. seq = bytearray(self.otadata[start:start + 4])
  70. crc = bytearray(self.otadata[start + 28:start + 32])
  71. seq = struct.unpack('I', seq)
  72. crc = struct.unpack('I', crc)
  73. info.append(otadata_info(seq[0], crc[0]))
  74. return info
  75. def _get_partition_id_from_ota_id(self, ota_id):
  76. if isinstance(ota_id, int):
  77. return PartitionType("app", "ota_" + str(ota_id))
  78. else:
  79. return PartitionName(ota_id)
  80. def switch_ota_partition(self, ota_id):
  81. self._check_otadata_partition()
  82. sys.path.append(PARTTOOL_DIR)
  83. import gen_esp32part as gen
  84. def is_otadata_info_valid(status):
  85. seq = status.seq % (1 << 32)
  86. crc = binascii.crc32(struct.pack('I', seq), 0xFFFFFFFF) % (1 << 32)
  87. return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
  88. partition_table = self.target.partition_table
  89. ota_partitions = list()
  90. for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
  91. ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
  92. try:
  93. ota_partitions.append(list(ota_partition)[0])
  94. except IndexError:
  95. break
  96. ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
  97. if not ota_partitions:
  98. raise Exception("No ota app partitions found")
  99. # Look for the app partition to switch to
  100. ota_partition_next = None
  101. try:
  102. if isinstance(ota_id, int):
  103. ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id, ota_partitions)
  104. else:
  105. ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions)
  106. ota_partition_next = list(ota_partition_next)[0]
  107. except IndexError:
  108. raise Exception("Partition to switch to not found")
  109. otadata_info = self._get_otadata_info()
  110. # Find the copy to base the computation for ota sequence number on
  111. otadata_compute_base = -1
  112. # Both are valid, take the max as computation base
  113. if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]):
  114. if otadata_info[0].seq >= otadata_info[1].seq:
  115. otadata_compute_base = 0
  116. else:
  117. otadata_compute_base = 1
  118. # Only one copy is valid, use that
  119. elif is_otadata_info_valid(otadata_info[0]):
  120. otadata_compute_base = 0
  121. elif is_otadata_info_valid(otadata_info[1]):
  122. otadata_compute_base = 1
  123. # Both are invalid (could be initial state - all 0xFF's)
  124. else:
  125. pass
  126. ota_seq_next = 0
  127. ota_partitions_num = len(ota_partitions)
  128. target_seq = (ota_partition_next.subtype & 0x0F) + 1
  129. # Find the next ota sequence number
  130. if otadata_compute_base == 0 or otadata_compute_base == 1:
  131. base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
  132. i = 0
  133. while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
  134. i += 1
  135. ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
  136. else:
  137. ota_seq_next = target_seq
  138. # Create binary data from computed values
  139. ota_seq_next = struct.pack("I", ota_seq_next)
  140. ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
  141. ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
  142. temp_file = tempfile.NamedTemporaryFile(delete=False)
  143. temp_file.close()
  144. try:
  145. with open(temp_file.name, "wb") as otadata_next_file:
  146. start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)
  147. otadata_next_file.write(self.otadata)
  148. otadata_next_file.seek(start)
  149. otadata_next_file.write(ota_seq_next)
  150. otadata_next_file.seek(start + 28)
  151. otadata_next_file.write(ota_seq_crc_next)
  152. otadata_next_file.flush()
  153. self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
  154. finally:
  155. os.unlink(temp_file.name)
  156. def read_ota_partition(self, ota_id, output):
  157. self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)
  158. def write_ota_partition(self, ota_id, input):
  159. self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input)
  160. def erase_ota_partition(self, ota_id):
  161. self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))
  162. def _read_otadata(target):
  163. target._check_otadata_partition()
  164. otadata_info = target._get_otadata_info()
  165. print(' {:8s} \t {:8s} | \t {:8s} \t {:8s}'.format('OTA_SEQ', 'CRC', 'OTA_SEQ', 'CRC'))
  166. print('Firmware: 0x{:08x} \t0x{:08x} | \t0x{:08x} \t 0x{:08x}'.format(otadata_info[0].seq, otadata_info[0].crc,
  167. otadata_info[1].seq, otadata_info[1].crc))
  168. def _erase_otadata(target):
  169. target.erase_otadata()
  170. status("Erased ota_data partition contents")
  171. def _switch_ota_partition(target, ota_id):
  172. target.switch_ota_partition(ota_id)
  173. def _read_ota_partition(target, ota_id, output):
  174. target.read_ota_partition(ota_id, output)
  175. status("Read ota partition contents to file {}".format(output))
  176. def _write_ota_partition(target, ota_id, input):
  177. target.write_ota_partition(ota_id, input)
  178. status("Written contents of file {} to ota partition".format(input))
  179. def _erase_ota_partition(target, ota_id):
  180. target.erase_ota_partition(ota_id)
  181. status("Erased contents of ota partition")
  182. def main():
  183. if sys.version_info[0] < 3:
  184. print("WARNING: Support for Python 2 is deprecated and will be removed in future versions.", file=sys.stderr)
  185. elif sys.version_info[0] == 3 and sys.version_info[1] < 6:
  186. print("WARNING: Python 3 versions older than 3.6 are not supported.", file=sys.stderr)
  187. global quiet
  188. parser = argparse.ArgumentParser("ESP-IDF OTA Partitions Tool")
  189. parser.add_argument("--quiet", "-q", help="suppress stderr messages", action="store_true")
  190. parser.add_argument("--esptool-args", help="additional main arguments for esptool", nargs="+")
  191. parser.add_argument("--esptool-write-args", help="additional subcommand arguments for esptool write_flash", nargs="+")
  192. parser.add_argument("--esptool-read-args", help="additional subcommand arguments for esptool read_flash", nargs="+")
  193. parser.add_argument("--esptool-erase-args", help="additional subcommand arguments for esptool erase_region", nargs="+")
  194. # There are two possible sources for the partition table: a device attached to the host
  195. # or a partition table CSV/binary file. These sources are mutually exclusive.
  196. parser.add_argument("--port", "-p", help="port where the device to read the partition table from is attached")
  197. parser.add_argument("--baud", "-b", help="baudrate to use", type=int)
  198. parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", type=str)
  199. parser.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from; \
  200. overrides device attached to specified port as the partition table source when defined")
  201. subparsers = parser.add_subparsers(dest="operation", help="run otatool -h for additional help")
  202. spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
  203. spi_flash_sec_size.add_argument("--spi-flash-sec-size", help="value of SPI_FLASH_SEC_SIZE macro", type=str)
  204. # Specify the supported operations
  205. subparsers.add_parser("read_otadata", help="read otadata partition", parents=[spi_flash_sec_size])
  206. subparsers.add_parser("erase_otadata", help="erase otadata partition")
  207. slot_or_name_parser = argparse.ArgumentParser(add_help=False)
  208. slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
  209. slot_or_name_parser_args.add_argument("--slot", help="slot number of the ota partition", type=int)
  210. slot_or_name_parser_args.add_argument("--name", help="name of the ota partition")
  211. subparsers.add_parser("switch_ota_partition", help="switch otadata partition", parents=[slot_or_name_parser, spi_flash_sec_size])
  212. read_ota_partition_subparser = subparsers.add_parser("read_ota_partition", help="read contents of an ota partition", parents=[slot_or_name_parser])
  213. read_ota_partition_subparser.add_argument("--output", help="file to write the contents of the ota partition to")
  214. write_ota_partition_subparser = subparsers.add_parser("write_ota_partition", help="write contents to an ota partition", parents=[slot_or_name_parser])
  215. write_ota_partition_subparser.add_argument("--input", help="file whose contents to write to the ota partition")
  216. subparsers.add_parser("erase_ota_partition", help="erase contents of an ota partition", parents=[slot_or_name_parser])
  217. args = parser.parse_args()
  218. quiet = args.quiet
  219. # No operation specified, display help and exit
  220. if args.operation is None:
  221. if not quiet:
  222. parser.print_help()
  223. sys.exit(1)
  224. target_args = {}
  225. if args.port:
  226. target_args["port"] = args.port
  227. if args.partition_table_file:
  228. target_args["partition_table_file"] = args.partition_table_file
  229. if args.partition_table_offset:
  230. target_args["partition_table_offset"] = int(args.partition_table_offset, 0)
  231. try:
  232. if args.spi_flash_sec_size:
  233. target_args["spi_flash_sec_size"] = int(args.spi_flash_sec_size, 0)
  234. except AttributeError:
  235. pass
  236. if args.esptool_args:
  237. target_args["esptool_args"] = args.esptool_args
  238. if args.esptool_write_args:
  239. target_args["esptool_write_args"] = args.esptool_write_args
  240. if args.esptool_read_args:
  241. target_args["esptool_read_args"] = args.esptool_read_args
  242. if args.esptool_erase_args:
  243. target_args["esptool_erase_args"] = args.esptool_erase_args
  244. if args.baud:
  245. target_args["baud"] = args.baud
  246. target = OtatoolTarget(**target_args)
  247. # Create the operation table and execute the operation
  248. common_args = {'target':target}
  249. ota_id = []
  250. try:
  251. if args.name is not None:
  252. ota_id = ["name"]
  253. else:
  254. if args.slot is not None:
  255. ota_id = ["slot"]
  256. except AttributeError:
  257. pass
  258. otatool_ops = {
  259. 'read_otadata':(_read_otadata, []),
  260. 'erase_otadata':(_erase_otadata, []),
  261. 'switch_ota_partition':(_switch_ota_partition, ota_id),
  262. 'read_ota_partition':(_read_ota_partition, ["output"] + ota_id),
  263. 'write_ota_partition':(_write_ota_partition, ["input"] + ota_id),
  264. 'erase_ota_partition':(_erase_ota_partition, ota_id)
  265. }
  266. (op, op_args) = otatool_ops[args.operation]
  267. for op_arg in op_args:
  268. common_args.update({op_arg:vars(args)[op_arg]})
  269. try:
  270. common_args['ota_id'] = common_args.pop('name')
  271. except KeyError:
  272. try:
  273. common_args['ota_id'] = common_args.pop('slot')
  274. except KeyError:
  275. pass
  276. if quiet:
  277. # If exceptions occur, suppress and exit quietly
  278. try:
  279. op(**common_args)
  280. except Exception:
  281. sys.exit(2)
  282. else:
  283. op(**common_args)
  284. if __name__ == '__main__':
  285. main()