#!/usr/bin/env python3
#
#    Copyright (c) 2022 Project CHIP Authors
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

import argparse
import base64
import binascii
import csv
import json
import logging as logger
import os
import random
import shutil
import subprocess
import sys

import cbor2 as cbor
import cryptography.hazmat.backends
import cryptography.hazmat.primitives.serialization
import cryptography.x509
import pyqrcode
from intelhex import IntelHex

# Resolved paths of the external executables; filled in by check_tools_exists().
TOOLS = {
    'spake2p': None,
    'chip-cert': None,
    'chip-tool': None,
}

INVALID_PASSCODES = [00000000, 11111111, 22222222, 33333333, 44444444, 55555555,
                     66666666, 77777777, 88888888, 99999999, 12345678, 87654321]

FACTORY_DATA_VERSION = 1
SERIAL_NUMBER_LEN = 32

# Lengths for manual pairing codes and qrcode
SHORT_MANUALCODE_LEN = 11
LONG_MANUALCODE_LEN = 21
QRCODE_LEN = 22
ROTATING_DEVICE_ID_UNIQUE_ID_LEN = 16
HEX_PREFIX = "hex:"
DEV_SN_CSV_HDR = "Serial Number,\n"

# Key/value pairs that end up CBOR-encoded in the factory data partition.
NVS_MEMORY = dict()


def nvs_memory_append(key, value):
    """Store *value* under *key* in NVS_MEMORY; str values are stored UTF-8 encoded."""
    if isinstance(value, str):
        NVS_MEMORY[key] = value.encode("utf-8")
    else:
        NVS_MEMORY[key] = value


def nvs_memory_update(key, value):
    """Replace the NVS_MEMORY entry for *key*; str values are stored UTF-8 encoded."""
    if isinstance(value, str):
        NVS_MEMORY.update({key: value.encode("utf-8")})
    else:
        NVS_MEMORY.update({key: value})


def _find_tool(name, path_override, option):
    """Return the path of executable *name* (or *path_override*), exiting with 1 if not found."""
    tool_path = shutil.which(path_override if path_override else name)
    if tool_path is None:
        logger.error('{} not found, please specify {} argument'.format(name, option))
        sys.exit(1)
    return tool_path


def check_tools_exists(args):
    """Locate the external tools and record their paths in TOOLS.

    Exits the process with status 1 when a needed tool cannot be found.
    """
    TOOLS['spake2p'] = _find_tool('spake2p', args.spake2_path, '--spake2-path')

    # if the certs and keys are not in the generated partitions or the specific dac cert and key are used,
    # the chip-cert is not needed.
    if args.paa or (args.pai and (args.dac_cert is None and args.dac_key is None)):
        TOOLS['chip-cert'] = _find_tool('chip-cert', args.chip_cert_path, '--chip-cert-path')

    TOOLS['chip-tool'] = _find_tool('chip-tool', args.chip_tool_path, '--chip-tool-path')

    logger.debug('Using following tools:')
    logger.debug('spake2p:    {}'.format(TOOLS['spake2p']))
    logger.debug('chip-cert:  {}'.format(TOOLS['chip-cert']))
    logger.debug('chip-tool:  {}'.format(TOOLS['chip-tool']))


def execute_cmd(cmd):
    """Run *cmd* (an argv list) capturing its output; log stderr and exit with 1 on failure."""
    logger.debug('Executing Command: {}'.format(cmd))
    status = subprocess.run(cmd, capture_output=True)

    try:
        status.check_returncode()
    except subprocess.CalledProcessError as e:
        if status.stderr:
            logger.error('[stderr]: {}'.format(status.stderr.decode('utf-8').strip()))
        logger.error('Command failed with error: {}'.format(e))
        sys.exit(1)


def check_str_range(s, min_len, max_len, name):
    """Exit with 1 if *s* is given and its length is outside [min_len, max_len].

    ``None`` means "not provided" and is skipped; an empty string is validated
    like any other value (a truthiness test would let it slip through).
    """
    if s is not None and ((len(s) < min_len) or (len(s) > max_len)):
        logger.error('%s must be between %d and %d characters', name, min_len, max_len)
        sys.exit(1)


def check_int_range(value, min_value, max_value, name):
    """Exit with 1 if *value* is given and lies outside [min_value, max_value].

    ``None`` means "not provided" and is skipped; 0 is validated like any
    other value (a truthiness test would skip it even when min_value > 0).
    """
    if value is not None and ((value < min_value) or (value > max_value)):
        logger.error('%s is out of range, should be in range [%d, %d]', name, min_value, max_value)
        sys.exit(1)


