| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- #!/usr/bin/env python3
- #
- # SPDX-FileCopyrightText: Copyright 2010-2023 Arm Limited and/or its affiliates <open-source-office@arm.com>
- #
- # SPDX-License-Identifier: Apache-2.0
- #
- # Licensed under the Apache License, Version 2.0 (the License); you may
- # not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an AS IS BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import os
- import sys
- import json
- import argparse
- import subprocess
- import numpy as np
- import tensorflow as tf
- from conv_settings import ConvSettings
- from softmax_settings import SoftmaxSettings
- from fully_connected_settings import FullyConnectedSettings
class MODEL_EXTRACTOR(SoftmaxSettings, FullyConnectedSettings, ConvSettings):
    """Extracts reference data for every supported operator of a tflite model.

    Combines the softmax, fully-connected and convolution settings generators
    so that a single pass over a model can emit C arrays and config headers.
    """

    def __init__(self, dataset, schema_file, tflite_model):
        super().__init__(dataset, None, True, True, True, schema_file)
        self.tflite_model = tflite_model
        # Requantization state, updated per operator.
        self.quantized_multiplier = 0
        self.quantized_shift = 0
        # Only 8-bit quantization is supported.
        self.is_int16xint8 = False
        # Softmax parameters, updated per softmax operator.
        self.diff_min = 0
        self.input_multiplier = 0
        self.input_left_shift = 0
        self.supported_ops = ["CONV_2D", "DEPTHWISE_CONV_2D", "FULLY_CONNECTED", "AVERAGE_POOL_2D", "SOFTMAX"]
