| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2020 Espressif Systems (Shanghai) CO LTD
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from __future__ import unicode_literals
- import filecmp
- import hashlib
- import os
- import random
- import struct
- import sys
- import tempfile
- import time
- import unittest
- from functools import partial
- from io import open
- from itertools import chain
- import pexpect
- try:
- from itertools import izip as zip
- except ImportError:
- # Python 3
- pass
# Directory that contains this test file; mkuf2.py is looked up one level above it.
current_dir = os.path.dirname(os.path.realpath(__file__))
mkuf2_dir = os.path.abspath(os.path.join(current_dir, '..'))
# Path of the script that the tests launch as a subprocess.
mkuf2_path = os.path.join(mkuf2_dir, 'mkuf2.py')

try:
    import mkuf2
except ImportError:
    # mkuf2 is not importable yet: add its directory to the module search path and retry.
    sys.path.append(mkuf2_dir)
    import mkuf2
class UF2Block(object):
    """Decoded view of one raw UF2 block.

    The block layout is described at https://github.com/microsoft/uf2:
    eight little-endian 32-bit header words, the data area, and one
    little-endian 32-bit closing magic word.

    Attributes set by the constructor:
        length: number of bytes in the raw block that was passed in.
        magicStart0, magicStart1, flags, targetAddr, payloadSize, blockNo,
        numBlocks, familyID: the eight header words, in that order.
        data: every byte located between the header and the closing word.
        magicEnd: the closing 32-bit word.
    """

    def __init__(self, bs):
        """Split the raw bytes ``bs`` into header words, data area and closing word."""
        header_fmt = '<8I'
        footer_fmt = '<I'
        header_size = struct.calcsize(header_fmt)
        footer_size = struct.calcsize(footer_fmt)

        self.length = len(bs)

        header_words = struct.unpack(header_fmt, bs[:header_size])
        (self.magicStart0,
         self.magicStart1,
         self.flags,
         self.targetAddr,
         self.payloadSize,
         self.blockNo,
         self.numBlocks,
         self.familyID) = header_words

        # The data area is whatever remains between the header and the closing word.
        self.data = bs[header_size:-footer_size]
        self.magicEnd = struct.unpack(footer_fmt, bs[-footer_size:])[0]

    def __len__(self):
        """Return the size in bytes of the raw block this object was built from."""
        return self.length
class UF2BlockReader(object):
    """Lazily reads a UF2 file and hands out its blocks one at a time."""

    def __init__(self, f_name):
        """Remember the path of the UF2 file; nothing is opened yet."""
        self.f_name = f_name

    def get(self):
        """Yield a ``UF2Block`` for every chunk read from the file.

        The file is read in pieces of ``mkuf2.UF2Writer.UF2_BLOCK_SIZE`` bytes
        until a read returns no data. The file is opened only once iteration starts.
        """
        with open(self.f_name, 'rb') as uf2_file:
            block_size = mkuf2.UF2Writer.UF2_BLOCK_SIZE
            while True:
                raw_block = uf2_file.read(block_size)
                if raw_block == b'':
                    # End of file reached
                    break
                yield UF2Block(raw_block)
class BinaryWriter(object):
    """Appends binary data to a file identified by its path."""

    def __init__(self, f_name):
        """Store the path of the file that ``append`` will write to."""
        self.f_name = f_name

    def append(self, data):
        """Add ``data`` to the end of the file."""
        # The file is reopened for every call so that it is never left open
        # between two appends.
        with open(self.f_name, 'ab') as out:
            out.write(data)
