| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- #!/usr/bin/env python3
- # SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
- # SPDX-License-Identifier: Apache-2.0
- from typing import Dict, List
- from nvs_logger import NVS_Logger
- from nvs_parser import NVS_Entry, NVS_Partition, nvs_const
def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
    """Run consistency checks over a parsed NVS partition and log every finding.

    Checks performed, in order:

    * partition level: at least 3 pages, and at least one page with status 'Empty';
    * page level: pages reported as Empty really are empty, header CRC32 matches;
    * entry level (per page): state vs. content, entry and data CRC32, known type,
      span within the page, state of spanned data entries, duplicate keys;
    * blob level: every blob data chunk has a blob index, every blob index has all
      of its chunks and enough data;
    * namespace level: every used namespace index is defined and every defined
      namespace is used.

    Args:
        nvs_partition: Parsed partition; its ``pages`` are iterated and each page's
            ``header``, ``raw_entry_state_bitmap`` and ``entries`` are read.
        nvs_log: Logger used for all output (``info``) and colouring
            (``red``/``yellow``/``green``/``cyan``).

    Returns:
        None. All results are reported through ``nvs_log``.
    """
    used_namespaces: Dict[int, None] = {}
    found_namespaces: Dict[int, str] = {}
    blobs: Dict[str, List[NVS_Entry]] = {}
    blob_chunks: List[NVS_Entry] = []
    # Sentinel marking a blob chunk slot that has not been filled (compared by identity)
    empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')

    # Values that do not change during the scan - computed once instead of per page/entry
    empty_bitmap = bytearray([0xFF]) * nvs_const.entry_size
    known_types = [nvs_const.item_type[key] for key in nvs_const.item_type]
    span_limit = int(nvs_const.page_size / nvs_const.entry_size) - 2

    # Partition size check
    if len(nvs_partition.pages) < 3:
        nvs_log.info(
            nvs_log.yellow(
                'NVS Partition must contain 3 pages (sectors) at least to function properly!'
            )
        )

    # Free/empty page check
    if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
        nvs_log.info(
            nvs_log.red(
                '''No free (empty) page found in the NVS partition,
at least one free page is required for proper function!'''
            )
        )
        nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))

    for page in nvs_partition.pages:
        # page: NVS_Page

        # Print page header
        if page.header['status'] == 'Empty':
            nvs_log.info(nvs_log.cyan('Page Empty'))

            # Check if page is truly empty
            if page.raw_entry_state_bitmap != empty_bitmap:
                nvs_log.info(
                    nvs_log.red(
                        'The page is reported as Empty but its entry state bitmap is not empty!'
                    )
                )
            if any(not e.is_empty for e in page.entries):
                nvs_log.info(
                    nvs_log.red('The page is reported as Empty but there are data written!')
                )
        else:
            # Check page header CRC32
            if page.header['crc']['original'] == page.header['crc']['computed']:
                nvs_log.info(
                    nvs_log.cyan(f'Page no. {page.header["page_index"]}'), '\tCRC32: OK'
                )
            else:
                nvs_log.info(
                    nvs_log.cyan(f'Page no. {page.header["page_index"]}'),
                    'Original CRC32:',
                    nvs_log.red(f'{page.header["crc"]["original"]:x}'),
                    'Generated CRC32:',
                    nvs_log.green(f'{page.header["crc"]["computed"]:x}'),
                )

        # Check all entries
        seen_written_entries: Dict[str, List[NVS_Entry]] = {}
        for entry in page.entries:
            # entry: NVS_Entry

            # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data

            # Variable length values themselves occupy whole 32 bytes (therefore metadata values are meaningless)
            # and are stored as entries inside the string/blob data entry 'entry.children' list

            # Duplicate entry check (1) - same key, different index - find duplicates
            if entry.state == 'Written':
                seen_written_entries.setdefault(entry.key, []).append(entry)

            # Entry state check - doesn't check variable length values
            # (metadata such as state are meaningless as all 32 bytes are pure data)
            if entry.is_empty:
                if entry.state == 'Written':
                    nvs_log.info(
                        nvs_log.red(
                            f' Entry #{entry.index:03d} is reported as Written but it is empty!'
                        )
                    )
                    # Nothing else can be validated on an empty entry
                    continue
                elif entry.state == 'Erased':
                    nvs_log.info(
                        nvs_log.yellow(
                            f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)'
                        )
                    )

            if entry.state == 'Written':
                # Entry CRC32 check
                if (
                    entry.metadata['crc']['original']
                    != entry.metadata['crc']['computed']
                ):
                    nvs_log.info(
                        nvs_log.red(
                            f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'
                        ),
                        'Written:',
                        nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
                        'Generated:',
                        nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
                    )

                # Entry children CRC32 check
                if (
                    entry.metadata['span'] > 1
                    and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed'])
                ):
                    nvs_log.info(
                        nvs_log.red(
                            f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!'
                        ),
                        'Written:',
                        nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'),
                        'Generated:',
                        nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'),
                    )

                # Entry type check
                if entry.metadata['type'] not in known_types:
                    nvs_log.info(
                        nvs_log.yellow(
                            f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'
                        ),
                        f'Type: {entry.metadata["type"]}',
                    )

                # Span check
                if entry.index + entry.metadata['span'] - 1 >= span_limit:
                    nvs_log.info(
                        nvs_log.red(
                            f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'
                        )
                    )
                # Spanned entry state checks
                elif entry.metadata['span'] > 1:
                    parent_state = entry.state
                    for kid in entry.children:
                        if parent_state != kid.state:
                            nvs_log.info(
                                nvs_log.yellow(' Inconsistent data state!'),
                                f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
                                f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}',
                            )

                # Gather blobs & namespaces
                if entry.metadata['type'] == 'blob_index':
                    # Slot 0 holds the blob index, slots 1..chunk_count start as placeholders
                    blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
                        empty_entry
                    ] * entry.data['chunk_count']
                elif entry.metadata['type'] == 'blob_data':
                    blob_chunks.append(entry)

                if entry.metadata['namespace'] == 0:
                    # Entries with namespace index 0 are treated as namespace definitions
                    found_namespaces[entry.data['value']] = entry.key
                else:
                    used_namespaces[entry.metadata['namespace']] = None

        # Duplicate entry check (2) - same key, different index - print duplicates
        duplicate_entries_list = [
            group for group in seen_written_entries.values() if len(group) > 1
        ]
        for duplicate_entries in duplicate_entries_list:
            # duplicate_entries: List[NVS_Entry]
            nvs_log.info(
                nvs_log.red(
                    f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]}
