#!/usr/bin/env python3 # # Copyright (c) 2022 Project CHIP Authors # All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import argparse import hashlib import logging import subprocess import sys from custom import (CertDeclaration, DacCert, DacPKey, Discriminator, HardwareVersion, HardwareVersionStr, IterationCount, ManufacturingDate, PaiCert, PartNumber, ProductId, ProductLabel, ProductName, ProductURL, Salt, SerialNum, SetupPasscode, StrArgument, UniqueId, VendorId, VendorName, Verifier) from default import InputArgument def set_logger(): stdout_handler = logging.StreamHandler(stream=sys.stdout) logging.basicConfig( level=logging.DEBUG, format='[%(levelname)s] %(message)s', handlers=[stdout_handler] ) class Spake2p: def __init__(self): pass def generate(self, args): params = self._generate_params(args) args.spake2p_verifier = Verifier(params["Verifier"]) args.salt = Salt(params["Salt"]) args.it = IterationCount(params["Iteration Count"]) def _generate_params(self, args): cmd = [ args.spake2p_path, "gen-verifier", "--iteration-count", str(args.it.val), "--salt", args.salt.encode(), "--pin-code", str(args.passcode.val), "--out", "-" ] out = subprocess.run(cmd, check=True, stdout=subprocess.PIPE).stdout out = out.decode("utf-8").splitlines() return dict(zip(out[0].split(','), out[1].split(','))) class KlvGenerator: def __init__(self, args): self.args = args self._validate_args() self.spake2p = Spake2p() if self.args.spake2p_verifier is None: self.spake2p.generate(self.args) self.args.dac_key.generate_private_key(self.args.dac_key_password) def _validate_args(self): if self.args.dac_key_password is None: logging.warning( "DAC Key password not provided. It means DAC Key is not protected." ) str_args = [obj for key, obj in vars(self.args).items() if isinstance(obj, StrArgument)] for str_arg in str_args: logging.info("key: {} len: {} maxlen: {}".format(str_arg.key(), str_arg.length(), str_arg.max_length())) assert str_arg.length() <= str_arg.max_length() def generate(self): '''Return a list of (K, L, V) tuples. args is essentially a dict, so the entries are not ordered. Sort the objects to ensure the same order of KLV data every time (sorted by key), thus ensuring that SHA256 can be used correctly to compare two output binaries. The new list will contain only InputArgument objects, which generate a (K, L, V) tuple through output() method. ''' data = list() data = [obj for key, obj in vars(self.args).items() if isinstance(obj, InputArgument)] data = [arg.output() for arg in sorted(data, key=lambda x: x.key())] return data def to_bin(self, klv, out, aes128_key): fullContent = bytearray() with open(out, "wb") as file: for entry in klv: fullContent += entry[0].to_bytes(1, "little") fullContent += entry[1].to_bytes(2, "little") fullContent += entry[2] size = len(fullContent) if (aes128_key is None): # Calculate 4 bytes of hashing hashing = hashlib.sha256(fullContent).hexdigest() hashing = hashing[0:8] logging.info("4 byte section hash (for integrity check): {}".format(hashing)) # Add 4 bytes of hashing to generated binary to check for integrity fullContent = bytearray.fromhex(hashing) + fullContent # Add length of data to binary to know how to calculate SHA on embedded fullContent = size.to_bytes(4, "little") + fullContent # Add hash id hashId = bytearray.fromhex("CE47BA5E") hashId.reverse() fullContent = hashId + fullContent size = len(fullContent) logging.info("Size of final generated binary is: {} bytes".format(size)) file.write(fullContent) else: # In case a aes128_key is given the data will be encrypted # Always add a padding to be 16 bytes aligned padding_len = size % 16 padding_len = 16 - padding_len padding_bytes = bytearray(padding_len) logging.info("(Before padding) Size of generated binary is: {} bytes".format(size)) fullContent += padding_bytes size = len(fullContent) logging.info("(After padding) Size of generated binary is: {} bytes".format(size)) from Crypto.Cipher import AES cipher = AES.new(bytes.fromhex(aes128_key), AES.MODE_ECB) fullContentCipher = cipher.encrypt(fullContent) # Add 4 bytes of hashing to generated binary to check for integrity hashing = hashlib.sha256(fullContent).hexdigest() hashing = hashing[0:8] logging.info("4 byte section hash (for integrity check): {}".format(hashing)) fullContentCipher = bytearray.fromhex(hashing) + fullContentCipher # Add length of data to binary to know how to calculate SHA on embedded fullContentCipher = size.to_bytes(4, "little") + fullContentCipher # Add hash id hashId = bytearray.fromhex("CE47BA5E") hashId.reverse() fullContentCipher = hashId.reverse() + fullContentCipher size = len(fullContentCipher) logging.info("Size of final generated binary is: {} bytes".format(size)) file.write(fullContentCipher) out_hash = hashlib.sha256(fullContent).hexdigest() logging.info("SHA256 of generated binary: {}".format(out_hash)) def main(): set_logger() parser = argparse.ArgumentParser(description="NXP Factory Data Generator") optional = parser required = parser.add_argument_group("required arguments") required.add_argument("-i", "--it", required=True, type=IterationCount, help="[int | hex] Spake2 Iteration Counter") required.add_argument("-s", "--salt", required=True, type=Salt, help="[base64 str] Spake2 Salt") required.add_argument("-p", "--passcode", required=True, type=SetupPasscode, help="[int | hex] PASE session passcode") required.add_argument("-d", "--discriminator", required=True, type=Discriminator, help="[int | hex] BLE Pairing discriminator") required.add_argument("--vid", required=True, type=VendorId, help="[int | hex] Vendor Identifier (VID)") required.add_argument("--pid", required=True, type=ProductId, help="[int | hex] Product Identifier (PID)") required.add_argument("--vendor_name", required=True, type=VendorName, help="[str] Vendor Name") required.add_argument("--product_name", required=True, type=ProductName, help="[str] Product Name") required.add_argument("--hw_version", required=True, type=HardwareVersion, help="[int | hex] Hardware version as number") required.add_argument("--hw_version_str", required=True, type=HardwareVersionStr, help="[str] Hardware version as string") required.add_argument("--cert_declaration", required=True, type=CertDeclaration, help="[path] Path to Certification Declaration in DER format") required.add_argument("--dac_cert", required=True, type=DacCert, help="[path] Path to DAC certificate in DER format") required.add_argument("--dac_key", required=True, type=DacPKey, help="[path] Path to DAC key in DER format") required.add_argument("--pai_cert", required=True, type=PaiCert, help="[path] Path to PAI certificate in DER format") required.add_argument("--spake2p_path", required=True, type=str, help="[path] Path to spake2p tool") required.add_argument("--out", required=True, type=str, help="[path] Path to output binary") optional.add_argument("--dac_key_password", type=str, help="[path] Password to decode DAC Key if available") optional.add_argument("--spake2p_verifier", type=Verifier, help="[base64 str] Already generated spake2p verifier") optional.add_argument("--aes128_key", help="[hex] AES 128 bits key used to encrypt the whole dataset") optional.add_argument("--date", type=ManufacturingDate, help="[str] Manufacturing Date (YYYY-MM-DD)") optional.add_argument("--part_number", type=PartNumber, help="[str] PartNumber as String") optional.add_argument("--product_url", type=ProductURL, help="[str] ProductURL as String") optional.add_argument("--product_label", type=ProductLabel, help="[str] ProductLabel as String") optional.add_argument("--serial_num", type=SerialNum, help="[str] Serial Number") optional.add_argument("--unique_id", type=UniqueId, help="[str] Unique identifier for the device") args = parser.parse_args() klv = KlvGenerator(args) data = klv.generate() klv.to_bin(data, args.out, args.aes128_key) if __name__ == "__main__": main()