def vid_pid_str(vid, pid):
    """Return '<vid>_<pid>' with both ids rendered as lowercase hex without the '0x' prefix."""
    return '_'.join([hex(vid)[2:], hex(pid)[2:]])


def read_der_file(path: str):
    """Return the raw bytes of the file at *path*; log and re-raise IOError on failure."""
    # logging uses %-style placeholders for its lazy arguments
    logger.debug("Reading der file %s...", path)
    try:
        with open(path, 'rb') as f:
            return f.read()
    except IOError as e:
        logger.error(e)
        raise


def read_key_bin_file(path: str):
    """Return the raw bytes of the key file at *path*, or None if it cannot be read."""
    try:
        with open(path, 'rb') as file:
            return file.read()
    except (IOError, ValueError) as e:
        # Best effort by design: callers receive None, but leave a trace of why.
        logger.warning('Could not read key file %s: %s', path, e)
        return None


def setup_out_dir(out_dir_top, args, serial: str):
    """Create the per-device output directories and return their paths.

    Returns a dict with the keys 'output' and 'internal'. With --in-tree the
    files go directly under *out_dir_top*; otherwise under
    <out_dir_top>/<vid>_<pid>/<serial>.
    """
    if args.in_tree:
        # The vid/pid sub-directory is not used in-tree, so do not compute it.
        out_dir = out_dir_top
    else:
        out_dir = os.sep.join([out_dir_top, vid_pid_str(args.vendor_id, args.product_id)])

    os.makedirs(out_dir, exist_ok=True)

    if args.in_tree:
        dirs = {
            'output': out_dir,
            'internal': os.sep.join([out_dir, 'internal']),
        }
    else:
        dirs = {
            'output': os.sep.join([out_dir, serial]),
            'internal': os.sep.join([out_dir, serial, 'internal']),
        }

    os.makedirs(dirs['output'], exist_ok=True)
    os.makedirs(dirs['internal'], exist_ok=True)

    return dirs


def convert_x509_cert_from_pem_to_der(pem_file, out_der_file):
    """Read the PEM certificate *pem_file* and write it DER-encoded to *out_der_file*."""
    with open(pem_file, 'rb') as f:
        pem_data = f.read()

    pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, cryptography.hazmat.backends.default_backend())
    der_cert = pem_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER)

    with open(out_der_file, 'wb') as f:
        f.write(der_cert)


def generate_passcode(args, out_dirs):
    """Run 'spake2p gen-verifier' and let it write pin.csv into the output directory."""
    salt_len_max = 32

    cmd = [
        TOOLS['spake2p'], 'gen-verifier',
        '--iteration-count', str(args.spake2_it),
        '--salt-len', str(salt_len_max),
        '--out', os.sep.join([out_dirs['output'], 'pin.csv'])
    ]

    # If passcode is provided, use it
    if args.passcode is not None:
        cmd.extend(['--pin-code', str(args.passcode)])

    execute_cmd(cmd)


def generate_discriminator(args, out_dirs):
    """Append a 'Discriminator' column to pin.csv, writing pin_disc.csv and removing pin.csv."""
    # If discriminator is provided, use it (0 is a valid, explicit choice)
    if args.discriminator is not None:
        disc = args.discriminator
    else:
        disc = random.randint(0x0000, 0x0FFF)

    pin_csv = os.sep.join([out_dirs['output'], 'pin.csv'])

    # Append discriminator to each line of the passcode file
    with open(pin_csv, 'r') as fd:
        lines = fd.readlines()

    out_lines = [','.join([lines[0].strip(), 'Discriminator'])]
    out_lines.extend(','.join([line.strip(), str(disc)]) for line in lines[1:])

    with open(os.sep.join([out_dirs['output'], 'pin_disc.csv']), 'w') as fd:
        fd.write('\n'.join(out_lines) + '\n')

    os.remove(pin_csv)


def generate_pai_certs(args, ca_key, ca_cert, out_key, out_cert):
    """Run 'chip-cert gen-att-cert --type i' to create a PAI key and certificate signed by the given CA."""
    cmd = [
        TOOLS['chip-cert'], 'gen-att-cert',
        '--type', 'i',
        # NOTE(review): the command is run without a shell, so the double quotes
        # are passed to chip-cert literally - confirm this is intended.
        '--subject-cn', '"{} PAI {}"'.format(args.cn_prefix, '00'),
        '--out-key', out_key,
        '--out', out_cert,
    ]

    if args.lifetime:
        cmd.extend(['--lifetime', str(args.lifetime)])
    if args.valid_from:
        cmd.extend(['--valid-from', str(args.valid_from)])

    cmd.extend([
        '--subject-vid', hex(args.vendor_id)[2:],
        '--subject-pid', hex(args.product_id)[2:],
        '--ca-key', ca_key,
        '--ca-cert', ca_cert,
    ])

    execute_cmd(cmd)
    logger.info('Generated PAI certificate: {}'.format(out_cert))
    logger.info('Generated PAI private key: {}'.format(out_key))


