#!/usr/bin/env python
#
# Copyright 2020-2021 Espressif Systems (Shanghai) CO LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This program creates archives compatible with ESP32-S* ROM DFU implementation.
#
# The archives are in CPIO format. Each file which needs to be flashed is added to the archive
# as a separate file. In addition to that, a special index file, 'dfuinfo0.dat', is created.
# This file must be the first one in the archive. It contains binary structures describing each
# subsequent file (for example, where the file needs to be flashed/loaded).
  23. from __future__ import print_function, unicode_literals
  24. import argparse
  25. import hashlib
  26. import json
  27. import os
  28. import struct
  29. import zlib
  30. from collections import namedtuple
  31. from functools import partial
  32. from future.utils import iteritems
  33. try:
  34. import typing
  35. except ImportError:
  36. # Only used for type annotations
  37. pass
  38. try:
  39. from itertools import izip as zip # type: ignore
  40. except ImportError:
  41. # Python 3
  42. pass
  43. # CPIO ("new ASCII") format related things
  44. CPIO_MAGIC = b'070701'
  45. CPIO_STRUCT = b'=6s' + b'8s' * 13
  46. CPIOHeader = namedtuple(
  47. 'CPIOHeader',
  48. [
  49. 'magic',
  50. 'ino',
  51. 'mode',
  52. 'uid',
  53. 'gid',
  54. 'nlink',
  55. 'mtime',
  56. 'filesize',
  57. 'devmajor',
  58. 'devminor',
  59. 'rdevmajor',
  60. 'rdevminor',
  61. 'namesize',
  62. 'check',
  63. ],
  64. )
  65. CPIO_TRAILER = 'TRAILER!!!'
  66. def make_cpio_header(
  67. filename_len, file_len, is_trailer=False
  68. ): # type: (int, int, bool) -> CPIOHeader
  69. """ Returns CPIOHeader for the given file name and file size """
  70. def as_hex(val): # type: (int) -> bytes
  71. return '{:08x}'.format(val).encode('ascii')
  72. hex_0 = as_hex(0)
  73. mode = hex_0 if is_trailer else as_hex(0o0100644)
  74. nlink = as_hex(1) if is_trailer else hex_0
  75. return CPIOHeader(
  76. magic=CPIO_MAGIC,
  77. ino=hex_0,
  78. mode=mode,
  79. uid=hex_0,
  80. gid=hex_0,
  81. nlink=nlink,
  82. mtime=hex_0,
  83. filesize=as_hex(file_len),
  84. devmajor=hex_0,
  85. devminor=hex_0,
  86. rdevmajor=hex_0,
  87. rdevminor=hex_0,
  88. namesize=as_hex(filename_len),
  89. check=hex_0,
  90. )
  91. # DFU format related things
  92. # Structure of one entry in dfuinfo0.dat
  93. DFUINFO_STRUCT = b'<I I 64s 16s'
  94. DFUInfo = namedtuple('DFUInfo', ['address', 'flags', 'name', 'md5'])
  95. DFUINFO_FILE = 'dfuinfo0.dat'
  96. # Structure which gets added at the end of the entire DFU file
  97. DFUSUFFIX_STRUCT = b'<H H H H 3s B'
  98. DFUSuffix = namedtuple(
  99. 'DFUSuffix', ['bcd_device', 'pid', 'vid', 'bcd_dfu', 'sig', 'len']
  100. )
  101. ESPRESSIF_VID = 12346
  102. # This CRC32 gets added after DFUSUFFIX_STRUCT
  103. DFUCRC_STRUCT = b'<I'
  104. def dfu_crc(data, crc=0): # type: (bytes, int) -> int
  105. """ Calculate CRC32/JAMCRC of data, with an optional initial value """
  106. uint32_max = 0xFFFFFFFF
  107. return uint32_max - (zlib.crc32(data, crc) & uint32_max)
  108. def pad_bytes(b, multiple, padding=b'\x00'): # type: (bytes, int, bytes) -> bytes
  109. """ Pad 'b' to a length divisible by 'multiple' """
  110. padded_len = (len(b) + multiple - 1) // multiple * multiple
  111. return b + padding * (padded_len - len(b))
  112. class EspDfuWriter(object):
  113. def __init__(self, dest_file, pid, part_size): # type: (typing.BinaryIO, int, int) -> None
  114. self.dest = dest_file
  115. self.pid = pid
  116. self.part_size = part_size
  117. self.entries = [] # type: typing.List[bytes]
  118. self.index = [] # type: typing.List[DFUInfo]
  119. def add_file(self, flash_addr, path): # type: (int, str) -> None
  120. """
  121. Add file to be written into flash at given address
  122. Files are split up into chunks in order avoid timing-out during erasing large regions. Instead of adding
  123. "app.bin" at flash_addr it will add:
  124. 1. app.bin at flash_addr # sizeof(app.bin) == self.part_size
  125. 2. app.bin.1 at flash_addr + self.part_size
  126. 3. app.bin.2 at flash_addr + 2 * self.part_size
  127. ...
  128. """
  129. f_name = os.path.basename(path)
  130. with open(path, 'rb') as f:
  131. for i, chunk in enumerate(iter(partial(f.read, self.part_size), b'')):
  132. n = f_name if i == 0 else '.'.join([f_name, str(i)])
  133. self._add_cpio_flash_entry(n, flash_addr, chunk)
  134. flash_addr += len(chunk)
  135. def finish(self): # type: () -> None
  136. """ Write DFU file """
  137. # Prepare and add dfuinfo0.dat file
  138. dfuinfo = b''.join([struct.pack(DFUINFO_STRUCT, *item) for item in self.index])
  139. self._add_cpio_entry(DFUINFO_FILE, dfuinfo, first=True)
  140. # Add CPIO archive trailer
  141. self._add_cpio_entry(CPIO_TRAILER, b'', trailer=True)
  142. # Combine all the entries and pad the file
  143. out_data = b''.join(self.entries)
  144. cpio_block_size = 10240
  145. out_data = pad_bytes(out_data, cpio_block_size)
  146. # Add DFU suffix and CRC
  147. dfu_suffix = DFUSuffix(0xFFFF, self.pid, ESPRESSIF_VID, 0x0100, b'UFD', 16)
  148. out_data += struct.pack(DFUSUFFIX_STRUCT, *dfu_suffix)
  149. out_data += struct.pack(DFUCRC_STRUCT, dfu_crc(out_data))
  150. # Finally write the entire binary
  151. self.dest.write(out_data)
  152. def _add_cpio_flash_entry(
  153. self, filename, flash_addr, data
  154. ): # type: (str, int, bytes) -> None
  155. md5 = hashlib.md5()
  156. md5.update(data)
  157. self.index.append(
  158. DFUInfo(
  159. address=flash_addr,
  160. flags=0,
  161. name=filename.encode('utf-8'),
  162. md5=md5.digest(),
  163. )
  164. )
  165. self._add_cpio_entry(filename, data)
  166. def _add_cpio_entry(
  167. self, filename, data, first=False, trailer=False
  168. ): # type: (str, bytes, bool, bool) -> None
  169. filename_b = filename.encode('utf-8') + b'\x00'
  170. cpio_header = make_cpio_header(len(filename_b), len(data), is_trailer=trailer)
  171. entry = pad_bytes(
  172. struct.pack(CPIO_STRUCT, *cpio_header) + filename_b, 4
  173. ) + pad_bytes(data, 4)
  174. if not first:
  175. self.entries.append(entry)
  176. else:
  177. self.entries.insert(0, entry)
  178. def action_write(args): # type: (typing.Mapping[str, typing.Any]) -> None
  179. writer = EspDfuWriter(args['output_file'], args['pid'], args['part_size'])
  180. for addr, f in args['files']:
  181. print('Adding {} at {:#x}'.format(f, addr))
  182. writer.add_file(addr, f)
  183. writer.finish()
  184. print('"{}" has been written. You may proceed with DFU flashing.'.format(args['output_file'].name))
  185. if args['part_size'] % (4 * 1024) != 0:
  186. print('WARNING: Partition size of DFU is not multiple of 4k (4096). You might get unexpected behavior.')
  187. def main(): # type: () -> None
  188. parser = argparse.ArgumentParser()
  189. # Provision to add "info" command
  190. subparsers = parser.add_subparsers(dest='command')
  191. write_parser = subparsers.add_parser('write')
  192. write_parser.add_argument('-o', '--output-file',
  193. help='Filename for storing the output DFU image',
  194. required=True,
  195. type=argparse.FileType('wb'))
  196. write_parser.add_argument('--pid',
  197. required=True,
  198. type=lambda h: int(h, 16),
  199. help='Hexa-decimal product indentificator')
  200. write_parser.add_argument('--json',
  201. help='Optional file for loading "flash_files" dictionary with <address> <file> items')
  202. write_parser.add_argument('--part-size',
  203. default=os.environ.get('ESP_DFU_PART_SIZE', 512 * 1024),
  204. type=lambda x: int(x, 0),
  205. help='Larger files are split-up into smaller partitions of this size')
  206. write_parser.add_argument('files',
  207. metavar='<address> <file>', help='Add <file> at <address>',
  208. nargs='*')
  209. args = parser.parse_args()
  210. def check_file(file_name): # type: (str) -> str
  211. if not os.path.isfile(file_name):
  212. raise RuntimeError('{} is not a regular file!'.format(file_name))
  213. return file_name
  214. files = []
  215. if args.files:
  216. files += [(int(addr, 0), check_file(f_name)) for addr, f_name in zip(args.files[::2], args.files[1::2])]
  217. if args.json:
  218. json_dir = os.path.dirname(os.path.abspath(args.json))
  219. def process_json_file(path): # type: (str) -> str
  220. '''
  221. The input path is relative to json_dir. This function makes it relative to the current working
  222. directory.
  223. '''
  224. return check_file(os.path.relpath(os.path.join(json_dir, path), start=os.curdir))
  225. with open(args.json) as f:
  226. files += [(int(addr, 0),
  227. process_json_file(f_name)) for addr, f_name in iteritems(json.load(f)['flash_files'])]
  228. files = sorted([(addr, f_name.decode('utf-8') if isinstance(f_name, type(b'')) else f_name) for addr, f_name in iteritems(dict(files))],
  229. key=lambda x: x[0]) # remove possible duplicates and sort based on the address
  230. cmd_args = {'output_file': args.output_file,
  231. 'files': files,
  232. 'pid': args.pid,
  233. 'part_size': args.part_size,
  234. }
  235. {'write': action_write
  236. }[args.command](cmd_args)
  237. if __name__ == '__main__':
  238. main()