nvs_check.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. #!/usr/bin/env python3
  2. # SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
  3. # SPDX-License-Identifier: Apache-2.0
  4. from typing import Dict, List
  5. from nvs_logger import NVS_Logger
  6. from nvs_parser import NVS_Entry, NVS_Partition, nvs_const
  7. def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
  8. used_namespaces: Dict[int, None] = {}
  9. found_namespaces: Dict[int, str] = {}
  10. blobs: Dict = {}
  11. blob_chunks: List[NVS_Entry] = []
  12. empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
  13. # Partition size check
  14. if len(nvs_partition.pages) < 3:
  15. nvs_log.info(
  16. nvs_log.yellow(
  17. 'NVS Partition must contain 3 pages (sectors) at least to function properly!'
  18. )
  19. )
  20. # Free/empty page check
  21. if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
  22. nvs_log.info(
  23. nvs_log.red(
  24. '''No free (empty) page found in the NVS partition,
  25. at least one free page is required for proper function!'''
  26. )
  27. )
  28. nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))
  29. for page in nvs_partition.pages:
  30. # page: NVS_Page
  31. # Print page header
  32. if page.header['status'] == 'Empty':
  33. nvs_log.info(nvs_log.cyan('Page Empty'))
  34. # Check if page is truly empty
  35. if page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
  36. nvs_log.info(
  37. nvs_log.red(
  38. 'The page is reported as Empty but its entry state bitmap is not empty!'
  39. )
  40. )
  41. if any([not e.is_empty for e in page.entries]):
  42. nvs_log.info(
  43. nvs_log.red('The page is reported as Empty but there are data written!')
  44. )
  45. else:
  46. # Check page header CRC32
  47. if page.header['crc']['original'] == page.header['crc']['computed']:
  48. nvs_log.info(
  49. nvs_log.cyan(f'Page no. {page.header["page_index"]}'), '\tCRC32: OK'
  50. )
  51. else:
  52. nvs_log.info(
  53. nvs_log.cyan(f'Page no. {page.header["page_index"]}'),
  54. f'Original CRC32:',
  55. nvs_log.red(f'{page.header["crc"]["original"]:x}'),
  56. f'Generated CRC32:',
  57. nvs_log.green(f'{page.header["crc"]["computed"]:x}'),
  58. )
  59. # Check all entries
  60. seen_written_entires: Dict[str, list[NVS_Entry]] = {}
  61. for entry in page.entries:
  62. # entry: NVS_Entry
  63. # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data
  64. # Variable length values themselves occupy whole 32 bytes (therefore metadata values are meaningless)
  65. # and are stored in as entries inside string/blob data entry 'entry.children' list
  66. # Duplicate entry check (1) - same key, different index - find duplicates
  67. if entry.state == 'Written':
  68. if entry.key in seen_written_entires:
  69. seen_written_entires[entry.key].append(entry)
  70. else:
  71. seen_written_entires[entry.key] = [entry]
  72. # Entry state check - doesn't check variable length values (metadata such as state are meaningless as all 32 bytes are pure data)
  73. if entry.is_empty:
  74. if entry.state == 'Written':
  75. nvs_log.info(
  76. nvs_log.red(
  77. f' Entry #{entry.index:03d} is reported as Written but it is empty!'
  78. )
  79. )
  80. continue
  81. elif entry.state == 'Erased':
  82. nvs_log.info(
  83. nvs_log.yellow(
  84. f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)'
  85. )
  86. )
  87. if entry.state == 'Written':
  88. # Entry CRC32 check
  89. if (
  90. entry.metadata['crc']['original']
  91. != entry.metadata['crc']['computed']
  92. ):
  93. nvs_log.info(
  94. nvs_log.red(
  95. f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'
  96. ),
  97. f'Written:',
  98. nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
  99. f'Generated:',
  100. nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
  101. )
  102. # Entry children CRC32 check
  103. if (
  104. entry.metadata['span'] > 1
  105. and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed'])
  106. ):
  107. nvs_log.info(
  108. nvs_log.red(
  109. f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!'
  110. ),
  111. f'Written:',
  112. nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'),
  113. f'Generated:',
  114. nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'),
  115. )
  116. # Entry type check
  117. if entry.metadata['type'] not in [
  118. nvs_const.item_type[key] for key in nvs_const.item_type
  119. ]:
  120. nvs_log.info(
  121. nvs_log.yellow(
  122. f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'
  123. ),
  124. f'Type: {entry.metadata["type"]}',
  125. )
  126. # Span check
  127. if (
  128. entry.index + entry.metadata['span'] - 1
  129. >= int(nvs_const.page_size / nvs_const.entry_size) - 2
  130. ):
  131. nvs_log.info(
  132. nvs_log.red(
  133. f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'
  134. )
  135. )
  136. # Spanned entry state checks
  137. elif entry.metadata['span'] > 1:
  138. parent_state = entry.state
  139. for kid in entry.children:
  140. if parent_state != kid.state:
  141. nvs_log.info(
  142. nvs_log.yellow(' Inconsistent data state!'),
  143. f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
  144. f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}',
  145. )
  146. # Gather blobs & namespaces
  147. if entry.metadata['type'] == 'blob_index':
  148. blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
  149. empty_entry
  150. ] * entry.data['chunk_count']
  151. elif entry.metadata['type'] == 'blob_data':
  152. blob_chunks.append(entry)
  153. if entry.metadata['namespace'] == 0:
  154. found_namespaces[entry.data['value']] = entry.key
  155. else:
  156. used_namespaces[entry.metadata['namespace']] = None
  157. # Duplicate entry check (2) - same key, different index - print duplicates
  158. duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1]
  159. for duplicate_entries in duplicate_entries_list:
  160. # duplicate_entries: list[NVS_Entry]
  161. nvs_log.info(
  162. nvs_log.red(
  163. f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]}
  164. with status {page.header["status"]} is used by the following entries:'''
  165. )
  166. )
  167. for entry in duplicate_entries:
  168. nvs_log.info(
  169. nvs_log.red(
  170. f'Entry #{entry.index:03d} {entry.key} is a duplicate!'
  171. )
  172. )
  173. nvs_log.info()
  174. # Blob checks
  175. # Assemble blobs
  176. for chunk in blob_chunks:
  177. parent = blobs.get(
  178. f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry]
  179. )[0]
  180. # Blob chunk without blob index check
  181. if parent is empty_entry:
  182. nvs_log.info(
  183. nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
  184. f'Namespace index: {chunk.metadata["namespace"]:03d}',
  185. f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
  186. f'Chunk Index: {chunk.metadata["chunk_index"]:03d}',
  187. )
  188. else:
  189. blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
  190. chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
  191. blobs[blob_key][chunk_index + 1] = chunk
  192. # Blob data check
  193. for blob_key in blobs:
  194. blob_index = blobs[blob_key][0]
  195. blob_chunks = blobs[blob_key][1:]
  196. blob_size = blob_index.data['size']
  197. for i, chunk in enumerate(blob_chunks):
  198. # Blob missing chunk check
  199. if chunk is empty_entry:
  200. nvs_log.info(
  201. nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
  202. f'Namespace index: {blob_index.metadata["namespace"]:03d}',
  203. f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
  204. f'Chunk Index: {i:03d}',
  205. )
  206. else:
  207. blob_size -= len(chunk.children) * nvs_const.entry_size
  208. # Blob missing data check
  209. if blob_size > 0:
  210. nvs_log.info(
  211. nvs_log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
  212. f'Namespace index: {blob_index.metadata["namespace"]:03d}',
  213. )
  214. # Namespace checks
  215. # Undefined namespace index check
  216. for used_ns in used_namespaces:
  217. key = found_namespaces.pop(used_ns, '')
  218. if key == '':
  219. nvs_log.info(
  220. nvs_log.red('Undefined namespace index!'),
  221. f'Namespace index: {used_ns:03d}',
  222. f'[undefined]',
  223. )
  224. # Unused namespace index check
  225. for unused_ns in found_namespaces:
  226. nvs_log.info(
  227. nvs_log.yellow('Found unused namespace.'),
  228. f'Namespace index: {unused_ns:03d}',
  229. f'[{found_namespaces[unused_ns]}]',
  230. )