def setup_root_certificates(args, dirs):
    """Prepare the PAI certificate files and return their paths.

    Returns a dict with the keys 'cert_pem', 'cert_der' and 'key_pem'; the
    values stay None when neither --paa nor --pai is set.
    """
    pai_cert = {
        'cert_pem': None,
        'cert_der': None,
        'key_pem': None,
    }

    # If PAA is passed as input, then generate PAI certificate
    if args.paa:
        # output file names
        pai_cert['cert_pem'] = os.sep.join([dirs['internal'], 'pai_cert.pem'])
        pai_cert['cert_der'] = os.sep.join([dirs['internal'], 'pai_cert.der'])
        pai_cert['key_pem'] = os.sep.join([dirs['internal'], 'pai_key.pem'])

        generate_pai_certs(args, args.key, args.cert, pai_cert['key_pem'], pai_cert['cert_pem'])
        convert_x509_cert_from_pem_to_der(pai_cert['cert_pem'], pai_cert['cert_der'])
        logger.info('Generated PAI certificate in DER format: {}'.format(pai_cert['cert_der']))

    # If PAI is passed as input, generate DACs
    elif args.pai:
        pai_cert['cert_pem'] = args.cert
        pai_cert['key_pem'] = args.key
        pai_cert['cert_der'] = os.sep.join([dirs['internal'], 'pai_cert.der'])

        convert_x509_cert_from_pem_to_der(pai_cert['cert_pem'], pai_cert['cert_der'])
        logger.info('Generated PAI certificate in DER format: {}'.format(pai_cert['cert_der']))

    return pai_cert


# Generate the Public and Private key pair binaries
def generate_keypair_bin(pem_file, out_privkey_bin, out_pubkey_bin):
    """Write the private value (32 bytes) and the public point (0x04 || X || Y) of a PEM key as binaries."""
    with open(pem_file, 'rb') as f:
        pem_data = f.read()

    key_pem = cryptography.hazmat.primitives.serialization.load_pem_private_key(pem_data, None)
    private_number_val = key_pem.private_numbers().private_value
    public_numbers = key_pem.public_key().public_numbers()
    public_key_first_byte = 0x04

    with open(out_privkey_bin, 'wb') as f:
        f.write(private_number_val.to_bytes(32, byteorder='big'))

    with open(out_pubkey_bin, 'wb') as f:
        f.write(public_key_first_byte.to_bytes(1, byteorder='big'))
        f.write(public_numbers.x.to_bytes(32, byteorder='big'))
        f.write(public_numbers.y.to_bytes(32, byteorder='big'))


def generate_dac_cert(iteration, args, out_dirs, discriminator, passcode, ca_key, ca_cert):
    """Generate a DAC with chip-cert and derive its DER and raw key files.

    Returns a tuple (cert_der_path, private_key_bin_path, public_key_bin_path).
    *discriminator* and *passcode* are accepted but not used here.
    """
    out_key_pem = os.sep.join([out_dirs['internal'], 'DAC_key.pem'])
    out_cert_pem = out_key_pem.replace('key.pem', 'cert.pem')
    out_cert_der = out_key_pem.replace('key.pem', 'cert.der')
    out_private_key_bin = out_key_pem.replace('key.pem', 'private_key.bin')
    out_public_key_bin = out_key_pem.replace('key.pem', 'public_key.bin')

    cmd = [
        TOOLS['chip-cert'], 'gen-att-cert',
        '--type', 'd',
        # NOTE(review): quotes are passed literally (no shell) - confirm intended.
        '--subject-cn', '"{} DAC {}"'.format(args.cn_prefix, iteration),
        '--out-key', out_key_pem,
        '--out', out_cert_pem,
    ]

    if args.lifetime:
        cmd.extend(['--lifetime', str(args.lifetime)])
    if args.valid_from:
        cmd.extend(['--valid-from', str(args.valid_from)])

    cmd.extend(['--subject-vid', hex(args.vendor_id)[2:],
                '--subject-pid', hex(args.product_id)[2:],
                '--ca-key', ca_key,
                '--ca-cert', ca_cert,
                ])

    execute_cmd(cmd)
    logger.info('Generated DAC certificate: {}'.format(out_cert_pem))
    logger.info('Generated DAC private key: {}'.format(out_key_pem))

    convert_x509_cert_from_pem_to_der(out_cert_pem, out_cert_der)
    logger.info('Generated DAC certificate in DER format: {}'.format(out_cert_der))

    generate_keypair_bin(out_key_pem, out_private_key_bin, out_public_key_bin)
    logger.info('Generated DAC private key in binary format: {}'.format(out_private_key_bin))
    logger.info('Generated DAC public key in binary format: {}'.format(out_public_key_bin))

    return out_cert_der, out_private_key_bin, out_public_key_bin


