| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- #!/usr/bin/env python
- #
- # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from __future__ import print_function
- from future.moves.itertools import zip_longest
- from io import open
- import sys
- import os
- import argparse
- import distutils.dir_util
- try:
- sys.path.insert(0, os.getenv('IDF_PATH') + "/components/nvs_flash/nvs_partition_generator/")
- import nvs_partition_gen
- except Exception as e:
- print(e)
- sys.exit("Please check IDF_PATH")
- def verify_values_exist(input_values_file, values_file_data, key_count_in_values_file, line_no=1):
- """ Verify all keys have corresponding values in values file
- """
- if len(values_file_data) != key_count_in_values_file:
- raise SystemExit("\nError: Number of values is not equal to number of keys in file: %s at line No:%s\n"
- % (str(input_values_file), str(line_no)))
- def verify_keys_exist(values_file_keys, config_file_data):
- """ Verify all keys from config file are present in values file
- """
- keys_missing = []
- for line_no, config_data in enumerate(config_file_data,1):
- if not isinstance(config_data, str):
- config_data = config_data.encode('utf-8')
- config_data_line = config_data.strip().split(',')
- if 'namespace' not in config_data_line:
- if values_file_keys:
- if config_data_line[0] == values_file_keys[0]:
- del values_file_keys[0]
- else:
- keys_missing.append([config_data_line[0], line_no])
- else:
- keys_missing.append([config_data_line[0], line_no])
- if keys_missing:
- for key, line_no in keys_missing:
- print("Key:`", str(key), "` at line no:", str(line_no),
- " in config file is not found in values file.")
- raise SystemExit(1)
- def verify_datatype_encoding(input_config_file, config_file_data):
- """ Verify datatype and encodings from config file is valid
- """
- valid_encodings = ["string", "binary", "hex2bin","u8", "i8", "u16", "u32", "i32","base64"]
- valid_datatypes = ["file","data","namespace"]
- line_no = 0
- for data in config_file_data:
- line_no += 1
- if not isinstance(data, str):
- data = data.encode('utf-8')
- line = data.strip().split(',')
- if line[1] not in valid_datatypes:
- raise SystemExit("Error: config file: %s has invalid datatype at line no:%s\n"
- % (str(input_config_file), str(line_no)))
- if 'namespace' not in line:
- if line[2] not in valid_encodings:
- raise SystemExit("Error: config file: %s has invalid encoding at line no:%s\n"
- % (str(input_config_file), str(line_no)))
- def verify_file_data_count(cfg_file_data, keys_repeat):
- """ Verify count of data on each line in config file is equal to 3
- (as format must be: <key,type and encoding>)
- """
- line_no = 0
- for data in cfg_file_data:
- line_no += 1
- if not isinstance(data, str):
- data = data.encode('utf-8')
- line = data.strip().split(',')
- if len(line) != 3 and line[0] not in keys_repeat:
- raise SystemExit("Error: data missing in config file at line no:%s <format needed:key,type,encoding>\n"
- % str(line_no))
- def verify_data_in_file(input_config_file, input_values_file, config_file_keys, keys_in_values_file, keys_repeat):
- """ Verify count of data on each line in config file is equal to 3 \
- (as format must be: <key,type and encoding>)
- Verify datatype and encodings from config file is valid
- Verify all keys from config file are present in values file and \
- Verify each key has corresponding value in values file
- """
- try:
- values_file_keys = []
- values_file_line = None
- # Get keys from values file present in config files
- values_file_keys = get_keys(keys_in_values_file, config_file_keys)
- with open(input_config_file, 'r', newline='\n') as cfg_file:
- cfg_file_data = cfg_file.readlines()
- verify_file_data_count(cfg_file_data, keys_repeat)
- verify_datatype_encoding(input_config_file, cfg_file_data)
- verify_keys_exist(values_file_keys, cfg_file_data)
- with open(input_values_file, 'r', newline='\n') as values_file:
- key_count_in_values_file = len(keys_in_values_file)
- lineno = 0
- # Read first keys(header) line
- values_file_data = values_file.readline()
- lineno += 1
- while values_file_data:
- # Read values line
- values_file_line = values_file.readline()
- if not isinstance(values_file_line, str):
- values_file_line = values_file_line.encode('utf-8')
- values_file_data = values_file_line.strip().split(',')
- lineno += 1
- if len(values_file_data) == 1 and '' in values_file_data:
- break
- verify_values_exist(input_values_file, values_file_data, key_count_in_values_file, line_no=lineno)
- except Exception as err:
- print(err)
- exit(1)
- def get_keys(keys_in_values_file, config_file_keys):
- """ Get keys from values file present in config file
- """
- values_file_keys = []
- for key in keys_in_values_file:
- if key in config_file_keys:
- values_file_keys.append(key)
- return values_file_keys
- def add_config_data_per_namespace(input_config_file):
- """ Add config data per namespace to `config_data_to_write` list
- """
- config_data_to_write = []
- config_data_per_namespace = []
- with open(input_config_file, 'r', newline='\n') as cfg_file:
- config_data = cfg_file.readlines()
- # `config_data_per_namespace` is added to `config_data_to_write` list after reading next namespace
- for data in config_data:
- if not isinstance(data, str):
- data = data.encode('utf-8')
- cfg_data = data.strip().split(',')
- if 'REPEAT' in cfg_data:
- cfg_data.remove('REPEAT')
- if 'namespace' in cfg_data:
- if config_data_per_namespace:
- config_data_to_write.append(config_data_per_namespace)
- config_data_per_namespace = []
- config_data_per_namespace.append(cfg_data)
- else:
- config_data_per_namespace.append(cfg_data)
- else:
- config_data_per_namespace.append(cfg_data)
- # `config_data_per_namespace` is added to `config_data_to_write` list as EOF is reached
- if (not config_data_to_write) or (config_data_to_write and config_data_per_namespace):
- config_data_to_write.append(config_data_per_namespace)
- return config_data_to_write
- def get_fileid_val(file_identifier, keys_in_config_file, keys_in_values_file,
- values_data_line, key_value_data, fileid_value):
- """ Get file identifier value
- """
- file_id_found = False
- for key in key_value_data:
- if file_identifier and not file_id_found and file_identifier in key:
- fileid_value = key[1]
- file_id_found = True
- if not file_id_found:
- fileid_value = str(int(fileid_value) + 1)
- return fileid_value
- def add_data_to_file(config_data_to_write, key_value_pair, output_csv_file):
- """ Add data to csv target file
- """
- header = ['key', 'type', 'encoding', 'value']
- data_to_write = []
- newline = u"\n"
- target_csv_file = open(output_csv_file, 'w', newline=None)
- line_to_write = u",".join(header)
- target_csv_file.write(line_to_write)
- target_csv_file.write(newline)
- for namespace_config_data in config_data_to_write:
- for data in namespace_config_data:
- data_to_write = data[:]
- if 'namespace' in data:
- data_to_write.append('')
- line_to_write = u",".join(data_to_write)
- target_csv_file.write(line_to_write)
- target_csv_file.write(newline)
- else:
- key = data[0]
- while key not in key_value_pair[0]:
- del key_value_pair[0]
- if key in key_value_pair[0]:
- value = key_value_pair[0][1]
- data_to_write.append(value)
- del key_value_pair[0]
- line_to_write = u",".join(data_to_write)
- target_csv_file.write(line_to_write)
- target_csv_file.write(newline)
- # Set index to start of file
- target_csv_file.seek(0)
- target_csv_file.close()
- def create_dir(filetype, output_dir_path):
- """ Create new directory(if doesn't exist) to store file generated
- """
- output_target_dir = os.path.join(output_dir_path,filetype,'')
- if not os.path.isdir(output_target_dir):
- distutils.dir_util.mkpath(output_target_dir)
- return output_target_dir
- def set_repeat_value(total_keys_repeat, keys, csv_file, target_filename):
- key_val_pair = []
- key_repeated = []
- line = None
- newline = u"\n"
- with open(csv_file, 'r', newline=None) as read_from, open(target_filename,'w', newline=None) as write_to:
- headers = read_from.readline()
- values = read_from.readline()
- write_to.write(headers)
- write_to.write(values)
- if not isinstance(values, str):
- values = values.encode('utf-8')
- values = values.strip().split(',')
- total_keys_values = list(zip_longest(keys, values))
- # read new data, add value if key has repeat tag, write to new file
- line = read_from.readline()
- if not isinstance(line, str):
- line = line.encode('utf-8')
- row = line.strip().split(',')
- while row:
- index = -1
- key_val_new = list(zip_longest(keys, row))
- key_val_pair = total_keys_values[:]
- key_repeated = total_keys_repeat[:]
- while key_val_new and key_repeated:
- index = index + 1
- # if key has repeat tag, get its corresponding value, write to file
- if key_val_new[0][0] == key_repeated[0]:
- val = key_val_pair[0][1]
- row[index] = val
- del key_repeated[0]
- del key_val_new[0]
- del key_val_pair[0]
- line_to_write = u",".join(row)
- write_to.write(line_to_write)
- write_to.write(newline)
- # Read next line
- line = read_from.readline()
- if not isinstance(line, str):
- line = line.encode('utf-8')
- row = line.strip().split(',')
- if len(row) == 1 and '' in row:
- break
- return target_filename
- def create_intermediate_csv(args, keys_in_config_file, keys_in_values_file, keys_repeat, is_encr=False):
- file_identifier_value = '0'
- csv_str = 'csv'
- bin_str = 'bin'
- line = None
- set_output_keyfile = False
- # Add config data per namespace to `config_data_to_write` list
- config_data_to_write = add_config_data_per_namespace(args.conf)
- try:
- with open(args.values, 'r', newline=None) as csv_values_file:
- # first line must be keys in file
- line = csv_values_file.readline()
- if not isinstance(line, str):
- line = line.encode('utf-8')
- keys = line.strip().split(',')
- filename, file_ext = os.path.splitext(args.values)
- target_filename = filename + "_created" + file_ext
- if keys_repeat:
- target_values_file = set_repeat_value(keys_repeat, keys, args.values, target_filename)
- else:
- target_values_file = args.values
- csv_values_file = open(target_values_file, 'r', newline=None)
- # Read header line
- csv_values_file.readline()
- # Create new directory(if doesn't exist) to store csv file generated
- output_csv_target_dir = create_dir(csv_str, args.outdir)
- # Create new directory(if doesn't exist) to store bin file generated
- output_bin_target_dir = create_dir(bin_str, args.outdir)
- if args.keygen:
- set_output_keyfile = True
- line = csv_values_file.readline()
- if not isinstance(line, str):
- line = line.encode('utf-8')
- values_data_line = line.strip().split(',')
- while values_data_line:
- key_value_data = list(zip_longest(keys_in_values_file, values_data_line))
- # Get file identifier value from values file
- file_identifier_value = get_fileid_val(args.fileid, keys_in_config_file,
- keys_in_values_file, values_data_line, key_value_data,
- file_identifier_value)
- key_value_pair = key_value_data[:]
- # Verify if output csv file does not exist
- csv_filename = args.prefix + "-" + file_identifier_value + "." + csv_str
- output_csv_file = output_csv_target_dir + csv_filename
- if os.path.isfile(output_csv_file):
- raise SystemExit("Target csv file: %s already exists.`" % output_csv_file)
- # Add values corresponding to each key to csv intermediate file
- add_data_to_file(config_data_to_write, key_value_pair, output_csv_file)
- print("\nCreated CSV file: ===>", output_csv_file)
- # Verify if output bin file does not exist
- bin_filename = args.prefix + "-" + file_identifier_value + "." + bin_str
- output_bin_file = output_bin_target_dir + bin_filename
- if os.path.isfile(output_bin_file):
- raise SystemExit("Target binary file: %s already exists.`" % output_bin_file)
- args.input = output_csv_file
- args.output = os.path.join(bin_str, bin_filename)
- if set_output_keyfile:
- args.keyfile = "keys-" + args.prefix + "-" + file_identifier_value
- if is_encr:
- nvs_partition_gen.encrypt(args)
- else:
- nvs_partition_gen.generate(args)
- # Read next line
- line = csv_values_file.readline()
- if not isinstance(line, str):
- line = line.encode('utf-8')
- values_data_line = line.strip().split(',')
- if len(values_data_line) == 1 and '' in values_data_line:
- break
- print("\nFiles generated in %s ..." % args.outdir)
- except Exception as e:
- print(e)
- exit(1)
- finally:
- csv_values_file.close()
- def verify_empty_lines_exist(file_name, input_file_data):
- for data in input_file_data:
- if not isinstance(data, str):
- data = data.encode('utf-8')
- cfg_data = data.strip().split(',')
- if len(cfg_data) == 1 and '' in cfg_data:
- raise SystemExit("Error: file: %s cannot have empty lines. " % file_name)
- def verify_file_format(args):
- keys_in_config_file = []
- keys_in_values_file = []
- keys_repeat = []
- file_data_keys = None
- # Verify config file is not empty
- if os.stat(args.conf).st_size == 0:
- raise SystemExit("Error: config file: %s is empty." % args.conf)
- # Verify values file is not empty
- if os.stat(args.values).st_size == 0:
- raise SystemExit("Error: values file: %s is empty." % args.values)
- # Verify config file does not have empty lines
- with open(args.conf, 'r', newline='\n') as csv_config_file:
- try:
- file_data = csv_config_file.readlines()
- verify_empty_lines_exist(args.conf, file_data)
- csv_config_file.seek(0)
- # Extract keys from config file
- for data in file_data:
- if not isinstance(data, str):
- data = data.encode('utf-8')
- line_data = data.strip().split(',')
- if 'namespace' not in line_data:
- keys_in_config_file.append(line_data[0])
- if 'REPEAT' in line_data:
- keys_repeat.append(line_data[0])
- except Exception as e:
- print(e)
- # Verify values file does not have empty lines
- with open(args.values, 'r', newline='\n') as csv_values_file:
- try:
- # Extract keys from values file (first line of file)
- file_data = [csv_values_file.readline()]
- file_data_keys = file_data[0]
- if not isinstance(file_data_keys, str):
- file_data_keys = file_data_keys.encode('utf-8')
- keys_in_values_file = file_data_keys.strip().split(',')
- while file_data:
- verify_empty_lines_exist(args.values, file_data)
- file_data = [csv_values_file.readline()]
- if '' in file_data:
- break
- except Exception as e:
- print(e)
- # Verify file identifier exists in values file
- if args.fileid:
- if args.fileid not in keys_in_values_file:
- raise SystemExit('Error: target_file_identifier: %s does not exist in values file.\n' % args.fileid)
- else:
- args.fileid = 1
- return keys_in_config_file, keys_in_values_file, keys_repeat
- def generate(args):
- keys_in_config_file = []
- keys_in_values_file = []
- keys_repeat = []
- encryption_enabled = False
- args.outdir = os.path.join(args.outdir, '')
- # Verify input config and values file format
- keys_in_config_file, keys_in_values_file, keys_repeat = verify_file_format(args)
- # Verify data in the input_config_file and input_values_file
- verify_data_in_file(args.conf, args.values, keys_in_config_file,
- keys_in_values_file, keys_repeat)
- if (args.keygen or args.inputkey):
- encryption_enabled = True
- print("\nGenerating encrypted NVS binary images...")
- # Create intermediate csv file
- create_intermediate_csv(args, keys_in_config_file, keys_in_values_file,
- keys_repeat, is_encr=encryption_enabled)
- def generate_key(args):
- nvs_partition_gen.generate_key(args)
- def main():
- try:
- parser = argparse.ArgumentParser(description="\nESP Manufacturing Utility", formatter_class=argparse.RawTextHelpFormatter)
- subparser = parser.add_subparsers(title='Commands',
- dest='command',
- help='\nRun mfg_gen.py {command} -h for additional help\n\n')
- parser_gen = subparser.add_parser('generate',
- help='Generate NVS partition',
- formatter_class=argparse.RawTextHelpFormatter)
- parser_gen.set_defaults(func=generate)
- parser_gen.add_argument('conf',
- default=None,
- help='Path to configuration csv file to parse')
- parser_gen.add_argument('values',
- default=None,
- help='Path to values csv file to parse')
- parser_gen.add_argument('prefix',
- default=None,
- help='Unique name for each output filename prefix')
- parser_gen.add_argument('size',
- default=None,
- help='Size of NVS partition in bytes\
- \n(must be multiple of 4096)')
- parser_gen.add_argument('--fileid',
- default=None,
- help='''Unique file identifier(any key in values file) \
- \nfor each filename suffix (Default: numeric value(1,2,3...)''')
- parser_gen.add_argument('--version',
- choices=[1, 2],
- default=2,
- type=int,
- help='''Set multipage blob version.\
- \nVersion 1 - Multipage blob support disabled.\
- \nVersion 2 - Multipage blob support enabled.\
- \nDefault: Version 2 ''')
- parser_gen.add_argument('--keygen',
- action="store_true",
- default=False,
- help='Generates key for encrypting NVS partition')
- parser_gen.add_argument('--keyfile',
- default=None,
- help=argparse.SUPPRESS)
- parser_gen.add_argument('--inputkey',
- default=None,
- help='File having key for encrypting NVS partition')
- parser_gen.add_argument('--outdir',
- default=os.getcwd(),
- help='Output directory to store files created\
- \n(Default: current directory)')
- parser_gen.add_argument('--input',
- default=None,
- help=argparse.SUPPRESS)
- parser_gen.add_argument('--output',
- default=None,
- help=argparse.SUPPRESS)
- parser_gen_key = subparser.add_parser('generate-key',
- help='Generate keys for encryption',
- formatter_class=argparse.RawTextHelpFormatter)
- parser_gen_key.set_defaults(func=generate_key)
- parser_gen_key.add_argument('--keyfile',
- default=None,
- help='Path to output encryption keys file')
- parser_gen_key.add_argument('--outdir',
- default=os.getcwd(),
- help='Output directory to store files created.\
- \n(Default: current directory)')
- args = parser.parse_args()
- args.func(args)
- except ValueError as err:
- print(err)
- except Exception as e:
- print(e)
- if __name__ == "__main__":
- main()
|