#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: 2020-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from __future__ import unicode_literals

import filecmp
import hashlib
import os
import random
import struct
import sys
import tempfile
import time
import unittest
from functools import partial
from io import open
from itertools import chain

import pexpect

current_dir = os.path.dirname(os.path.realpath(__file__))
mkuf2_dir = os.path.abspath(os.path.join(current_dir, '..'))
mkuf2_path = os.path.join(mkuf2_dir, 'mkuf2.py')

try:
    import mkuf2
except ImportError:
    sys.path.append(mkuf2_dir)
    import mkuf2


class UF2Block(object):
    def __init__(self, bs):
        self.length = len(bs)

        # See https://github.com/microsoft/uf2 for the format
        first_part = '<' + 'I' * 8
        # the payload is between the first and the last part
        last_part = '<I'

        first_part_len = struct.calcsize(first_part)
        last_part_len = struct.calcsize(last_part)

        (self.magicStart0, self.magicStart1, self.flags, self.targetAddr, self.payloadSize, self.blockNo,
         self.numBlocks, self.familyID) = struct.unpack(first_part, bs[:first_part_len])

        self.data = bs[first_part_len:-last_part_len]

        (self.magicEnd, ) = struct.unpack(last_part, bs[-last_part_len:])

    def __len__(self):
        return self.length
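
# For reference, the block layout that UF2Block parses (a sketch derived from the
# struct formats above, not an authoritative restatement of the UF2 spec): each
# block is mkuf2.UF2Writer.UF2_BLOCK_SIZE bytes and consists of eight
# little-endian uint32 header fields (magicStart0, magicStart1, flags,
# targetAddr, payloadSize, blockNo, numBlocks, familyID), a data area of which
# only the first payloadSize bytes are flash payload, and a trailing uint32
# magicEnd.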


class UF2BlockReader(object):
    def __init__(self, f_name):
        self.f_name = f_name

    def get(self):
        with open(self.f_name, 'rb') as f:
            for chunk in iter(partial(f.read, mkuf2.UF2Writer.UF2_BLOCK_SIZE), b''):
                yield UF2Block(chunk)
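
# A minimal usage sketch for the reader above ('flash.uf2' is a hypothetical
# input file, not one produced by these tests):
#
#     for block in UF2BlockReader('flash.uf2').get():
#         print(block.blockNo, hex(block.targetAddr), block.payloadSize)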


class BinaryWriter(object):
    def __init__(self, f_name):
        self.f_name = f_name

    def append(self, data):
        # The file is reopened for each append to make sure it is not left open
        with open(self.f_name, 'ab') as f:
            f.write(data)


class BinaryTester(unittest.TestCase):
    def generate_binary(self, size):
        with tempfile.NamedTemporaryFile(delete=False) as f:
            self.addCleanup(os.unlink, f.name)
            for _ in range(size):
                f.write(struct.pack('B', random.randrange(0, 1 << 8)))
            return f.name

    @staticmethod
    def generate_chipID():
        return random.randrange(0, 1 << 32)

    def generate_uf2(self, chip_id, iter_addr_offset_tuples, chunk_size=None):
        of_name = self.generate_binary(0)

        com_args = [mkuf2_path, 'write',
                    '-o', of_name,
                    '--chip-id', hex(chip_id)]
        com_args += [] if chunk_size is None else ['--chunk-size', str(chunk_size)]
        file_args = list(chain(*[(str(addr), f) for addr, f in iter_addr_offset_tuples]))

        p = pexpect.spawn(sys.executable, com_args + file_args, timeout=20)
        self.addCleanup(p.terminate, force=True)

        exp_list = ['Adding {} at {}'.format(f, hex(addr)) for addr, f in iter_addr_offset_tuples]
        exp_list += ['"{}" has been written.'.format(of_name)]
        for e in exp_list:
            p.expect_exact(e)

        # Do a non-blocking wait instead of the blocking p.wait():
        for _ in range(10):
            if not p.isalive():
                break
            time.sleep(0.5)
        # else: the process will be terminated during cleanup

        return of_name
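
    # generate_uf2() above drives mkuf2.py through pexpect; the spawned command
    # is roughly equivalent to the following (file names and values are
    # illustrative only):
    #
    #     python mkuf2.py write -o out.uf2 --chip-id 0xabadbabe [--chunk-size 256] 0x100 app.bin 0x200 data.bin
    #
    # i.e. the common options followed by <address> <file> pairs.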

    def process_blocks(self, uf2block, expected_chip_id):
        flags = mkuf2.UF2Writer.UF2_FLAG_FAMILYID_PRESENT | mkuf2.UF2Writer.UF2_FLAG_MD5_PRESENT

        parsed_binaries = []

        block_list = []  # collect block numbers here
        total_blocks = set()  # collect total block numbers here
        for block in UF2BlockReader(uf2block).get():
            if block.blockNo == 0:
                # a new file has been detected
                base_addr = block.targetAddr
                current_addr = base_addr
                binary_writer = BinaryWriter(self.generate_binary(0))

            self.assertEqual(len(block), mkuf2.UF2Writer.UF2_BLOCK_SIZE)
            self.assertEqual(block.magicStart0, mkuf2.UF2Writer.UF2_FIRST_MAGIC)
            self.assertEqual(block.magicStart1, mkuf2.UF2Writer.UF2_SECOND_MAGIC)
            self.assertEqual(block.flags & flags, flags)
            self.assertEqual(len(block.data), mkuf2.UF2Writer.UF2_DATA_SIZE)

            payload = block.data[:block.payloadSize]
            md5_obj = hashlib.md5(payload)
            md5_part = block.data[block.payloadSize:block.payloadSize + mkuf2.UF2Writer.UF2_MD5_PART_SIZE]
            address, length = struct.unpack('<II', md5_part[:-md5_obj.digest_size])
            md5sum = md5_part[-md5_obj.digest_size:]

            self.assertEqual(address, block.targetAddr)
            self.assertEqual(length, block.payloadSize)
            self.assertEqual(md5sum, md5_obj.digest())

            self.assertEqual(block.familyID, expected_chip_id)
            self.assertEqual(block.magicEnd, mkuf2.UF2Writer.UF2_FINAL_MAGIC)

            self.assertEqual(current_addr, block.targetAddr)
            binary_writer.append(payload)

            block_list.append(block.blockNo)
            total_blocks.add(block.numBlocks)
            if block.blockNo == block.numBlocks - 1:
                self.assertEqual(block_list, list(range(block.numBlocks)))
                # we have found all blocks, and in the right order
                self.assertEqual(total_blocks, {block.numBlocks})  # numBlocks is the same in all blocks
                del block_list[:]
                total_blocks.clear()

                parsed_binaries += [(base_addr, binary_writer.f_name)]

            current_addr += block.payloadSize

        return parsed_binaries
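
    # process_blocks() reconstructs every binary embedded in the UF2 file and
    # checks, per block: the start/end magic values, the family ID, the MD5
    # record (address, length, digest) stored right after the payload, that
    # target addresses grow contiguously, and that block numbers run from 0 to
    # numBlocks - 1 with a consistent numBlocks.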

    def common(self, t, chunk_size=None):
        chip_id = self.generate_chipID()
        parsed_t = self.process_blocks(self.generate_uf2(chip_id, t, chunk_size), chip_id)

        self.assertEqual(len(t), len(parsed_t))

        for (orig_addr, orig_fname), (addr, fname) in zip(t, parsed_t):
            self.assertEqual(orig_addr, addr)
            self.assertTrue(filecmp.cmp(orig_fname, fname))

    def test_simple(self):
        self.common([(0, self.generate_binary(1))])

    def test_more_files(self):
        self.common([(100, self.generate_binary(1)), (200, self.generate_binary(1))])

    def test_larger_files(self):
        self.common([(0x10, self.generate_binary(6)), (0x20, self.generate_binary(8))])

    def test_boundaries(self):
        self.common([(0x100, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE)),
                     (0x200, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE + 1)),
                     (0x300, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE - 1))])

    def test_files_with_more_blocks(self):
        self.common([(0x100, self.generate_binary(3 * mkuf2.UF2Writer.UF2_DATA_SIZE)),
                     (0x200, self.generate_binary(2 * mkuf2.UF2Writer.UF2_DATA_SIZE + 1)),
                     (0x300, self.generate_binary(2 * mkuf2.UF2Writer.UF2_DATA_SIZE - 1))])

    def test_very_large_files(self):
        self.common([(0x100, self.generate_binary(20 * mkuf2.UF2Writer.UF2_DATA_SIZE + 5)),
                     (0x10000, self.generate_binary(50 * mkuf2.UF2Writer.UF2_DATA_SIZE + 100)),
                     (0x100000, self.generate_binary(100 * mkuf2.UF2Writer.UF2_DATA_SIZE))])

    def test_chunk_size(self):
        chunk_size = 256
        self.common([(0x100, self.generate_binary(chunk_size)),
                     (0x200, self.generate_binary(chunk_size + 1)),
                     (0x300, self.generate_binary(chunk_size - 1))],
                    chunk_size)


if __name__ == '__main__':
    unittest.main()