class BinaryTester(unittest.TestCase):
    """Round-trip tests for mkuf2.py.

    Each test creates random binaries, lets mkuf2.py pack them into a UF2
    file, parses that file block by block and checks that the reconstructed
    binaries and their addresses match the originals.
    """

    def generate_binary(self, size):
        """Create a temporary file holding ``size`` random bytes.

        The file is registered for deletion during test cleanup.

        Returns:
            The path of the created file.
        """
        with tempfile.NamedTemporaryFile(delete=False) as f:
            self.addCleanup(os.unlink, f.name)
            # One write of the whole buffer instead of one write call per byte.
            f.write(bytearray(random.randrange(0, 1 << 8) for _ in range(size)))
            return f.name

    @staticmethod
    def generate_chipID():
        """Return a random integer in the range [0, 2**32)."""
        return random.randrange(0, 1 << 32)

    def generate_uf2(self, chip_id, iter_addr_offset_tuples, chunk_size=None):
        """Run ``mkuf2.py write`` in a subprocess and return the UF2 file path.

        Args:
            chip_id: value passed (as hexadecimal) to ``--chip-id``.
            iter_addr_offset_tuples: iterable of ``(address, file_path)`` pairs.
            chunk_size: optional value for ``--chunk-size``; the option is
                omitted when this is None.

        Returns:
            The path of the output file given to ``-o``.

        The tool's output must contain one 'Adding ... at ...' message per
        input pair, in order, followed by the '... has been written.' message.
        """
        # The pairs are traversed twice (command line and expected output), so
        # a one-shot iterator has to be materialized first.
        addr_file_pairs = list(iter_addr_offset_tuples)

        of_name = self.generate_binary(0)
        com_args = [mkuf2_path, 'write',
                    '-o', of_name,
                    '--chip-id', hex(chip_id)]
        if chunk_size is not None:
            com_args += ['--chunk-size', str(chunk_size)]
        file_args = list(chain.from_iterable((str(addr), f) for addr, f in addr_file_pairs))

        p = pexpect.spawn(sys.executable, com_args + file_args, timeout=20)
        self.addCleanup(p.terminate, force=True)

        exp_list = ['Adding {} at {}'.format(f, hex(addr)) for addr, f in addr_file_pairs]
        exp_list += ['"{}" has been written.'.format(of_name)]
        for e in exp_list:
            p.expect_exact(e)

        # Do non-blocking wait instead of the blocking p.wait():
        for _ in range(10):
            if not p.isalive():
                break
            time.sleep(0.5)
        # else: will be terminated during cleanup

        return of_name

    def process_blocks(self, uf2block, expected_chip_id):
        """Validate every block of a UF2 file and rebuild the packed binaries.

        Args:
            uf2block: path of the UF2 file to parse.
            expected_chip_id: value every block's family ID must equal.

        Returns:
            A list of ``(base_address, file_path)`` tuples, one per binary
            found in the UF2 file, in the order they appear.
        """
        flags = mkuf2.UF2Writer.UF2_FLAG_FAMILYID_PRESENT | mkuf2.UF2Writer.UF2_FLAG_MD5_PRESENT

        parsed_binaries = []

        block_list = []  # collect block numbers here
        total_blocks = set()  # collect total block numbers here

        # State of the binary currently being reassembled; set when block 0 is seen.
        base_addr = None
        current_addr = None
        binary_writer = None

        for block in UF2BlockReader(uf2block).get():
            if block.blockNo == 0:
                # new file has been detected
                base_addr = block.targetAddr
                current_addr = base_addr
                binary_writer = BinaryWriter(self.generate_binary(0))

            # Fail with a clear message instead of an unbound-variable error
            # when a binary does not begin with block number 0.
            self.assertIsNotNone(binary_writer, 'A binary in the UF2 file does not start with block number 0')

            self.assertEqual(len(block), mkuf2.UF2Writer.UF2_BLOCK_SIZE)
            self.assertEqual(block.magicStart0, mkuf2.UF2Writer.UF2_FIRST_MAGIC)
            self.assertEqual(block.magicStart1, mkuf2.UF2Writer.UF2_SECOND_MAGIC)
            self.assertEqual(block.flags & flags, flags)

            self.assertEqual(len(block.data), mkuf2.UF2Writer.UF2_DATA_SIZE)
            payload = block.data[:block.payloadSize]
            md5_obj = hashlib.md5(payload)
            # The MD5 part follows the payload: address, length, then the digest.
            md5_part = block.data[block.payloadSize:block.payloadSize + mkuf2.UF2Writer.UF2_MD5_PART_SIZE]
            address, length = struct.unpack('<II', md5_part[:-md5_obj.digest_size])
            md5sum = md5_part[-md5_obj.digest_size:]
            self.assertEqual(address, block.targetAddr)
            self.assertEqual(length, block.payloadSize)
            self.assertEqual(md5sum, md5_obj.digest())

            self.assertEqual(block.familyID, expected_chip_id)
            self.assertEqual(block.magicEnd, mkuf2.UF2Writer.UF2_FINAL_MAGIC)

            # Blocks of one binary must be contiguous in the target address space.
            self.assertEqual(current_addr, block.targetAddr)
            binary_writer.append(payload)

            block_list.append(block.blockNo)
            total_blocks.add(block.numBlocks)
            if block.blockNo == block.numBlocks - 1:
                self.assertEqual(block_list, list(range(block.numBlocks)))
                # we have found all blocks and in the right order
                self.assertEqual(total_blocks, {block.numBlocks})  # numBlocks are the same in all the blocks
                del block_list[:]
                total_blocks.clear()

                parsed_binaries += [(base_addr, binary_writer.f_name)]
                # The next binary has to announce itself with block number 0.
                binary_writer = None

            current_addr += block.payloadSize
        return parsed_binaries

    def common(self, t, chunk_size=None):
        """Pack the ``(address, file_path)`` pairs in ``t`` and verify the round trip."""
        chip_id = self.generate_chipID()
        parsed_t = self.process_blocks(self.generate_uf2(chip_id, t, chunk_size), chip_id)

        self.assertEqual(len(t), len(parsed_t))
        for (orig_addr, orig_fname), (addr, fname) in zip(t, parsed_t):
            self.assertEqual(orig_addr, addr)
            self.assertTrue(filecmp.cmp(orig_fname, fname))

    def test_simple(self):
        self.common([(0, self.generate_binary(1))])

    def test_more_files(self):
        self.common([(100, self.generate_binary(1)), (200, self.generate_binary(1))])

    def test_larger_files(self):
        self.common([(0x10, self.generate_binary(6)), (0x20, self.generate_binary(8))])

    def test_boundaries(self):
        self.common([(0x100, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE)),
                     (0x200, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE + 1)),
                     (0x300, self.generate_binary(mkuf2.UF2Writer.UF2_DATA_SIZE - 1))])

    def test_files_with_more_blocks(self):
        self.common([(0x100, self.generate_binary(3 * mkuf2.UF2Writer.UF2_DATA_SIZE)),
                     (0x200, self.generate_binary(2 * mkuf2.UF2Writer.UF2_DATA_SIZE + 1)),
                     (0x300, self.generate_binary(2 * mkuf2.UF2Writer.UF2_DATA_SIZE - 1))])

    def test_very_large_files(self):
        self.common([(0x100, self.generate_binary(20 * mkuf2.UF2Writer.UF2_DATA_SIZE + 5)),
                     (0x10000, self.generate_binary(50 * mkuf2.UF2Writer.UF2_DATA_SIZE + 100)),
                     (0x100000, self.generate_binary(100 * mkuf2.UF2Writer.UF2_DATA_SIZE))])

    def test_chunk_size(self):
        chunk_size = 256
        self.common([(0x100, self.generate_binary(chunk_size)),
                     (0x200, self.generate_binary(chunk_size + 1)),
                     (0x300, self.generate_binary(chunk_size - 1))],
                    chunk_size)
if __name__ == '__main__':
    # Run the tests of this module when the file is executed as a script.
    unittest.main()
|