test_mkuf2.py 8.1 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. # Copyright 2020 Espressif Systems (Shanghai) CO LTD
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. from __future__ import unicode_literals
  18. import filecmp
  19. import hashlib
  20. import os
  21. import random
  22. import struct
  23. import sys
  24. import tempfile
  25. import time
  26. import unittest
  27. from functools import partial
  28. from io import open
  29. from itertools import chain
  30. import pexpect
  31. try:
  32. from itertools import izip as zip
  33. except ImportError:
  34. # Python 3
  35. pass
  36. current_dir = os.path.dirname(os.path.realpath(__file__))
  37. mkuf2_dir = os.path.abspath(os.path.join(current_dir, '..'))
  38. mkuf2_path = os.path.join(mkuf2_dir, 'mkuf2.py')
  39. try:
  40. import mkuf2
  41. except ImportError:
  42. sys.path.append(mkuf2_dir)
  43. import mkuf2
  44. class UF2Block(object):
  45. def __init__(self, bs):
  46. self.length = len(bs)
  47. # See https://github.com/microsoft/uf2 for the format
  48. first_part = '<' + 'I' * 8
  49. # payload is between
  50. last_part = '<I'
  51. first_part_len = struct.calcsize(first_part)
  52. last_part_len = struct.calcsize(last_part)
  53. (self.magicStart0, self.magicStart1, self.flags, self.targetAddr, self.payloadSize, self.blockNo,
  54. self.numBlocks, self.familyID) = struct.unpack(first_part, bs[:first_part_len])
  55. self.data = bs[first_part_len:-last_part_len]
  56. (self.magicEnd, ) = struct.unpack(last_part, bs[-last_part_len:])
  57. def __len__(self):
  58. return self.length
  59. class UF2BlockReader(object):
  60. def __init__(self, f_name):
  61. self.f_name = f_name
  62. def get(self):
  63. with open(self.f_name, 'rb') as f:
  64. for chunk in iter(partial(f.read, mkuf2.UF2Writer.UF2_BLOCK_SIZE), b''):
  65. yield UF2Block(chunk)
  66. class BinaryWriter(object):
  67. def __init__(self, f_name):
  68. self.f_name = f_name
  69. def append(self, data):
  70. # File is reopened several times in order to make sure that won't left open
  71. with open(self.f_name, 'ab') as f:
  72. f.write(data)
  73. class BinaryTester(unittest.TestCase):
  74. def generate_binary(self, size):
  75. with tempfile.NamedTemporaryFile(delete=False) as f:
  76. self.addCleanup(os.unlink, f.name)
  77. for _ in range(size):
  78. f.write(struct.pack('B', random.randrange(0, 1 << 8)))
  79. return f.name
  80. @staticmethod
  81. def generate_chipID():
  82. return random.randrange(0, 1 << 32)
  83. def generate_uf2(self, chip_id, iter_addr_offset_tuples, chunk_size=None):
  84. of_name = self.generate_binary(0)
  85. com_args = [mkuf2_path, 'write',
  86. '-o', of_name,
  87. '--chip-id', hex(chip_id)]
  88. com_args += [] if chunk_size is None else ['--chunk-size', str(chunk_size)]
  89. file_args = list(chain(*[(str(addr), f) for addr, f in iter_addr_offset_tuples]))
  90. p = pexpect.spawn(sys.executable, com_args + file_args, timeout=20)
  91. self.addCleanup(p.terminate, force=True)
  92. exp_list = ['Adding {} at {}'.format(f, hex(addr)) for addr, f in iter_addr_offset_tuples]
  93. exp_list += ['"{}" has been written.'.format(of_name)]
  94. for e in exp_list:
  95. p.expect_exact(e)
  96. # Do non-blocking wait instead of the blocking p.wait():
  97. for _ in range(10):
  98. if not p.isalive():
  99. break
  100. time.sleep(0.5)
  101. # else: will be terminated during cleanup
  102. return of_name
  103. def process_blocks(self, uf2block, expected_chip_id):
  104. flags = mkuf2.UF2Writer.UF2_FLAG_FAMILYID_PRESENT | mkuf2.UF2Writer.UF2_FLAG_MD5_PRESENT
  105. parsed_binaries = []
  106. block_list = [] # collect block numbers here
  107. total_blocks = set() # collect total block numbers here
  108. for block in UF2BlockReader(uf2block).get():
  109. if block.blockNo == 0:
  110. # new file has been detected
  111. base_addr = block.targetAddr
  112. current_addr = base_addr
  113. binary_writer = BinaryWriter(self.generate_binary(0))
  114. self.assertEqual(len(block), mkuf2.UF2Writer.UF2_BLOCK_SIZE)
  115. self.assertEqual(block.magicStart0, mkuf2.UF2Writer.UF2_FIRST_MAGIC)
  116. self.assertEqual(block.magicStart1, mkuf2.UF2Writer.UF2_SECOND_MAGIC)
  117. self.assertEqual(block.flags & flags, flags)
  118. self.assertEqual(len(block.data), mkuf2.UF2Writer.UF2_DATA_SIZE)
  119. payload = block.data[:block.payloadSize]
  120. md5_obj = hashlib.md5(payload)
  121. md5_part = block.data[block.payloadSize:block.payloadSize + mkuf2.UF2Writer.UF2_MD5_PART_SIZE]
  122. address, length = struct.unpack('<II', md5_part[:-md5_obj.digest_size])
  123. md5sum = md5_part[-md5_obj.digest_size:]
  124. self.assertEqual(address, block.targetAddr)
  125. self.assertEqual(length, block.payloadSize)
  126. self.assertEqual(md5sum, md5_obj.digest())
  127. self.assertEqual(block.familyID, expected_chip_id)
  128. self.assertEqual(block.magicEnd, mkuf2.UF2Writer.UF2_FINAL_MAGIC)
  129. self.assertEqual(current_addr, block.targetAddr)
  130. binary_writer.append(payload)
  131. block_list.append(block.blockNo)
  132. total_blocks.add(block.numBlocks)
  133. if block.blockNo == block.numBlocks - 1:
  134. self.assertEqual(block_list, list(range(block.numBlocks)))
  135. # we have found all blocks and in the right order
  136. self.assertEqual(total_blocks, {block.numBlocks}) # numBlocks are the same in all the blocks
  137. del block_list[:]
  138. total_blocks.clear()
  139. parsed_binaries += [(base_addr, binary_writer.f_name)]
  140. current_addr += block.payloadSize
  141. return parsed_binaries
  142. def common(self, t, chunk_size=None):
  143. chip_id = self.generate_chipID()
  144. parsed_t = self.process_blocks(self.generate_uf2(chip_id, t, chunk_size), chip_id)
  145. self.assertEqual(len(t), len(parsed_t))
  146. for (orig_addr, orig_fname), (addr, fname) in zip(t, parsed_t):
  147. self.assertEqual(orig_addr, addr)
  148. self.assertTrue(filecmp.cmp(orig_fname, fname))
  149. def test_simple(self):
  150. self.common([(0, self.generate_binary(1))])
  151. def test_more_files(self):
  152. self.common([(100, self.generate_binary(1)), (200, self.generate_binary(1))])
  153. def test_larger_files(self):
  154. self.common([(0x10, self.generate_binary(6)), (0x20, self.generate_binary(8))])
  155. def test_boundaries(self):
  156. self.common([(0x100, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE)),
  157. (0x200, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE + 1)),
  158. (0x300, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE - 1))])
  159. def test_files_with_more_blocks(self):
  160. self.common([(0x100, self.generate_binary(3 * mkuf2.UF2Writer.UF2_DATA_SIZE)),
  161. (0x200, self.generate_binary(2 * mkuf2.UF2Writer.UF2_DATA_SIZE + 1)),
  162. (0x300, self.generate_binary(2 * mkuf2.UF2Writer.UF2_DATA_SIZE - 1))])
  163. def test_very_large_files(self):
  164. self.common([(0x100, self.generate_binary(20 * mkuf2.UF2Writer.UF2_DATA_SIZE + 5)),
  165. (0x10000, self.generate_binary(50 * mkuf2.UF2Writer.UF2_DATA_SIZE + 100)),
  166. (0x100000, self.generate_binary(100 * mkuf2.UF2Writer.UF2_DATA_SIZE))])
  167. def test_chunk_size(self):
  168. chunk_size = 256
  169. self.common([(0x100, self.generate_binary(chunk_size)),
  170. (0x200, self.generate_binary(chunk_size + 1)),
  171. (0x300, self.generate_binary(chunk_size - 1))],
  172. chunk_size)
  173. if __name__ == '__main__':
  174. unittest.main()