- def from_bytes(self, tensor_data, type_size) -> list:
- result = []
- tmp_ints = []
- if not (type_size == 1 or type_size == 2 or type_size == 4):
- raise RuntimeError("Size not supported: {}".format(type_size))
- count = 0
- for val in tensor_data:
- tmp_ints.append(val)
- count = count + 1
- if count % type_size == 0:
- tmp_bytes = bytearray(tmp_ints)
- result.append(int.from_bytes(tmp_bytes, 'little', signed=True))
- tmp_ints.clear()
- return result
- def tflite_to_json(self, tflite_input, schema):
- name_without_ext, ext = os.path.splitext(tflite_input)
- new_name = name_without_ext + '.json'
- dirname = os.path.dirname(tflite_input)
- if schema is None:
- raise RuntimeError("A schema file is required.")
- command = f"flatc -o {dirname} --strict-json -t {schema} -- {tflite_input}"
- command_list = command.split(' ')
- try:
- process = subprocess.run(command_list)
- if process.returncode != 0:
- print(f"ERROR: {command = }")
- sys.exit(1)
- except Exception as e:
- raise RuntimeError(f"{e} from: {command = }. Did you install flatc?")
- return new_name
    def write_c_config_header(self, name_prefix, op_name, op_index) -> None:
        """Write a C header with #define config data for one extracted operator.

        Args:
            name_prefix: base name of the generated file ("<name_prefix>_config_data.h").
            op_name: builtin operator name, e.g. "CONV_2D" or "SOFTMAX".
            op_index: index of the operator in the model, part of the macro prefix.
        """
        filename = f"{name_prefix}_config_data.h"
        self.generated_header_files.append(filename)
        filepath = self.headers_dir + filename
        # All macros in this file are prefixed "<op_name>_<op_index>_".
        prefix = f'{op_name}_{op_index}'
        print("Writing C header with config data {}...".format(filepath))
        with open(filepath, "w+") as f:
            self.write_c_common_header(f)
            f.write("#define {}_OUT_CH {}\n".format(prefix, self.output_ch))
            f.write("#define {}_IN_CH {}\n".format(prefix, self.input_ch))
            f.write("#define {}_INPUT_W {}\n".format(prefix, self.x_input))
            f.write("#define {}_INPUT_H {}\n".format(prefix, self.y_input))
            f.write("#define {}_DST_SIZE {}\n".format(prefix,
                                                      self.x_output * self.y_output * self.output_ch * self.batches))
            if op_name == "SOFTMAX":
                f.write("#define {}_NUM_ROWS {}\n".format(prefix, self.y_input))
                f.write("#define {}_ROW_SIZE {}\n".format(prefix, self.x_input))
                f.write("#define {}_MULT {}\n".format(prefix, self.input_multiplier))
                f.write("#define {}_SHIFT {}\n".format(prefix, self.input_left_shift))
                if not self.is_int16xint8:
                    # diff_min is stored positive and emitted negated.
                    f.write("#define {}_DIFF_MIN {}\n".format(prefix, -self.diff_min))
            else:
                # Both the *_X/*_Y and *_W/*_H variants are emitted with the same
                # values; presumably different kernels/tests consume different
                # macro naming conventions - TODO confirm.
                f.write("#define {}_FILTER_X {}\n".format(prefix, self.filter_x))
                f.write("#define {}_FILTER_Y {}\n".format(prefix, self.filter_y))
                f.write("#define {}_FILTER_W {}\n".format(prefix, self.filter_x))
                f.write("#define {}_FILTER_H {}\n".format(prefix, self.filter_y))
                f.write("#define {}_STRIDE_X {}\n".format(prefix, self.stride_x))
                f.write("#define {}_STRIDE_Y {}\n".format(prefix, self.stride_y))
                f.write("#define {}_STRIDE_W {}\n".format(prefix, self.stride_x))
                f.write("#define {}_STRIDE_H {}\n".format(prefix, self.stride_y))
                f.write("#define {}_PAD_X {}\n".format(prefix, self.pad_x))
                f.write("#define {}_PAD_Y {}\n".format(prefix, self.pad_y))
                f.write("#define {}_PAD_W {}\n".format(prefix, self.pad_x))
                f.write("#define {}_PAD_H {}\n".format(prefix, self.pad_y))
                f.write("#define {}_OUTPUT_W {}\n".format(prefix, self.x_output))
                f.write("#define {}_OUTPUT_H {}\n".format(prefix, self.y_output))
                # Input offset is the negated zero point; output offset is not negated.
                f.write("#define {}_INPUT_OFFSET {}\n".format(prefix, -self.input_zero_point))
                f.write("#define {}_INPUT_SIZE {}\n".format(prefix, self.x_input * self.y_input * self.input_ch))
                f.write("#define {}_OUT_ACTIVATION_MIN {}\n".format(prefix, self.out_activation_min))
                f.write("#define {}_OUT_ACTIVATION_MAX {}\n".format(prefix, self.out_activation_max))
                f.write("#define {}_INPUT_BATCHES {}\n".format(prefix, self.batches))
                f.write("#define {}_OUTPUT_OFFSET {}\n".format(prefix, self.output_zero_point))
                f.write("#define {}_DILATION_X {}\n".format(prefix, self.dilation_x))
                f.write("#define {}_DILATION_Y {}\n".format(prefix, self.dilation_y))
                f.write("#define {}_DILATION_W {}\n".format(prefix, self.dilation_x))
                f.write("#define {}_DILATION_H {}\n".format(prefix, self.dilation_y))
                if op_name == "FULLY_CONNECTED":
                    f.write("#define {}_OUTPUT_MULTIPLIER {}\n".format(prefix, self.quantized_multiplier))
                    f.write("#define {}_OUTPUT_SHIFT {}\n".format(prefix, self.quantized_shift))
                if op_name == "DEPTHWISE_CONV_2D":
                    f.write("#define {}_ACCUMULATION_DEPTH {}\n".format(prefix,
                                                                        self.input_ch * self.x_input * self.y_input))
        self.format_output_file(filepath)
    def shape_to_config(self, input_shape, filter_shape, output_shape, layer_name):
        """Map the tensor shapes of one operator onto the settings attributes.

        Args:
            input_shape: shape of the operator's input tensor.
            filter_shape: shape of the weights tensor ([] for ops without weights).
            output_shape: shape of the operator's output tensor.
            layer_name: builtin operator name deciding how shapes are interpreted.
        """
        if layer_name == "AVERAGE_POOL_2D":
            # The pooling filter covers the whole spatial input.
            [_, self.filter_y, self.filter_x, _] = input_shape
        elif layer_name == "CONV_2D" or layer_name == "DEPTHWISE_CONV_2D":
            [self.batches, self.y_input, self.x_input, self.input_ch] = input_shape
            # NOTE(review): output_ch here is a throwaway local (self.output_ch is
            # set from the per-channel scaling factors in extract_from_model).
            # Also, for DEPTHWISE_CONV_2D the last filter dimension holds the
            # output channels, so self.input_ch gets overwritten by it here -
            # confirm this is intended.
            [output_ch, self.filter_y, self.filter_x, self.input_ch] = filter_shape
        elif layer_name == "FULLY_CONNECTED":
            [self.batches, self.input_ch] = input_shape
            [self.input_ch, self.output_ch] = filter_shape
            [self.y_output, self.x_output] = output_shape
            # Fully connected is treated as a 1x1 spatial input.
            self.x_input = 1
            self.y_input = 1
        elif layer_name == "SOFTMAX":
            [self.y_input, self.x_input] = input_shape
        # Spatial output and padding only apply to 4-dimensional (NHWC) inputs.
        if len(input_shape) == 4:
            if len(output_shape) == 2:
                [self.y_output, self.x_output] = output_shape
            else:
                [d, self.y_output, self.x_output, d1] = output_shape
            self.calculate_padding(self.x_output, self.y_output, self.x_input, self.y_input)
    def extract_from_model(self, json_file, tensor_details):
        """Walk the model's operators and emit C arrays/config headers per supported op.

        Args:
            json_file: path to the flatc-generated JSON of the tflite model.
            tensor_details: interpreter tensor details, used for quantization
                parameters (scale/zero point) of each tensor.
        """
        with open(json_file, 'r') as in_file:
            data = in_file.read()
        data = json.loads(data)
        tensors = data['subgraphs'][0]['tensors']
        operators = data['subgraphs'][0]['operators']
        operator_codes = data['operator_codes']
        buffers = data['buffers']

        op_index = 0
        for op in operators:
            if 'opcode_index' in op:
                builtin_name = operator_codes[op['opcode_index']]['builtin_code']
            else:
                builtin_name = ""

            # Get stride and padding.
            if 'builtin_options' in op:
                builtin_options = op['builtin_options']
                if 'stride_w' in builtin_options:
                    self.stride_x = builtin_options['stride_w']
                if 'stride_h' in builtin_options:
                    self.stride_y = builtin_options['stride_h']
                # NOTE(review): only the key's presence is checked, not its value.
                # This presumably relies on flatc omitting default-valued fields,
                # so an explicit 'padding' entry means the non-default VALID and
                # an absent one means the default SAME - confirm against schema.
                if 'padding' in builtin_options:
                    self.has_padding = False
                    self.padding = 'VALID'
                else:
                    self.has_padding = True
                    self.padding = 'SAME'

            # Generate weights, bias, multipliers, shifts and config.
            if builtin_name not in self.supported_ops:
                print(f"WARNING: skipping unsupported operator {builtin_name}")
            else:
                input_index = op['inputs'][0]
                output_index = op['outputs'][0]
                input_tensor = tensor_details[input_index]
                output_tensor = tensor_details[output_index]
                # quantization is (scale, zero_point).
                input_scale = input_tensor['quantization'][0]
                output_scale = output_tensor['quantization'][0]
                self.input_zero_point = input_tensor['quantization'][1]
                self.output_zero_point = output_tensor['quantization'][1]
                input_shape = input_tensor['shape']
                output_shape = output_tensor['shape']

                if builtin_name == "CONV_2D" or builtin_name == "DEPTHWISE_CONV_2D" \
                        or builtin_name == "FULLY_CONNECTED":
                    # Inputs are ordered [activation, weights, bias].
                    weights_index = op['inputs'][1]
                    bias_index = op['inputs'][2]
                    weight_tensor = tensor_details[weights_index]
                    scaling_factors = weight_tensor['quantization_parameters']['scales'].tolist()
                    bias = tensors[bias_index]
                    weights = tensors[weights_index]
                    # Weights are decoded as int8 (1 byte), biases as int32 (4 bytes).
                    weights_data_index = weights['buffer']
                    weights_data_buffer = buffers[weights_data_index]
                    weights_data = self.from_bytes(weights_data_buffer['data'], 1)
                    bias_data_index = bias['buffer']
                    bias_data_buffer = buffers[bias_data_index]
                    bias_data = self.from_bytes(bias_data_buffer['data'], 4)
                    # One scaling factor per output channel.
                    self.output_ch = len(scaling_factors)
                    filter_shape = weights['shape']
                else:
                    filter_shape = []

                self.input_scale, self.output_scale = input_scale, output_scale
                if builtin_name == "SOFTMAX":
                    self.calc_softmax_params()
                self.shape_to_config(input_shape, filter_shape, output_shape, builtin_name)

                nice_name = 'layer_' + str(op_index) + '_' + builtin_name.lower()
                if builtin_name == "CONV_2D" or builtin_name == "DEPTHWISE_CONV_2D" \
                        or builtin_name == "FULLY_CONNECTED":
                    self.generate_c_array(nice_name + "_weights", weights_data)
                    self.generate_c_array(nice_name + "_bias", bias_data, datatype='int32_t')

                if builtin_name == "FULLY_CONNECTED":
                    # Fully connected uses a single per-tensor multiplier/shift.
                    self.weights_scale = scaling_factors[0]
                    self.quantize_multiplier()
                elif builtin_name == "CONV_2D" or builtin_name == "DEPTHWISE_CONV_2D":
                    # Convolutions use per-channel multipliers/shifts.
                    self.scaling_factors = scaling_factors
                    per_channel_multiplier, per_channel_shift = self.generate_quantize_per_channel_multiplier()
                    self.generate_c_array(f"{nice_name}_output_mult", per_channel_multiplier, datatype='int32_t')
                    self.generate_c_array(f"{nice_name}_output_shift", per_channel_shift, datatype='int32_t')

                self.write_c_config_header(nice_name, builtin_name, op_index)
            op_index = op_index + 1