def use_dac_cert_from_args(args, out_dirs):
    """Convert the DAC certificate/key given on the command line to DER and raw key files.

    Returns a tuple (cert_der_path, private_key_bin_path, public_key_bin_path).
    """
    logger.info('Using DAC from command line arguments...')
    logger.info('DAC Certificate: {}'.format(args.dac_cert))
    logger.info('DAC Private Key: {}'.format(args.dac_key))

    # There should be only one UUID in the UUIDs list if DAC is specified
    out_cert_der = os.sep.join([out_dirs['internal'], 'DAC_cert.der'])
    out_private_key_bin = out_cert_der.replace('cert.der', 'private_key.bin')
    out_public_key_bin = out_cert_der.replace('cert.der', 'public_key.bin')

    convert_x509_cert_from_pem_to_der(args.dac_cert, out_cert_der)
    logger.info('Generated DAC certificate in DER format: {}'.format(out_cert_der))

    generate_keypair_bin(args.dac_key, out_private_key_bin, out_public_key_bin)
    logger.info('Generated DAC private key in binary format: {}'.format(out_private_key_bin))
    logger.info('Generated DAC public key in binary format: {}'.format(out_public_key_bin))

    return out_cert_der, out_private_key_bin, out_public_key_bin


def get_manualcode_args(vid, pid, flow, discriminator, passcode):
    """Return the chip-tool 'payload' arguments shared by manual-code and QR-code generation."""
    return [
        '--discriminator', str(discriminator),
        '--setup-pin-code', str(passcode),
        '--version', '0',
        '--vendor-id', str(vid),
        '--product-id', str(pid),
        '--commissioning-mode', str(flow),
    ]


def get_qrcode_args(vid, pid, flow, discriminator, passcode, disc_mode):
    """Return the manual-code arguments plus '--rendezvous' set to the bit 1 << disc_mode."""
    payload_args = get_manualcode_args(vid, pid, flow, discriminator, passcode)
    payload_args.append('--rendezvous')
    payload_args.append(str(1 << disc_mode))
    return payload_args


def get_chip_qrcode(chip_tool, vid, pid, flow, discriminator, passcode, disc_mode):
    """Run 'chip-tool payload generate-qrcode' and return the QR code text from its output."""
    payload_args = get_qrcode_args(vid, pid, flow, discriminator, passcode, disc_mode)
    cmd_args = [chip_tool, 'payload', 'generate-qrcode']
    cmd_args.extend(payload_args)
    data = subprocess.check_output(cmd_args)

    # Command output is as below:
    # \x1b[0;32m[1655386003372] [23483:7823617] CHIP: [TOO] QR Code: MT:Y.K90-WB010E7648G00\x1b[0m
    return data.decode('utf-8').split('QR Code: ')[1][:QRCODE_LEN]


def get_chip_manualcode(chip_tool, vid, pid, flow, discriminator, passcode):
    """Run 'chip-tool payload generate-manualcode' and return the manual code from its output."""
    payload_args = get_manualcode_args(vid, pid, flow, discriminator, passcode)
    cmd_args = [chip_tool, 'payload', 'generate-manualcode']
    cmd_args.extend(payload_args)
    data = subprocess.check_output(cmd_args)

    # Command output is as below:
    # \x1b[0;32m[1655386909774] [24424:7837894] CHIP: [TOO] Manual Code: 749721123365521327689\x1b[0m\n
    # OR
    # \x1b[0;32m[1655386926028] [24458:7838229] CHIP: [TOO] Manual Code: 34972112338\x1b[0m\n
    # Length of manual code depends on the commissioning flow:
    #   For standard commissioning flow it is 11 digits
    #   For User-intent and custom commissioning flow it is 21 digits
    manual_code_len = LONG_MANUALCODE_LEN if flow else SHORT_MANUALCODE_LEN
    return data.decode('utf-8').split('Manual Code: ')[1][:manual_code_len]


