otatool.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. #!/usr/bin/env python
  2. #
  3. # otatool is used to perform ota-level operations - flashing ota partition
  4. # erasing ota partition and switching ota partition
  5. #
  6. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  7. #
  8. # Licensed under the Apache License, Version 2.0 (the "License");
  9. # you may not use this file except in compliance with the License.
  10. # You may obtain a copy of the License at
  11. #
  12. # http:#www.apache.org/licenses/LICENSE-2.0
  13. #
  14. # Unless required by applicable law or agreed to in writing, software
  15. # distributed under the License is distributed on an "AS IS" BASIS,
  16. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17. # See the License for the specific language governing permissions and
  18. # limitations under the License.
  19. from __future__ import print_function, division
  20. import argparse
  21. import os
  22. import sys
  23. import binascii
  24. import subprocess
  25. import tempfile
  26. import collections
  27. import struct
  28. __version__ = '1.0'
  29. IDF_COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
  30. PARTTOOL_PY = os.path.join(IDF_COMPONENTS_PATH, "partition_table", "parttool.py")
  31. SPI_FLASH_SEC_SIZE = 0x2000
  32. quiet = False
  33. def status(msg):
  34. if not quiet:
  35. print(msg)
  36. def _invoke_parttool(parttool_args, args, output=False, partition=None):
  37. invoke_args = []
  38. if partition:
  39. invoke_args += [sys.executable, PARTTOOL_PY] + partition
  40. else:
  41. invoke_args += [sys.executable, PARTTOOL_PY, "--partition-type", "data", "--partition-subtype", "ota"]
  42. if quiet:
  43. invoke_args += ["-q"]
  44. if args.port != "":
  45. invoke_args += ["--port", args.port]
  46. if args.partition_table_file:
  47. invoke_args += ["--partition-table-file", args.partition_table_file]
  48. if args.partition_table_offset:
  49. invoke_args += ["--partition-table-offset", args.partition_table_offset]
  50. invoke_args += parttool_args
  51. if output:
  52. return subprocess.check_output(invoke_args)
  53. else:
  54. return subprocess.check_call(invoke_args)
  55. def _get_otadata_contents(args, check=True):
  56. global quiet
  57. if check:
  58. check_args = ["get_partition_info", "--info", "offset", "size"]
  59. quiet = True
  60. output = _invoke_parttool(check_args, args, True).split(b" ")
  61. quiet = args.quiet
  62. if not output:
  63. raise RuntimeError("No ota_data partition found")
  64. with tempfile.NamedTemporaryFile() as otadata_file:
  65. invoke_args = ["read_partition", "--output", otadata_file.name]
  66. _invoke_parttool(invoke_args, args)
  67. return otadata_file.read()
  68. def _get_otadata_status(otadata_contents):
  69. status = []
  70. otadata_status = collections.namedtuple("otadata_status", "seq crc")
  71. for i in range(2):
  72. start = i * (SPI_FLASH_SEC_SIZE >> 1)
  73. seq = bytearray(otadata_contents[start:start + 4])
  74. crc = bytearray(otadata_contents[start + 28:start + 32])
  75. seq = struct.unpack('>I', seq)
  76. crc = struct.unpack('>I', crc)
  77. status.append(otadata_status(seq[0], crc[0]))
  78. return status
  79. def read_otadata(args):
  80. status("Reading ota_data partition contents...")
  81. otadata_info = _get_otadata_contents(args)
  82. otadata_info = _get_otadata_status(otadata_info)
  83. print(otadata_info)
  84. print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
  85. print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc,
  86. otadata_info[1].seq, otadata_info[1].crc))
  87. def erase_otadata(args):
  88. status("Erasing ota_data partition contents...")
  89. _invoke_parttool(["erase_partition"], args)
  90. status("Erased ota_data partition contents")
  91. def switch_otadata(args):
  92. sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
  93. import gen_esp32part as gen
  94. def is_otadata_status_valid(status):
  95. seq = status.seq % (1 << 32)
  96. crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
  97. return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
  98. status("Looking for ota app partitions...")
  99. # In order to get the number of ota app partitions, we need the partition table
  100. partition_table = None
  101. with tempfile.NamedTemporaryFile() as partition_table_file:
  102. invoke_args = ["get_partition_info", "--table", partition_table_file.name]
  103. _invoke_parttool(invoke_args, args)
  104. partition_table = partition_table_file.read()
  105. partition_table = gen.PartitionTable.from_binary(partition_table)
  106. ota_partitions = list()
  107. for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
  108. ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
  109. try:
  110. ota_partitions.append(list(ota_partition)[0])
  111. except IndexError:
  112. break
  113. ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
  114. if not ota_partitions:
  115. raise RuntimeError("No ota app partitions found")
  116. status("Verifying partition to switch to exists...")
  117. # Look for the app partition to switch to
  118. ota_partition_next = None
  119. try:
  120. if args.name:
  121. ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
  122. else:
  123. ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
  124. ota_partition_next = list(ota_partition_next)[0]
  125. except IndexError:
  126. raise RuntimeError("Partition to switch to not found")
  127. otadata_contents = _get_otadata_contents(args)
  128. otadata_status = _get_otadata_status(otadata_contents)
  129. # Find the copy to base the computation for ota sequence number on
  130. otadata_compute_base = -1
  131. # Both are valid, take the max as computation base
  132. if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
  133. if otadata_status[0].seq >= otadata_status[1].seq:
  134. otadata_compute_base = 0
  135. else:
  136. otadata_compute_base = 1
  137. # Only one copy is valid, use that
  138. elif is_otadata_status_valid(otadata_status[0]):
  139. otadata_compute_base = 0
  140. elif is_otadata_status_valid(otadata_status[1]):
  141. otadata_compute_base = 1
  142. # Both are invalid (could be initial state - all 0xFF's)
  143. else:
  144. pass
  145. ota_seq_next = 0
  146. ota_partitions_num = len(ota_partitions)
  147. target_seq = (ota_partition_next.subtype & 0x0F) + 1
  148. # Find the next ota sequence number
  149. if otadata_compute_base == 0 or otadata_compute_base == 1:
  150. base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
  151. i = 0
  152. while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
  153. i += 1
  154. ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
  155. else:
  156. ota_seq_next = target_seq
  157. # Create binary data from computed values
  158. ota_seq_next = struct.pack("I", ota_seq_next)
  159. ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
  160. ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
  161. with tempfile.NamedTemporaryFile() as otadata_next_file:
  162. start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
  163. otadata_next_file.write(otadata_contents)
  164. otadata_next_file.seek(start)
  165. otadata_next_file.write(ota_seq_next)
  166. otadata_next_file.seek(start + 28)
  167. otadata_next_file.write(ota_seq_crc_next)
  168. otadata_next_file.flush()
  169. _invoke_parttool(["write_partition", "--input", otadata_next_file.name], args)
  170. status("Updated ota_data partition")
  171. def _get_partition_specifier(args):
  172. if args.name:
  173. return ["--partition-name", args.name]
  174. else:
  175. return ["--partition-type", "app", "--partition-subtype", "ota_" + str(args.slot)]
  176. def read_ota_partition(args):
  177. invoke_args = ["read_partition", "--output", args.output]
  178. _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
  179. status("Read ota partition contents to file {}".format(args.output))
  180. def write_ota_partition(args):
  181. invoke_args = ["write_partition", "--input", args.input]
  182. _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
  183. status("Written contents of file {} to ota partition".format(args.input))
  184. def erase_ota_partition(args):
  185. invoke_args = ["erase_partition"]
  186. _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
  187. status("Erased contents of ota partition")
  188. def main():
  189. global quiet
  190. parser = argparse.ArgumentParser("ESP-IDF OTA Partitions Tool")
  191. parser.add_argument("--quiet", "-q", help="suppress stderr messages", action="store_true")
  192. # There are two possible sources for the partition table: a device attached to the host
  193. # or a partition table CSV/binary file. These sources are mutually exclusive.
  194. partition_table_info_source_args = parser.add_mutually_exclusive_group()
  195. partition_table_info_source_args.add_argument("--port", "-p", help="port where the device to read the partition table from is attached", default="")
  196. partition_table_info_source_args.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from", default="")
  197. parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", default="0x8000")
  198. subparsers = parser.add_subparsers(dest="operation", help="run otatool -h for additional help")
  199. # Specify the supported operations
  200. subparsers.add_parser("read_otadata", help="read otadata partition")
  201. subparsers.add_parser("erase_otadata", help="erase otadata partition")
  202. slot_or_name_parser = argparse.ArgumentParser(add_help=False)
  203. slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
  204. slot_or_name_parser_args.add_argument("--slot", help="slot number of the ota partition", type=int)
  205. slot_or_name_parser_args.add_argument("--name", help="name of the ota partition")
  206. subparsers.add_parser("switch_otadata", help="switch otadata partition", parents=[slot_or_name_parser])
  207. read_ota_partition_subparser = subparsers.add_parser("read_ota_partition", help="read contents of an ota partition", parents=[slot_or_name_parser])
  208. read_ota_partition_subparser.add_argument("--output", help="file to write the contents of the ota partition to")
  209. write_ota_partition_subparser = subparsers.add_parser("write_ota_partition", help="write contents to an ota partition", parents=[slot_or_name_parser])
  210. write_ota_partition_subparser.add_argument("--input", help="file whose contents to write to the ota partition")
  211. subparsers.add_parser("erase_ota_partition", help="erase contents of an ota partition", parents=[slot_or_name_parser])
  212. args = parser.parse_args()
  213. quiet = args.quiet
  214. # No operation specified, display help and exit
  215. if args.operation is None:
  216. if not quiet:
  217. parser.print_help()
  218. sys.exit(1)
  219. # Else execute the operation
  220. operation_func = globals()[args.operation]
  221. if quiet:
  222. # If exceptions occur, suppress and exit quietly
  223. try:
  224. operation_func(args)
  225. except Exception:
  226. sys.exit(2)
  227. else:
  228. operation_func(args)
  229. if __name__ == '__main__':
  230. main()