with status {page.header["status"]} is used by the following entries:'''
                )
            )
            for duplicate in duplicate_entries:
                nvs_log.info(
                    nvs_log.red(
                        f'Entry #{duplicate.index:03d} {duplicate.key} is a duplicate!'
                    )
                )

    # NOTE(review): presumably emits a blank separator line between the per-page report
    # and the partition-wide checks below - confirm against NVS_Logger.info
    nvs_log.info()

    # Blob checks
    # Assemble blobs
    for chunk in blob_chunks:
        blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
        slots = blobs.get(blob_key)

        # Blob chunk without blob index check
        if slots is None:
            nvs_log.info(
                nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
                f'Namespace index: {chunk.metadata["namespace"]:03d}',
                f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
                f'Chunk Index: {chunk.metadata["chunk_index"]:03d}',
            )
            continue

        parent = slots[0]
        chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']

        # The indexes come from (possibly corrupted) flash content: an unchecked
        # assignment could raise IndexError or, for negative values, silently
        # overwrite the blob index in slot 0 or wrap around to another chunk slot
        if 0 <= chunk_index < len(slots) - 1:
            slots[chunk_index + 1] = chunk
        else:
            nvs_log.info(
                nvs_log.red(f'Blob {chunk.key} chunk index is out of range of its blob index!'),
                f'Namespace index: {chunk.metadata["namespace"]:03d}',
                f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
                f'Chunk Index: {chunk.metadata["chunk_index"]:03d}',
            )

    # Blob data check
    for blob_key in blobs:
        blob_index = blobs[blob_key][0]
        chunks = blobs[blob_key][1:]
        blob_size = blob_index.data['size']

        for i, chunk in enumerate(chunks):
            # Blob missing chunk check
            if chunk is empty_entry:
                nvs_log.info(
                    nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
                    f'Namespace index: {blob_index.metadata["namespace"]:03d}',
                    f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
                    f'Chunk Index: {i:03d}',
                )
            else:
                # Each child entry of a present chunk accounts for entry_size bytes of data
                blob_size -= len(chunk.children) * nvs_const.entry_size

        # Blob missing data check
        if blob_size > 0:
            nvs_log.info(
                nvs_log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
                f'Namespace index: {blob_index.metadata["namespace"]:03d}',
            )

    # Namespace checks
    # Undefined namespace index check
    for used_ns in used_namespaces:
        # pop() so that only never-used namespaces remain for the check below
        key = found_namespaces.pop(used_ns, '')
        if key == '':
            nvs_log.info(
                nvs_log.red('Undefined namespace index!'),
                f'Namespace index: {used_ns:03d}',
                '[undefined]',
            )

    # Unused namespace index check
    for unused_ns in found_namespaces:
        nvs_log.info(
            nvs_log.yellow('Found unused namespace.'),
            f'Namespace index: {unused_ns:03d}',
            f'[{found_namespaces[unused_ns]}]',
        )
|