def generate_onboarding_data(args, out_dirs, discriminator, passcode):
    """Generate the manual code and QR code, then write onb_codes.csv and qrcode.png to the output directory."""
    chip_manualcode = get_chip_manualcode(TOOLS['chip-tool'], args.vendor_id, args.product_id,
                                          args.commissioning_flow, discriminator, passcode)
    chip_qrcode = get_chip_qrcode(TOOLS['chip-tool'], args.vendor_id, args.product_id,
                                  args.commissioning_flow, discriminator, passcode, args.discovery_mode)

    logger.info('Generated QR code: ' + chip_qrcode)
    logger.info('Generated manual code: ' + chip_manualcode)

    csv_data = 'qrcode,manualcode,discriminator,passcode\n'
    csv_data += chip_qrcode + ',' + chip_manualcode + ',' + str(discriminator) + ',' + str(passcode) + '\n'

    onboarding_data_file = os.sep.join([out_dirs['output'], 'onb_codes.csv'])
    with open(onboarding_data_file, 'w') as f:
        f.write(csv_data)

    # Create QR code image as mentioned in the spec
    qrcode_file = os.sep.join([out_dirs['output'], 'qrcode.png'])
    chip_qr = pyqrcode.create(chip_qrcode, version=2, error='M')
    chip_qr.png(qrcode_file, scale=6)

    logger.info('Generated onboarding data and QR Code')


# This function generates the DACs, picks the
# commissionable data from the already present csv file,
# and generates the onboarding payloads, and writes everything to the master csv
def write_device_unique_data(args, out_dirs, pai_cert):
    """Load the per-device commissioning data into NVS memory and generate onboarding data.

    Reads the first data row of pin_disc.csv, stores its values, and, when
    --paa or --pai is set, stores the DAC and PAI material as well.

    Returns the tuple of DAC file paths (cert DER, private key bin, public
    key bin), or None when no attestation certificates are handled.
    """
    # Stays None when neither --paa nor --pai is set, so the return below is always bound.
    dacs = None

    with open(os.sep.join([out_dirs['output'], 'pin_disc.csv']), 'r') as csvf:
        pin_disc_dict = csv.DictReader(csvf)
        row = next(pin_disc_dict)

    discriminator = int(row['Discriminator'])
    passcode = int(row['PIN Code'])

    nvs_memory_append('discriminator', discriminator)
    nvs_memory_append('spake2_it', int(row['Iteration Count']))
    nvs_memory_append('spake2_salt', base64.b64decode(row['Salt']))
    nvs_memory_append('spake2_verifier', base64.b64decode(row['Verifier']))
    nvs_memory_append('passcode', passcode)

    if args.paa or args.pai:
        if args.dac_key is not None and args.dac_cert is not None:
            dacs = use_dac_cert_from_args(args, out_dirs)
        else:
            dacs = generate_dac_cert(int(row['Index']), args, out_dirs, discriminator,
                                     passcode, pai_cert['key_pem'], pai_cert['cert_pem'])

        nvs_memory_append('dac_cert', read_der_file(dacs[0]))
        nvs_memory_append('dac_key', read_key_bin_file(dacs[1]))
        nvs_memory_append('pai_cert', read_der_file(pai_cert['cert_der']))

    # --cert-dclrn is a required argument, so it is always stored.
    nvs_memory_append('cert_dclrn', read_der_file(args.cert_dclrn))

    if (args.enable_rotating_device_id is True) and (args.rd_id_uid is None):
        nvs_memory_update('rd_uid', os.urandom(ROTATING_DEVICE_ID_UNIQUE_ID_LEN))

    # Generate onboarding data
    generate_onboarding_data(args, out_dirs, discriminator, passcode)

    return dacs


def generate_partition(args, out_dirs):
    """CBOR-encode NVS_MEMORY and write it as factory_data.hex and factory_data.bin.

    Raises:
        ValueError: if the encoded data is larger than args.size.
    """
    logger.info('Generating partition image: offset: 0x{:X} size: 0x{:X}'.format(args.offset, args.size))
    cbor_data = cbor.dumps(NVS_MEMORY)

    # Create hex file
    if len(cbor_data) > args.size:
        raise ValueError("generated CBOR file exceeds declared maximum partition size! {} > {}".format(
            len(cbor_data), args.size))

    ih = IntelHex()
    ih.putsz(args.offset, cbor_data)
    ih.write_hex_file(os.sep.join([out_dirs['output'], 'factory_data.hex']), True)
    ih.tobinfile(os.sep.join([out_dirs['output'], 'factory_data.bin']))