- def generate_data(self, input_data=None, weights=None, biases=None) -> None:
- interpreter = self.Interpreter(model_path=str(self.tflite_model),
- experimental_op_resolver_type=self.OpResolverType.BUILTIN_REF)
- interpreter.allocate_tensors()
- # Needed for input/output scale/zp as equivalant json file data has too low precision.
- tensor_details = interpreter.get_tensor_details()
- output_details = interpreter.get_output_details()
- (self.output_scale, self.output_zero_point) = output_details[0]['quantization']
- input_details = interpreter.get_input_details()
- if len(input_details) != 1:
- raise RuntimeError("Only single input supported.")
- input_shape = input_details[0]['shape']
- input_data = self.get_randomized_input_data(input_data, input_shape)
- interpreter.set_tensor(input_details[0]["index"], tf.cast(input_data, tf.int8))
- self.generate_c_array("input", input_data)
- json_file = self.tflite_to_json(self.tflite_model, self.schema_file)
- self.extract_from_model(json_file, tensor_details)
- interpreter.invoke()
- output_data = interpreter.get_tensor(output_details[0]["index"])
- self.generate_c_array("output_ref", np.clip(output_data, self.out_activation_min, self.out_activation_max))
- self.write_c_header_wrapper()
if __name__ == '__main__':
    # Command line front end: extract reference data from a given tflite model.
    parser = argparse.ArgumentParser(description="Extract operator data from given model if operator is supported."
                                     "This provides a way for CMSIS-NN to directly process a model.")
    parser.add_argument('--schema-file', type=str, required=True, help="Path to schema file.")
    parser.add_argument('--tflite-model', type=str, required=True, help="Path to tflite file.")
    parser.add_argument('--model-name',
                        type=str,
                        help="Descriptive model name. If left out it will be inferred from actual model.")
    cli = parser.parse_args()

    # Fall back to the model's base filename when no explicit name was given.
    dataset = cli.model_name or os.path.splitext(os.path.basename(cli.tflite_model))[0]

    extractor = MODEL_EXTRACTOR(dataset, cli.schema_file, cli.tflite_model)
    extractor.generate_data()
|