otatool.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. #!/usr/bin/env python
  2. #
  3. # otatool is used to perform ota-level operations - flashing ota partition
  4. # erasing ota partition and switching ota partition
  5. #
  6. # SPDX-FileCopyrightText: 2018-2021 Espressif Systems (Shanghai) CO LTD
  7. # SPDX-License-Identifier: Apache-2.0
  8. from __future__ import division, print_function
  9. import argparse
  10. import binascii
  11. import collections
  12. import os
  13. import struct
  14. import sys
  15. import tempfile
  16. try:
  17. from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget
  18. except ImportError:
  19. COMPONENTS_PATH = os.path.expandvars(os.path.join('$IDF_PATH', 'components'))
  20. PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, 'partition_table')
  21. sys.path.append(PARTTOOL_DIR)
  22. from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget
  23. __version__ = '2.0'
  24. SPI_FLASH_SEC_SIZE = 0x2000
  25. quiet = False
  26. def status(msg):
  27. if not quiet:
  28. print(msg)
  29. class OtatoolTarget():
  30. OTADATA_PARTITION = PartitionType('data', 'ota')
  31. def __init__(self, port=None, baud=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None,
  32. spi_flash_sec_size=SPI_FLASH_SEC_SIZE, esptool_args=[], esptool_write_args=[],
  33. esptool_read_args=[], esptool_erase_args=[]):
  34. self.target = ParttoolTarget(port, baud, partition_table_offset, partition_table_file, esptool_args,
  35. esptool_write_args, esptool_read_args, esptool_erase_args)
  36. self.spi_flash_sec_size = spi_flash_sec_size
  37. temp_file = tempfile.NamedTemporaryFile(delete=False)
  38. temp_file.close()
  39. try:
  40. self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
  41. with open(temp_file.name, 'rb') as f:
  42. self.otadata = f.read()
  43. finally:
  44. os.unlink(temp_file.name)
  45. def _check_otadata_partition(self):
  46. if not self.otadata:
  47. raise Exception('No otadata partition found')
  48. def erase_otadata(self):
  49. self._check_otadata_partition()
  50. self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)
  51. def _get_otadata_info(self):
  52. info = []
  53. otadata_info = collections.namedtuple('otadata_info', 'seq crc')
  54. for i in range(2):
  55. start = i * (self.spi_flash_sec_size >> 1)
  56. seq = bytearray(self.otadata[start:start + 4])
  57. crc = bytearray(self.otadata[start + 28:start + 32])
  58. seq = struct.unpack('I', seq)
  59. crc = struct.unpack('I', crc)
  60. info.append(otadata_info(seq[0], crc[0]))
  61. return info
  62. def _get_partition_id_from_ota_id(self, ota_id):
  63. if isinstance(ota_id, int):
  64. return PartitionType('app', 'ota_' + str(ota_id))
  65. else:
  66. return PartitionName(ota_id)
  67. def switch_ota_partition(self, ota_id):
  68. self._check_otadata_partition()
  69. import gen_esp32part as gen
  70. def is_otadata_info_valid(status):
  71. seq = status.seq % (1 << 32)
  72. crc = binascii.crc32(struct.pack('I', seq), 0xFFFFFFFF) % (1 << 32)
  73. return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
  74. partition_table = self.target.partition_table
  75. ota_partitions = list()
  76. for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
  77. ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
  78. try:
  79. ota_partitions.append(list(ota_partition)[0])
  80. except IndexError:
  81. break
  82. ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
  83. if not ota_partitions:
  84. raise Exception('No ota app partitions found')
  85. # Look for the app partition to switch to
  86. ota_partition_next = None
  87. try:
  88. if isinstance(ota_id, int):
  89. ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id, ota_partitions)
  90. else:
  91. ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions)
  92. ota_partition_next = list(ota_partition_next)[0]
  93. except IndexError:
  94. raise Exception('Partition to switch to not found')
  95. otadata_info = self._get_otadata_info()
  96. # Find the copy to base the computation for ota sequence number on
  97. otadata_compute_base = -1
  98. # Both are valid, take the max as computation base
  99. if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]):
  100. if otadata_info[0].seq >= otadata_info[1].seq:
  101. otadata_compute_base = 0
  102. else:
  103. otadata_compute_base = 1
  104. # Only one copy is valid, use that
  105. elif is_otadata_info_valid(otadata_info[0]):
  106. otadata_compute_base = 0
  107. elif is_otadata_info_valid(otadata_info[1]):
  108. otadata_compute_base = 1
  109. # Both are invalid (could be initial state - all 0xFF's)
  110. else:
  111. pass
  112. ota_seq_next = 0
  113. ota_partitions_num = len(ota_partitions)
  114. target_seq = (ota_partition_next.subtype & 0x0F) + 1
  115. # Find the next ota sequence number
  116. if otadata_compute_base == 0 or otadata_compute_base == 1:
  117. base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
  118. i = 0
  119. while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
  120. i += 1
  121. ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
  122. else:
  123. ota_seq_next = target_seq
  124. # Create binary data from computed values
  125. ota_seq_next = struct.pack('I', ota_seq_next)
  126. ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
  127. ota_seq_crc_next = struct.pack('I', ota_seq_crc_next)
  128. temp_file = tempfile.NamedTemporaryFile(delete=False)
  129. temp_file.close()
  130. try:
  131. with open(temp_file.name, 'wb') as otadata_next_file:
  132. start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)
  133. otadata_next_file.write(self.otadata)
  134. otadata_next_file.seek(start)
  135. otadata_next_file.write(ota_seq_next)
  136. otadata_next_file.seek(start + 28)
  137. otadata_next_file.write(ota_seq_crc_next)
  138. otadata_next_file.flush()
  139. self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
  140. finally:
  141. os.unlink(temp_file.name)
  142. def read_ota_partition(self, ota_id, output):
  143. self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)
  144. def write_ota_partition(self, ota_id, input):
  145. self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input)
  146. def erase_ota_partition(self, ota_id):
  147. self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))
  148. def _read_otadata(target):
  149. target._check_otadata_partition()
  150. otadata_info = target._get_otadata_info()
  151. print(' {:8s} \t {:8s} | \t {:8s} \t {:8s}'.format('OTA_SEQ', 'CRC', 'OTA_SEQ', 'CRC'))
  152. print('Firmware: 0x{:08x} \t0x{:08x} | \t0x{:08x} \t 0x{:08x}'.format(otadata_info[0].seq, otadata_info[0].crc,
  153. otadata_info[1].seq, otadata_info[1].crc))
  154. def _erase_otadata(target):
  155. target.erase_otadata()
  156. status('Erased ota_data partition contents')
  157. def _switch_ota_partition(target, ota_id):
  158. target.switch_ota_partition(ota_id)
  159. def _read_ota_partition(target, ota_id, output):
  160. target.read_ota_partition(ota_id, output)
  161. status('Read ota partition contents to file {}'.format(output))
  162. def _write_ota_partition(target, ota_id, input):
  163. target.write_ota_partition(ota_id, input)
  164. status('Written contents of file {} to ota partition'.format(input))
  165. def _erase_ota_partition(target, ota_id):
  166. target.erase_ota_partition(ota_id)
  167. status('Erased contents of ota partition')
  168. def main():
  169. global quiet
  170. parser = argparse.ArgumentParser('ESP-IDF OTA Partitions Tool')
  171. parser.add_argument('--quiet', '-q', help='suppress stderr messages', action='store_true')
  172. parser.add_argument('--esptool-args', help='additional main arguments for esptool', nargs='+')
  173. parser.add_argument('--esptool-write-args', help='additional subcommand arguments for esptool write_flash', nargs='+')
  174. parser.add_argument('--esptool-read-args', help='additional subcommand arguments for esptool read_flash', nargs='+')
  175. parser.add_argument('--esptool-erase-args', help='additional subcommand arguments for esptool erase_region', nargs='+')
  176. # There are two possible sources for the partition table: a device attached to the host
  177. # or a partition table CSV/binary file. These sources are mutually exclusive.
  178. parser.add_argument('--port', '-p', help='port where the device to read the partition table from is attached')
  179. parser.add_argument('--baud', '-b', help='baudrate to use', type=int)
  180. parser.add_argument('--partition-table-offset', '-o', help='offset to read the partition table from', type=str)
  181. parser.add_argument('--partition-table-file', '-f', help='file (CSV/binary) to read the partition table from; \
  182. overrides device attached to specified port as the partition table source when defined')
  183. subparsers = parser.add_subparsers(dest='operation', help='run otatool -h for additional help')
  184. spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
  185. spi_flash_sec_size.add_argument('--spi-flash-sec-size', help='value of SPI_FLASH_SEC_SIZE macro', type=str)
  186. # Specify the supported operations
  187. subparsers.add_parser('read_otadata', help='read otadata partition', parents=[spi_flash_sec_size])
  188. subparsers.add_parser('erase_otadata', help='erase otadata partition')
  189. slot_or_name_parser = argparse.ArgumentParser(add_help=False)
  190. slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
  191. slot_or_name_parser_args.add_argument('--slot', help='slot number of the ota partition', type=int)
  192. slot_or_name_parser_args.add_argument('--name', help='name of the ota partition')
  193. subparsers.add_parser('switch_ota_partition', help='switch otadata partition', parents=[slot_or_name_parser, spi_flash_sec_size])
  194. read_ota_partition_subparser = subparsers.add_parser('read_ota_partition', help='read contents of an ota partition', parents=[slot_or_name_parser])
  195. read_ota_partition_subparser.add_argument('--output', help='file to write the contents of the ota partition to', required=True)
  196. write_ota_partition_subparser = subparsers.add_parser('write_ota_partition', help='write contents to an ota partition', parents=[slot_or_name_parser])
  197. write_ota_partition_subparser.add_argument('--input', help='file whose contents to write to the ota partition')
  198. subparsers.add_parser('erase_ota_partition', help='erase contents of an ota partition', parents=[slot_or_name_parser])
  199. args = parser.parse_args()
  200. quiet = args.quiet
  201. # No operation specified, display help and exit
  202. if args.operation is None:
  203. if not quiet:
  204. parser.print_help()
  205. sys.exit(1)
  206. target_args = {}
  207. if args.port:
  208. target_args['port'] = args.port
  209. if args.partition_table_file:
  210. target_args['partition_table_file'] = args.partition_table_file
  211. if args.partition_table_offset:
  212. target_args['partition_table_offset'] = int(args.partition_table_offset, 0)
  213. try:
  214. if args.spi_flash_sec_size:
  215. target_args['spi_flash_sec_size'] = int(args.spi_flash_sec_size, 0)
  216. except AttributeError:
  217. pass
  218. if args.esptool_args:
  219. target_args['esptool_args'] = args.esptool_args
  220. if args.esptool_write_args:
  221. target_args['esptool_write_args'] = args.esptool_write_args
  222. if args.esptool_read_args:
  223. target_args['esptool_read_args'] = args.esptool_read_args
  224. if args.esptool_erase_args:
  225. target_args['esptool_erase_args'] = args.esptool_erase_args
  226. if args.baud:
  227. target_args['baud'] = args.baud
  228. target = OtatoolTarget(**target_args)
  229. # Create the operation table and execute the operation
  230. common_args = {'target':target}
  231. ota_id = []
  232. try:
  233. if args.name is not None:
  234. ota_id = ['name']
  235. else:
  236. if args.slot is not None:
  237. ota_id = ['slot']
  238. except AttributeError:
  239. pass
  240. otatool_ops = {
  241. 'read_otadata':(_read_otadata, []),
  242. 'erase_otadata':(_erase_otadata, []),
  243. 'switch_ota_partition':(_switch_ota_partition, ota_id),
  244. 'read_ota_partition':(_read_ota_partition, ['output'] + ota_id),
  245. 'write_ota_partition':(_write_ota_partition, ['input'] + ota_id),
  246. 'erase_ota_partition':(_erase_ota_partition, ota_id)
  247. }
  248. (op, op_args) = otatool_ops[args.operation]
  249. for op_arg in op_args:
  250. common_args.update({op_arg:vars(args)[op_arg]})
  251. try:
  252. common_args['ota_id'] = common_args.pop('name')
  253. except KeyError:
  254. try:
  255. common_args['ota_id'] = common_args.pop('slot')
  256. except KeyError:
  257. pass
  258. if quiet:
  259. # If exceptions occur, suppress and exit quietly
  260. try:
  261. op(**common_args)
  262. except Exception:
  263. sys.exit(2)
  264. else:
  265. op(**common_args)
  266. if __name__ == '__main__':
  267. main()