nvs_logger.py

#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import binascii
import json
import sys
from typing import Any, Dict, List, Union

from nvs_parser import NVS_Entry, NVS_Partition, nvs_const


class NVS_Logger:
    ansi = {
        'red': '\033[31m',
        'green': '\033[32m',
        'yellow': '\033[33m',
        'blue': '\033[34m',
        'cyan': '\033[36m',
        'bold': '\033[1m',
        'clear': '\033[0m',
    }

    def __init__(self, *, color: str = 'auto', out_format: str = 'text'):
        self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
        self.output_format = out_format

    def set_color(self, color: str) -> None:
        self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())

    def set_format(self, out_format: str) -> None:
        self.output_format = out_format

    def info(self, *args, **kwargs) -> None:  # type: ignore
        kwargs['file'] = kwargs.get(
            'file', sys.stdout
        )  # Set default output to be stdout, but can be overwritten
        print(*args, **kwargs)

    def error(self, *args, **kwargs) -> None:  # type: ignore
        kwargs['file'] = kwargs.get(
            'file', sys.stderr
        )  # Set default output to be stderr, but can be overwritten
        print(*args, **kwargs)

    def red(self, text: str) -> str:
        if self.color:
            return NVS_Logger.ansi['red'] + text + NVS_Logger.ansi['clear']
        return text

    def green(self, text: str) -> str:
        if self.color:
            return NVS_Logger.ansi['green'] + text + NVS_Logger.ansi['clear']
        return text

    def yellow(self, text: str) -> str:
        if self.color:
            return NVS_Logger.ansi['yellow'] + text + NVS_Logger.ansi['clear']
        return text

    def blue(self, text: str) -> str:
        if self.color:
            return NVS_Logger.ansi['blue'] + text + NVS_Logger.ansi['clear']
        return text

    def cyan(self, text: str) -> str:
        if self.color:
            return NVS_Logger.ansi['cyan'] + text + NVS_Logger.ansi['clear']
        return text

    def bold(self, text: str) -> str:
        if self.color:
            return NVS_Logger.ansi['bold'] + text + NVS_Logger.ansi['clear']
        return text


nvs_log = NVS_Logger()
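# Illustrative use of the shared logger above (a sketch, not part of the dump
# helpers): only 'always' and 'auto' are checked by set_color(), so any other
# value effectively disables ANSI colors; output_format is stored here and is
# presumably consumed by the calling tool rather than by this module.
#
#   nvs_log.set_color('always')  # force ANSI colors even when output is piped
#   nvs_log.set_format('json')   # remembered in nvs_log.output_format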
def storage_stats(nvs_partition: NVS_Partition) -> None:
    global_stats = {
        'written_entries': 0,
        'free_entries': 0,
        'erased_entries': 0,
        'invalid_entries': 0,
    }

    for page in nvs_partition.pages:
        written_e = 0
        free_e = 0
        erased_e = 0
        invalid_e = 0
        for entry in page.entries:
            if entry.state == 'Written':
                written_e += 1
            elif entry.state == 'Empty':
                free_e += 1
            elif entry.state == 'Erased':
                erased_e += 1
            else:
                invalid_e += 1

        nvs_log.info(nvs_log.bold(f'Page {page.header["status"]}'))
        nvs_log.info(' Found entries:')
        nvs_log.info(f' Written: {written_e: 5d}')
        nvs_log.info(f' Erased: {erased_e: 5d}')
        nvs_log.info(f' Empty: {free_e: 5d}')
        nvs_log.info(f' Invalid: {invalid_e: 5d}')
        nvs_log.info(f' Total: {written_e + free_e + erased_e + invalid_e: 5d}')
        nvs_log.info()

        global_stats['written_entries'] += written_e
        global_stats['erased_entries'] += erased_e
        global_stats['free_entries'] += free_e
        global_stats['invalid_entries'] += invalid_e

    nvs_log.info(nvs_log.bold('Global'))
    nvs_log.info(' Config:')
    nvs_log.info(f' Page size: {nvs_const.page_size: 5d}')
    nvs_log.info(f' Entry size: {nvs_const.entry_size: 5d}')
    nvs_log.info(f' Total pages: {len(nvs_partition.pages): 5d}')
    nvs_log.info(' Entries:')
    nvs_log.info(f' Written: {global_stats["written_entries"]: 5d}')
    nvs_log.info(f' Erased: {global_stats["erased_entries"]: 5d}')
    nvs_log.info(f' Empty: {global_stats["free_entries"]: 5d}')
    nvs_log.info(f' Invalid: {global_stats["invalid_entries"]: 5d}')
    nvs_log.info(f' Total: {sum(global_stats.values()): 5d}')
    nvs_log.info()


def dump_everything(nvs_partition: NVS_Partition, written_only: bool = False) -> None:
    for page in nvs_partition.pages:
        # Print page header
        if page.is_empty:
            nvs_log.info(
                nvs_log.bold(f'Page Empty, Page address: 0x{page.start_address:x}')
            )
        else:
            if (
                page.header['crc']['original'] == page.header['crc']['computed']
            ):  # Color CRC32
                crc = nvs_log.green(f'{page.header["crc"]["original"]: >8x}')
            else:
                crc = nvs_log.red(f'{page.header["crc"]["original"]: >8x}')
            nvs_log.info(
                nvs_log.bold(
                    f'Page no. {page.header["page_index"]}'
                    + f', Status: {page.header["status"]}'
                    + f', Version: {page.header["version"]}'
                    + f', CRC32: {crc}'
                )
                + nvs_log.bold(f', Page address: 0x{page.start_address:x}')
            )

        nvs_log.info(nvs_log.bold(' Entry state bitmap: '), end='')
        for x in page.raw_entry_state_bitmap:
            nvs_log.info(f'{x:02x} ', end='')
        nvs_log.info()

        # Dump entries
        empty_entries = []
        for entry in page.entries:
            # Skip non-written entries if needed
            if written_only and not entry.state == 'Written':
                continue

            # Compress all empty entries
            if (
                entry.state == 'Empty' and entry.is_empty
            ):  # Gather all subsequent empty entries
                empty_entries.append(entry)
                continue
            else:
                # Print the empty entries
                if len(empty_entries) >= 3:  # There are enough entries to compress
                    nvs_log.info(
                        nvs_log.bold(f' {empty_entries[0].index:03d}.'), 'Empty'
                    )
                    nvs_log.info(nvs_log.bold(' ...'))
                    nvs_log.info(
                        nvs_log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty'
                    )
                else:  # No need for compression
                    for e in empty_entries:
                        nvs_log.info(nvs_log.bold(f' {e.index:03d}.'), 'Empty')
                empty_entries.clear()

            # Dump a single entry
            status = entry.state
            if status == 'Written':
                status = nvs_log.green(f'{status: <7}')
            elif status == 'Erased':
                status = nvs_log.red(f'{status: <7}')

            crc = ''
            if (
                entry.metadata['crc']['original'] == entry.metadata['crc']['computed']
            ):  # Color CRC32
                crc = nvs_log.green(f'{entry.metadata["crc"]["original"]: >8x}')
            else:
                crc = nvs_log.red(f'{entry.metadata["crc"]["original"]: >8x}')

            nvs_log.info(
                nvs_log.bold(f' {entry.index:03d}.')
                + ' '
                + status
                + f', Namespace Index: {entry.metadata["namespace"]:03d}'
                + f', Type: {entry.metadata["type"]:<10}'
                + f', Span: {entry.metadata["span"]:03d}'
                + f', Chunk Index: {entry.metadata["chunk_index"]:03d}'
                + f', CRC32: {crc}'
                + f' | {entry.key} : ',
                end='',
            )

            if entry.metadata['type'] not in [
                'string',
                'blob_data',
                'blob_index',
                'blob',
            ]:  # Entry is non-variable length
                if entry.data is not None:
                    nvs_log.info(entry.data['value'])
                else:
                    nvs_log.info(entry.data)  # None
            else:
                if entry.metadata['type'] == 'blob_index':
                    nvs_log.info(
                        f'Size={entry.data["size"]}'
                        + f', ChunkCount={entry.data["chunk_count"]}'
                        + f', ChunkStart={entry.data["chunk_start"]}'
                    )
                else:
                    if (
                        entry.metadata['crc']['data_original']
                        == entry.metadata['crc']['data_computed']
                    ):  # Color CRC32
                        crc = nvs_log.green(
                            f'{entry.metadata["crc"]["data_original"]:x}'
                        )
                    else:
                        crc = nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}')
                    nvs_log.info(f'Size={entry.data["size"]}, CRC32={crc}')

            # Dump all children entries
            if entry.metadata['span'] != 1:
                for i, data in enumerate(entry.children):
                    nvs_log.info(
                        f'{"": >6}0x{(i*nvs_const.entry_size):03x} {data.dump_raw()}'
                    )

        # Dump trailing empty entries
        if len(empty_entries) >= 3:
            nvs_log.info(nvs_log.bold(f' {empty_entries[0].index:03d}.'), 'Empty')
            nvs_log.info(nvs_log.bold(' ...'))
            nvs_log.info(nvs_log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty')
        else:
            for e in empty_entries:
                nvs_log.info(nvs_log.bold(f' {e.index:03d}.'), 'Empty')
        empty_entries.clear()
        nvs_log.info()


def dump_written_entries(nvs_partition: NVS_Partition) -> None:
    dump_everything(nvs_partition, True)


def list_namespaces(nvs_partition: NVS_Partition) -> None:
    # Gather namespaces
    ns = {}
    for page in nvs_partition.pages:
        for entry in page.entries:
            if entry.state == 'Written' and entry.metadata['namespace'] == 0:
                ns[entry.data['value']] = entry.key

    # Print found namespaces
    nvs_log.info(nvs_log.bold('Index : Namespace'))
    for ns_index in sorted(ns):
        nvs_log.info(f' {ns_index:03d} :', nvs_log.cyan(ns[ns_index]))


def dump_key_value_pairs(nvs_partition: NVS_Partition) -> None:
    # Get namespace list
    ns = {}
    for page in nvs_partition.pages:
        for entry in page.entries:
            if entry.state == 'Written' and entry.metadata['namespace'] == 0:
                ns[entry.data['value']] = entry.key

    # Print key-value pairs
    for page in nvs_partition.pages:
        # Print page header
        if page.is_empty:
            nvs_log.info(nvs_log.bold('Page Empty'))
        else:
            nvs_log.info(
                nvs_log.bold(
                    f'Page no. {page.header["page_index"]}'
                    + f', Status: {page.header["status"]}'
                )
            )

        # Print entries
        for entry in page.entries:
            if (
                entry.state == 'Written' and entry.metadata['namespace'] != 0
            ):  # Ignore non-written entries
                chunk_index = ''
                data = ''
                if entry.metadata['type'] not in [
                    'string',
                    'blob_data',
                    'blob_index',
                    'blob',
                ]:  # Non-variable length entry
                    data = entry.data['value']
                elif entry.metadata['type'] == 'blob_index':
                    continue
                else:  # Variable length entries
                    tmp = b''
                    for e in entry.children:  # Merge all children entries
                        tmp += bytes(e.raw)
                    tmp = tmp[: entry.data['size']]  # Discard padding
                    if entry.metadata['type'] == 'blob_data':
                        if entry.metadata['chunk_index'] >= 128:  # Get real chunk index
                            chunk_index = f'[{entry.metadata["chunk_index"] - 128}]'
                        else:
                            chunk_index = f'[{entry.metadata["chunk_index"]}]'
                    data = str(tmp)

                if entry.metadata['namespace'] not in ns:
                    continue
                else:
                    nvs_log.info(
                        ' '
                        + nvs_log.cyan(ns[entry.metadata['namespace']])
                        + ':'
                        + nvs_log.yellow(entry.key)
                        + f'{chunk_index} = {data}'
                    )
        nvs_log.info()


def dump_written_blobs(nvs_partition: NVS_Partition) -> None:
    blobs: Dict = {}
    strings: List[NVS_Entry] = []
    legacy_blobs: List[NVS_Entry] = []
    ns = {}
    empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')

    # Gather namespaces, blob indexes and legacy blobs
    for page in nvs_partition.pages:
        for entry in page.entries:
            if entry.state == 'Written':
                if entry.metadata['type'] == 'blob_index':
                    blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
                        empty_entry
                    ] * entry.data['chunk_count']
                elif entry.metadata['type'] == 'blob':
                    legacy_blobs.append(entry)
                elif entry.metadata['type'] == 'string':
                    strings.append(entry)
                elif entry.metadata['namespace'] == 0:
                    ns[entry.data['value']] = entry.key

    # Dump blobs
    for key in blobs:
        for page in nvs_partition.pages:
            for entry in page.entries:
                # Gather all blob chunks
                if (
                    entry.state == 'Written'
                    and entry.metadata['type'] != 'blob_index'
                    and entry.metadata['namespace']
                    == blobs[key][0].metadata['namespace']
                    and entry.key == blobs[key][0].key
                ):
                    blobs[key][
                        1
                        + entry.metadata['chunk_index']
                        - blobs[key][0].data['chunk_start']
                    ] = entry

        blob_index = blobs[key][0]
        blob_chunks = blobs[key][1:]

        # Print blob info
        nvs_log.info(
            nvs_log.cyan(
                ns.get(
                    blob_index.metadata['namespace'], blob_index.metadata['namespace']
                )
            )
            + ':'
            + nvs_log.yellow(blob_index.key)
            + ' - '
            + 'Type: Blob (Version 2), '
            + f'Size: {blob_index.data["size"]}'
        )

        # Print blob data
        raw_entries = []
        for kid in blob_chunks:  # Gather all chunk entries
            if kid is empty_entry:
                raw_entries += [empty_entry]
            else:
                raw_entries += kid.children

        for i, entry in enumerate(raw_entries):
            if entry is empty_entry:
                nvs_log.info(nvs_log.yellow(f' {"":->63} Missing data {"":-<64}'))
            else:
                nvs_log.info(
                    f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}'
                )
        nvs_log.info()

    # Dump strings
    for string in strings:
        nvs_log.info(
            nvs_log.cyan(
                ns.get(string.metadata['namespace'], string.metadata['namespace'])
            )
            + ':'
            + nvs_log.yellow(string.key)
            + ' - '
            + 'Type: String, '
            + f'Size: {string.data["size"]}'
        )
        for i, entry in enumerate(string.children):
            nvs_log.info(f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}')
        nvs_log.info()

    # Dump legacy blobs
    for blob in legacy_blobs:
        nvs_log.info(
            nvs_log.cyan(ns.get(blob.metadata['namespace'], blob.metadata['namespace']))
            + ':'
            + nvs_log.yellow(blob.key)
            + ' - '
            + 'Type: Blob (Version 1), '
            + f'Size: {blob.data["size"]}'
        )
        for i, entry in enumerate(blob.children):
            nvs_log.info(f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}')
        nvs_log.info()


def print_json(nvs: NVS_Partition) -> None:
    class NVSEncoder(json.JSONEncoder):
        def default(self, obj: Any) -> Union[Any, Dict[str, Any], str]:
            if hasattr(obj, 'toJSON'):
                return obj.toJSON()
            if isinstance(obj, bytearray):
                return binascii.b2a_base64(obj, newline=False).decode(
                    'ascii'
                )  # Binary to Base64 ASCII representation
            return json.JSONEncoder.default(self, obj)

    print(json.dumps(nvs.toJSON(), cls=NVSEncoder, indent=2))
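# Usage sketch (illustrative): how the helpers above might be combined by a
# caller. NVS_Partition comes from nvs_parser; its constructor signature is an
# assumption here and may differ in the actual parser, and 'nvs.bin' is a
# hypothetical partition dump file.
#
#   with open('nvs.bin', 'rb') as f:
#       nvs = NVS_Partition('nvs', bytearray(f.read()))
#   storage_stats(nvs)           # per-page and global entry statistics
#   list_namespaces(nvs)         # namespace index -> name table
#   dump_key_value_pairs(nvs)    # decoded key/value view
#   dump_written_blobs(nvs)      # reassembled blobs and strings
#   print_json(nvs)              # machine-readable JSON dump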