otatool.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. #!/usr/bin/env python
  2. #
  3. # otatool is used to perform ota-level operations: flashing an ota partition,
  4. # erasing an ota partition and switching the active ota partition
  5. #
  6. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  7. #
  8. # Licensed under the Apache License, Version 2.0 (the "License");
  9. # you may not use this file except in compliance with the License.
  10. # You may obtain a copy of the License at
  11. #
  12. # http://www.apache.org/licenses/LICENSE-2.0
  13. #
  14. # Unless required by applicable law or agreed to in writing, software
  15. # distributed under the License is distributed on an "AS IS" BASIS,
  16. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17. # See the License for the specific language governing permissions and
  18. # limitations under the License.
  19. from __future__ import print_function, division
  20. import argparse
  21. import os
  22. import sys
  23. import binascii
  24. import subprocess
  25. import tempfile
  26. import collections
  27. import struct
  28. __version__ = '1.0'
  29. IDF_COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
  30. PARTTOOL_PY = os.path.join(IDF_COMPONENTS_PATH, "partition_table", "parttool.py")
  31. SPI_FLASH_SEC_SIZE = 0x2000
  32. quiet = False
  33. def status(msg):
  34. if not quiet:
  35. print(msg)
  36. def _invoke_parttool(parttool_args, args, output=False, partition=None):
  37. invoke_args = []
  38. if partition:
  39. invoke_args += [sys.executable, PARTTOOL_PY] + partition
  40. else:
  41. invoke_args += [sys.executable, PARTTOOL_PY, "--partition-type", "data", "--partition-subtype", "ota"]
  42. if quiet:
  43. invoke_args += ["-q"]
  44. if args.port != "":
  45. invoke_args += ["--port", args.port]
  46. if args.partition_table_file:
  47. invoke_args += ["--partition-table-file", args.partition_table_file]
  48. if args.partition_table_offset:
  49. invoke_args += ["--partition-table-offset", args.partition_table_offset]
  50. invoke_args += parttool_args
  51. if output:
  52. return subprocess.check_output(invoke_args)
  53. else:
  54. return subprocess.check_call(invoke_args)
  55. def _get_otadata_contents(args, check=True):
  56. global quiet
  57. if check:
  58. check_args = ["get_partition_info", "--info", "offset", "size"]
  59. quiet = True
  60. output = _invoke_parttool(check_args, args, True).split(b" ")
  61. quiet = args.quiet
  62. if not output:
  63. raise RuntimeError("No ota_data partition found")
  64. with tempfile.NamedTemporaryFile(delete=False) as f:
  65. f_name = f.name
  66. try:
  67. invoke_args = ["read_partition", "--output", f_name]
  68. _invoke_parttool(invoke_args, args)
  69. with open(f_name, "rb") as f:
  70. contents = f.read()
  71. finally:
  72. os.unlink(f_name)
  73. return contents
  74. def _get_otadata_status(otadata_contents):
  75. status = []
  76. otadata_status = collections.namedtuple("otadata_status", "seq crc")
  77. for i in range(2):
  78. start = i * (SPI_FLASH_SEC_SIZE >> 1)
  79. seq = bytearray(otadata_contents[start:start + 4])
  80. crc = bytearray(otadata_contents[start + 28:start + 32])
  81. seq = struct.unpack('>I', seq)
  82. crc = struct.unpack('>I', crc)
  83. status.append(otadata_status(seq[0], crc[0]))
  84. return status
  85. def read_otadata(args):
  86. status("Reading ota_data partition contents...")
  87. otadata_info = _get_otadata_contents(args)
  88. otadata_info = _get_otadata_status(otadata_info)
  89. print(otadata_info)
  90. print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
  91. print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc,
  92. otadata_info[1].seq, otadata_info[1].crc))
  93. def erase_otadata(args):
  94. status("Erasing ota_data partition contents...")
  95. _invoke_parttool(["erase_partition"], args)
  96. status("Erased ota_data partition contents")
  97. def switch_otadata(args):
  98. sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
  99. import gen_esp32part as gen
  100. with tempfile.NamedTemporaryFile(delete=False) as f:
  101. f_name = f.name
  102. try:
  103. def is_otadata_status_valid(status):
  104. seq = status.seq % (1 << 32)
  105. crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
  106. return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
  107. status("Looking for ota app partitions...")
  108. # In order to get the number of ota app partitions, we need the partition table
  109. partition_table = None
  110. invoke_args = ["get_partition_info", "--table", f_name]
  111. _invoke_parttool(invoke_args, args)
  112. partition_table = open(f_name, "rb").read()
  113. partition_table = gen.PartitionTable.from_binary(partition_table)
  114. ota_partitions = list()
  115. for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
  116. ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
  117. try:
  118. ota_partitions.append(list(ota_partition)[0])
  119. except IndexError:
  120. break
  121. ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
  122. if not ota_partitions:
  123. raise RuntimeError("No ota app partitions found")
  124. status("Verifying partition to switch to exists...")
  125. # Look for the app partition to switch to
  126. ota_partition_next = None
  127. try:
  128. if args.name:
  129. ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
  130. else:
  131. ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
  132. ota_partition_next = list(ota_partition_next)[0]
  133. except IndexError:
  134. raise RuntimeError("Partition to switch to not found")
  135. otadata_contents = _get_otadata_contents(args)
  136. otadata_status = _get_otadata_status(otadata_contents)
  137. # Find the copy to base the computation for ota sequence number on
  138. otadata_compute_base = -1
  139. # Both are valid, take the max as computation base
  140. if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
  141. if otadata_status[0].seq >= otadata_status[1].seq:
  142. otadata_compute_base = 0
  143. else:
  144. otadata_compute_base = 1
  145. # Only one copy is valid, use that
  146. elif is_otadata_status_valid(otadata_status[0]):
  147. otadata_compute_base = 0
  148. elif is_otadata_status_valid(otadata_status[1]):
  149. otadata_compute_base = 1
  150. # Both are invalid (could be initial state - all 0xFF's)
  151. else:
  152. pass
  153. ota_seq_next = 0
  154. ota_partitions_num = len(ota_partitions)
  155. target_seq = (ota_partition_next.subtype & 0x0F) + 1
  156. # Find the next ota sequence number
  157. if otadata_compute_base == 0 or otadata_compute_base == 1:
  158. base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
  159. i = 0
  160. while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
  161. i += 1
  162. ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
  163. else:
  164. ota_seq_next = target_seq
  165. # Create binary data from computed values
  166. ota_seq_next = struct.pack("I", ota_seq_next)
  167. ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
  168. ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
  169. with open(f_name, "wb") as otadata_next_file:
  170. start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
  171. otadata_next_file.write(otadata_contents)
  172. otadata_next_file.seek(start)
  173. otadata_next_file.write(ota_seq_next)
  174. otadata_next_file.seek(start + 28)
  175. otadata_next_file.write(ota_seq_crc_next)
  176. otadata_next_file.flush()
  177. _invoke_parttool(["write_partition", "--input", f_name], args)
  178. status("Updated ota_data partition")
  179. finally:
  180. os.unlink(f_name)
  181. def _get_partition_specifier(args):
  182. if args.name:
  183. return ["--partition-name", args.name]
  184. else:
  185. return ["--partition-type", "app", "--partition-subtype", "ota_" + str(args.slot)]
  186. def read_ota_partition(args):
  187. invoke_args = ["read_partition", "--output", args.output]
  188. _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
  189. status("Read ota partition contents to file {}".format(args.output))
  190. def write_ota_partition(args):
  191. invoke_args = ["write_partition", "--input", args.input]
  192. _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
  193. status("Written contents of file {} to ota partition".format(args.input))
  194. def erase_ota_partition(args):
  195. invoke_args = ["erase_partition"]
  196. _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
  197. status("Erased contents of ota partition")
  198. def main():
  199. if sys.version_info[0] < 3:
  200. print("WARNING: Support for Python 2 is deprecated and will be removed in future versions.")
  201. elif sys.version_info[0] == 3 and sys.version_info[1] < 6:
  202. print("WARNING: Python 3 versions older than 3.6 are not supported.")
  203. global quiet
  204. parser = argparse.ArgumentParser("ESP-IDF OTA Partitions Tool")
  205. parser.add_argument("--quiet", "-q", help="suppress stderr messages", action="store_true")
  206. # There are two possible sources for the partition table: a device attached to the host
  207. # or a partition table CSV/binary file. These sources are mutually exclusive.
  208. partition_table_info_source_args = parser.add_mutually_exclusive_group()
  209. partition_table_info_source_args.add_argument("--port", "-p", help="port where the device to read the partition table from is attached", default="")
  210. partition_table_info_source_args.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from", default="")
  211. parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", default="0x8000")
  212. subparsers = parser.add_subparsers(dest="operation", help="run otatool -h for additional help")
  213. # Specify the supported operations
  214. subparsers.add_parser("read_otadata", help="read otadata partition")
  215. subparsers.add_parser("erase_otadata", help="erase otadata partition")
  216. slot_or_name_parser = argparse.ArgumentParser(add_help=False)
  217. slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
  218. slot_or_name_parser_args.add_argument("--slot", help="slot number of the ota partition", type=int)
  219. slot_or_name_parser_args.add_argument("--name", help="name of the ota partition")
  220. subparsers.add_parser("switch_otadata", help="switch otadata partition", parents=[slot_or_name_parser])
  221. read_ota_partition_subparser = subparsers.add_parser("read_ota_partition", help="read contents of an ota partition", parents=[slot_or_name_parser])
  222. read_ota_partition_subparser.add_argument("--output", help="file to write the contents of the ota partition to")
  223. write_ota_partition_subparser = subparsers.add_parser("write_ota_partition", help="write contents to an ota partition", parents=[slot_or_name_parser])
  224. write_ota_partition_subparser.add_argument("--input", help="file whose contents to write to the ota partition")
  225. subparsers.add_parser("erase_ota_partition", help="erase contents of an ota partition", parents=[slot_or_name_parser])
  226. args = parser.parse_args()
  227. quiet = args.quiet
  228. # No operation specified, display help and exit
  229. if args.operation is None:
  230. if not quiet:
  231. parser.print_help()
  232. sys.exit(1)
  233. # Else execute the operation
  234. operation_func = globals()[args.operation]
  235. if quiet:
  236. # If exceptions occur, suppress and exit quietly
  237. try:
  238. operation_func(args)
  239. except Exception:
  240. sys.exit(2)
  241. else:
  242. operation_func(args)
  243. if __name__ == '__main__':
  244. main()