def generate_json_summary(args, out_dirs, pai_certs, dacs_cert, serial_num: str):
    """Write summary.json describing the generated device data.

    The summary holds the serial number, every non-binary NVS_MEMORY entry,
    the passcode/salt/verifier from pin_disc.csv, the onboarding codes from
    onb_codes.csv and the certificate file paths.
    """
    json_dict = dict()

    json_dict['serial_num'] = serial_num

    # Binary blobs are not JSON serializable; only plain values are copied.
    for key, nvs_value in NVS_MEMORY.items():
        if not isinstance(nvs_value, (bytes, bytearray)):
            json_dict[key] = nvs_value

    with open(os.sep.join([out_dirs['output'], 'pin_disc.csv']), 'r') as csvf:
        row = next(csv.DictReader(csvf))
        json_dict['passcode'] = row['PIN Code']
        json_dict['spake2_salt'] = row['Salt']
        json_dict['spake2_verifier'] = row['Verifier']

    with open(os.sep.join([out_dirs['output'], 'onb_codes.csv']), 'r') as csvf:
        row = next(csv.DictReader(csvf))
        for key, value in row.items():
            json_dict[key] = value

    for key, value in pai_certs.items():
        json_dict[key] = value

    if dacs_cert is not None:
        json_dict['dac_cert'] = dacs_cert[0]
        json_dict['dac_priv_key'] = dacs_cert[1]
        json_dict['dac_pub_key'] = dacs_cert[2]

    json_dict['cert_dclrn'] = args.cert_dclrn

    # Format vid & pid as hex (both are optional arguments, so they may be absent)
    for id_key in ('vendor_id', 'product_id'):
        if id_key in json_dict:
            json_dict[id_key] = hex(json_dict[id_key])

    with open(os.sep.join([out_dirs['output'], 'summary.json']), 'w') as json_file:
        json.dump(json_dict, json_file, indent=4)


def add_additional_kv(args, serial_num):
    """Store the device instance information, serial number and basic-cluster values in NVS memory."""
    # Device instance information
    device_info = (
        ('vendor_id', args.vendor_id),
        ('vendor_name', args.vendor_name),
        ('product_id', args.product_id),
        ('product_name', args.product_name),
        ('hw_ver', args.hw_ver),
        ('hw_ver_str', args.hw_ver_str),
        ('date', args.mfg_date),
    )
    for key, value in device_info:
        if value is not None:
            nvs_memory_append(key, value)

    if args.enable_rotating_device_id:
        nvs_memory_append('rd_uid', args.rd_id_uid)

    # Add the serial-num
    nvs_memory_append('sn', serial_num)

    nvs_memory_append('version', FACTORY_DATA_VERSION)

    if args.enable_key:
        nvs_memory_append('enable_key', args.enable_key)

    # Keys from basic clusters
    basic_info = (
        ('product_label', args.product_label),
        ('product_url', args.product_url),
        ('part_number', args.part_number),
    )
    for key, value in basic_info:
        if value is not None:
            nvs_memory_append(key, value)


def get_and_validate_args():
    """Parse the command line, validate the values and return the argparse namespace.

    Exits the process with status 1 on any validation failure.
    """
    def allow_any_int(i):
        # Base 0 accepts decimal as well as 0x/0o/0b prefixed literals.
        return int(i, 0)

    parser = argparse.ArgumentParser(description='Manufacturing partition generator tool',
                                     formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=50))

    # General options
    general_args = parser.add_argument_group('General options')
    general_args.add_argument('-n', '--count', type=allow_any_int, default=1,
                              help='The number of manufacturing partition binaries to generate. Default is 1.')
    general_args.add_argument("--output", type=str, required=False, default="out",
                              help="[string] Output path where generated data will be stored.")
    general_args.add_argument("--spake2-path", type=str, required=False,
                              help="[string] Provide Spake2+ tool path")
    general_args.add_argument("--chip-tool-path", type=str, required=False,
                              help="[string] Provide chip-tool path")
    general_args.add_argument("--chip-cert-path", type=str, required=False,
                              help="[string] Provide chip-cert path")
    general_args.add_argument("--overwrite", action="store_true", default=False,
                              help="If output directory exist this argument allows to generate new factory data and overwrite it.")
    general_args.add_argument("--in-tree", action="store_true", default=False,
                              help="Use it only when building factory data from Matter source code.")
    general_args.add_argument("--enable-key", type=str,
                              help="[hex string] [128-bit hex-encoded] The Enable Key is a 128-bit value that triggers "
                                   "manufacturer-specific action while invoking the TestEventTrigger Command. "
                                   "This value is used during Certification Tests, and should not be present on production devices.")

    # Commissioning options
    commissioning_args = parser.add_argument_group('Commissioning options')
    commissioning_args.add_argument('--passcode', type=allow_any_int,
                                    help='The passcode for pairing. Randomly generated if not specified.')
    commissioning_args.add_argument("--spake2-it", type=allow_any_int, default=1000,
                                    help="[int] Provide Spake2+ iteration count.")
    commissioning_args.add_argument('--discriminator', type=allow_any_int,
                                    help='The discriminator for pairing. Randomly generated if not specified.')
    commissioning_args.add_argument('-cf', '--commissioning-flow', type=allow_any_int, default=0,
                                    help='Device commissioning flow, 0:Standard, 1:User-Intent, 2:Custom. '
                                         'Default is 0.', choices=[0, 1, 2])
    commissioning_args.add_argument('-dm', '--discovery-mode', type=allow_any_int, default=1,
                                    help='Commissionable device discovery networking technology. '
                                         '0:WiFi-SoftAP, 1:BLE, 2:On-network. Default is BLE.', choices=[0, 1, 2])

    # Device instance information
    dev_inst_args = parser.add_argument_group('Device instance information options')
    dev_inst_args.add_argument('-v', '--vendor-id', type=allow_any_int, required=False, help='Vendor id')
    dev_inst_args.add_argument('--vendor-name', type=str, required=False, help='Vendor name')
    dev_inst_args.add_argument('-p', '--product-id', type=allow_any_int, required=False, help='Product id')
    dev_inst_args.add_argument('--product-name', type=str, required=False, help='Product name')
    dev_inst_args.add_argument('--hw-ver', type=allow_any_int, required=False, help='Hardware version')
    dev_inst_args.add_argument('--hw-ver-str', type=str, required=False, help='Hardware version string')
    dev_inst_args.add_argument('--mfg-date', type=str, required=False, help='Manufacturing date in format YYYY-MM-DD')
    dev_inst_args.add_argument('--serial-num', type=str, required=False, help='Serial number in hex format')
    dev_inst_args.add_argument('--enable-rotating-device-id', action='store_true',
                               help='Enable Rotating device id in the generated binaries')
    dev_inst_args.add_argument('--rd-id-uid', type=str, required=False,
                               help='128-bit unique identifier for generating rotating device identifier, '
                                    'provide 32-byte hex string, e.g. "1234567890abcdef1234567890abcdef"')

    dac_args = parser.add_argument_group('Device attestation credential options')
    # If DAC is present then PAI key is not required, so it is marked as not required here
    # but, if DAC is not present then PAI key is required and that case is validated in validate_args()
    dac_args.add_argument('-c', '--cert', type=str, required=False, help='The input certificate file in PEM format.')
    dac_args.add_argument('-k', '--key', type=str, required=False, help='The input key file in PEM format.')
    dac_args.add_argument('-cd', '--cert-dclrn', type=str, required=True,
                          help='The certificate declaration file in DER format.')
    dac_args.add_argument('--dac-cert', type=str, help='The input DAC certificate file in PEM format.')
    dac_args.add_argument('--dac-key', type=str, help='The input DAC private key file in PEM format.')
    dac_args.add_argument('-cn', '--cn-prefix', type=str, default='Telink',
                          help='The common name prefix of the subject of the generated certificate.')
    dac_args.add_argument('-lt', '--lifetime', default=4294967295, type=allow_any_int,
                          help='Lifetime of the generated certificate. Default is 4294967295 if not specified, '
                               'this indicate that certificate does not have well defined expiration date.')
    dac_args.add_argument('-vf', '--valid-from', type=str,
                          help='The start date for the certificate validity period in format '
                               '<YYYY>-<MM>-<DD> [ <HH>:<MM>:<SS> ]. Default is current date.')
    input_cert_group = dac_args.add_mutually_exclusive_group(required=False)
    input_cert_group.add_argument('--paa', action='store_true', help='Use input certificate as PAA certificate.')
    input_cert_group.add_argument('--pai', action='store_true', help='Use input certificate as PAI certificate.')

    basic_args = parser.add_argument_group('Few more Basic clusters options')
    basic_args.add_argument('--product-label', type=str, required=False, help='Product label')
    basic_args.add_argument('--product-url', type=str, required=False, help='Product URL')
    # '--part_number' is kept as an alias for backward compatibility; dest stays 'part_number'.
    basic_args.add_argument('--part-number', '--part_number', type=str, required=False,
                            help='Provide human-readable product number')

    part_gen_args = parser.add_argument_group('Partition generator options')
    part_gen_args.add_argument('--offset', type=allow_any_int, default=0x107000,
                               help='Partition offset - an address in devices NVM memory, where factory data will be stored')
    part_gen_args.add_argument('--size', type=allow_any_int, default=0x1000, help='The maximum partition size')

    args = parser.parse_args()

    # Validate in-tree parameter
    if args.count > 1 and args.in_tree:
        logger.error('Option --in-tree can not be use together with --count > 1')
        sys.exit(1)

    # Validate discriminator and passcode
    check_int_range(args.discriminator, 0x0000, 0x0FFF, 'Discriminator')
    if args.passcode is not None:
        # Out of range when below the minimum OR above the maximum.
        if ((args.passcode < 0x0000001 or args.passcode > 0x5F5E0FE) or (args.passcode in INVALID_PASSCODES)):
            logger.error('Invalid passcode: ' + str(args.passcode))
            sys.exit(1)

    # Validate the device instance information
    check_int_range(args.product_id, 0x0000, 0xFFFF, 'Product id')
    check_int_range(args.vendor_id, 0x0000, 0xFFFF, 'Vendor id')
    check_int_range(args.hw_ver, 0x0000, 0xFFFF, 'Hardware version')
    check_int_range(args.spake2_it, 1, 10000, 'Spake2+ iteration count')
    check_str_range(args.serial_num, 1, SERIAL_NUMBER_LEN, 'Serial number')
    if args.serial_num is not None:
        # main() parses the serial number as base 16; reject bad input here with a clear message.
        try:
            int(args.serial_num, 16)
        except ValueError:
            logger.error('Serial number must be a hexadecimal string')
            sys.exit(1)
    check_str_range(args.vendor_name, 1, 32, 'Vendor name')
    check_str_range(args.product_name, 1, 32, 'Product name')
    check_str_range(args.hw_ver_str, 1, 64, 'Hardware version string')
    check_str_range(args.mfg_date, 8, 16, 'Manufacturing date')
    check_str_range(args.rd_id_uid, 16, 32, 'Rotating device Unique id')

    # Validates the attestation related arguments
    # DAC key and DAC cert both should be present or none
    if (args.dac_key is not None) != (args.dac_cert is not None):
        logger.error("dac_key and dac_cert should be both present or none")
        sys.exit(1)
    else:
        # Make sure PAI certificate is present if DAC is present
        if (args.dac_key is not None) and (args.pai is False):
            logger.error('Please provide PAI certificate along with DAC certificate and DAC key')
            sys.exit(1)

    # Validate the input certificate type, if DAC is not present
    if args.dac_key is None and args.dac_cert is None:
        if args.paa:
            logger.info('Input Root certificate type PAA')
        elif args.pai:
            logger.info('Input Root certificate type PAI')
        else:
            logger.info('Do not include the device attestation certificates and keys in partition binaries')

    # Check if Key and certificate are present
    if (args.paa or args.pai) and (args.key is None or args.cert is None):
        logger.error('CA key and certificate are required to generate DAC key and certificate')
        sys.exit(1)

    check_str_range(args.product_label, 1, 64, 'Product Label')
    check_str_range(args.product_url, 1, 256, 'Product URL')
    check_str_range(args.part_number, 1, 32, 'Part Number')

    return args


def main():
    """Generate the factory data for args.count devices with consecutive serial numbers."""
    logger.basicConfig(format='[%(asctime)s] [%(levelname)7s] - %(message)s', level=logger.INFO)
    args = get_and_validate_args()
    check_tools_exists(args)

    if os.path.exists(args.output):
        if args.overwrite:
            logger.info("Output directory already exists. All data will be overwritten.")
            shutil.rmtree(args.output)
        else:
            logger.error("Output directory exists! Please use different or remove existing.")
            sys.exit(1)

    # If serial number is not passed, then generate one
    if args.serial_num is None:
        # SERIAL_NUMBER_LEN counts characters and each random byte yields two
        # hex digits, so draw half as many bytes to stay within the limit.
        serial_num_int = int(binascii.b2a_hex(os.urandom(SERIAL_NUMBER_LEN // 2)), 16)
        logger.info("Serial number not provided. Using generated one: {}".format(hex(serial_num_int)))
    else:
        serial_num_int = int(args.serial_num, 16)

    out_dir_top = os.path.realpath(args.output)
    os.makedirs(out_dir_top, exist_ok=True)

    # The context manager closes the serial-number list even if a generation step fails.
    with open(os.sep.join([out_dir_top, "device_sn.csv"]), "w") as dev_sn_file:
        dev_sn_file.write(DEV_SN_CSV_HDR)

        for i in range(args.count):
            pai_cert = {}
            serial_num_str = format(serial_num_int + i, 'x')
            logger.info("Generating for {}".format(serial_num_str))
            dev_sn_file.write(serial_num_str + '\n')
            out_dirs = setup_out_dir(out_dir_top, args, serial_num_str)
            add_additional_kv(args, serial_num_str)
            generate_passcode(args, out_dirs)
            generate_discriminator(args, out_dirs)
            if args.paa or args.pai:
                pai_cert = setup_root_certificates(args, out_dirs)
            dacs_cert = write_device_unique_data(args, out_dirs, pai_cert)
            generate_partition(args, out_dirs)
            generate_json_summary(args, out_dirs, pai_cert, dacs_cert, serial_num_str)


if __name__ == "__main__":
    main()