nvs_parser.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. #!/usr/bin/env python3
  2. # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
  3. # SPDX-License-Identifier: Apache-2.0
  4. from typing import Any, Dict, List, Optional
  5. from zlib import crc32
  6. # Constants
  7. class NVS_Constants:
  8. class ConstantError(AttributeError):
  9. pass
  10. def __init__(self) -> None:
  11. self.page_size = 4096
  12. self.entry_size = 32
  13. self.item_type = {
  14. 0x01: 'uint8_t',
  15. 0x11: 'int8_t',
  16. 0x02: 'uint16_t',
  17. 0x12: 'int16_t',
  18. 0x04: 'uint32_t',
  19. 0x14: 'int32_t',
  20. 0x08: 'uint64_t',
  21. 0x18: 'int64_t',
  22. 0x21: 'string',
  23. 0x41: 'blob',
  24. 0x42: 'blob_data',
  25. 0x48: 'blob_index',
  26. }
  27. self.page_status = {
  28. 0xFFFFFFFF: 'Empty',
  29. 0xFFFFFFFE: 'Active',
  30. 0xFFFFFFFC: 'Full',
  31. 0xFFFFFFF8: 'Erasing',
  32. 0x00000000: 'Corrupted',
  33. }
  34. self.entry_status = {
  35. 0b11: 'Empty',
  36. 0b10: 'Written',
  37. 0b00: 'Erased',
  38. }
  39. def __setattr__(self, key: str, val: Any) -> None:
  40. if self.__dict__.get(key, None) is None:
  41. self.__dict__[key] = val
  42. else:
  43. raise NVS_Constants.ConstantError('Cannot change a constant!')
  44. nvs_const = NVS_Constants()
  45. class NotAlignedError(ValueError):
  46. pass
  47. class NVS_Partition:
  48. def __init__(self, name: str, raw_data: bytearray):
  49. if len(raw_data) % nvs_const.page_size != 0:
  50. raise NotAlignedError(
  51. f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})'
  52. )
  53. # Divide partition into pages
  54. self.name = name
  55. self.pages = []
  56. for i in range(0, len(raw_data), nvs_const.page_size):
  57. self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i))
  58. def toJSON(self) -> Dict[str, Any]:
  59. return dict(name=self.name, pages=self.pages)
  60. class NVS_Page:
  61. def __init__(self, page_data: bytearray, address: int):
  62. if len(page_data) != nvs_const.page_size:
  63. raise NotAlignedError(
  64. f'Size of given page does not match page size ({len(page_data)} != {nvs_const.page_size})'
  65. )
  66. # Initialize class
  67. self.is_empty = (
  68. page_data[0: nvs_const.entry_size]
  69. == bytearray({0xFF}) * nvs_const.entry_size
  70. )
  71. self.start_address = address
  72. self.raw_header = page_data[0: nvs_const.entry_size]
  73. self.raw_entry_state_bitmap = page_data[
  74. nvs_const.entry_size: 2 * nvs_const.entry_size
  75. ]
  76. self.entries = []
  77. # Load header
  78. self.header: Dict[str, Any] = {
  79. 'status': nvs_const.page_status.get(
  80. int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'
  81. ),
  82. 'page_index': int.from_bytes(page_data[4:8], byteorder='little'),
  83. 'version': 256 - page_data[8],
  84. 'crc': {
  85. 'original': int.from_bytes(page_data[28:32], byteorder='little'),
  86. 'computed': crc32(page_data[4:28], 0xFFFFFFFF),
  87. },
  88. }
  89. # Load entry state bitmap
  90. entry_states = []
  91. for c in self.raw_entry_state_bitmap:
  92. for index in range(0, 8, 2):
  93. entry_states.append(
  94. nvs_const.entry_status.get((c >> index) & 3, 'Invalid')
  95. )
  96. entry_states = entry_states[:-2]
  97. # Load entries
  98. i = 2
  99. while i < int(
  100. nvs_const.page_size / nvs_const.entry_size
  101. ): # Loop through every entry
  102. span = page_data[(i * nvs_const.entry_size) + 2]
  103. if span in [0xFF, 0]: # 'Default' span length to prevent span overflow
  104. span = 1
  105. # Load an entry
  106. entry = NVS_Entry(
  107. i - 2,
  108. page_data[i * nvs_const.entry_size: (i + 1) * nvs_const.entry_size],
  109. entry_states[i - 2],
  110. )
  111. self.entries.append(entry)
  112. # Load all children entries
  113. if span != 1:
  114. for span_idx in range(1, span):
  115. page_addr = i + span_idx
  116. entry_idx = page_addr - 2
  117. if page_addr * nvs_const.entry_size >= nvs_const.page_size:
  118. break
  119. child_entry = NVS_Entry(
  120. entry_idx,
  121. page_data[
  122. page_addr
  123. * nvs_const.entry_size: (page_addr + 1)
  124. * nvs_const.entry_size
  125. ],
  126. entry_states[entry_idx],
  127. )
  128. entry.child_assign(child_entry)
  129. entry.compute_crc()
  130. i += span
  131. def toJSON(self) -> Dict[str, Any]:
  132. return dict(
  133. is_empty=self.is_empty,
  134. start_address=self.start_address,
  135. raw_header=self.raw_header,
  136. raw_entry_state_bitmap=self.raw_entry_state_bitmap,
  137. header=self.header,
  138. entries=self.entries,
  139. )
  140. class NVS_Entry:
  141. def __init__(self, index: int, entry_data: bytearray, entry_state: str):
  142. if len(entry_data) != nvs_const.entry_size:
  143. raise NotAlignedError(
  144. f'Given entry is not aligned to entry size ({len(entry_data)} % {nvs_const.entry_size} = {len(entry_data)%nvs_const.entry_size})'
  145. )
  146. def item_convert(i_type: int, data: bytearray) -> Dict:
  147. byte_size_mask = 0x0F
  148. number_sign_mask = 0xF0
  149. fixed_entry_length_threshold = (
  150. 0x20 # Fixed length entry type number is always smaller than this
  151. )
  152. if i_type in nvs_const.item_type:
  153. # Deal with non variable length entries
  154. if i_type < fixed_entry_length_threshold:
  155. size = i_type & byte_size_mask
  156. num = int.from_bytes(
  157. data[:size],
  158. byteorder='little',
  159. signed=bool(i_type & number_sign_mask),
  160. )
  161. return {'value': num}
  162. # Deal with variable length entries
  163. if nvs_const.item_type[i_type] in ['string', 'blob_data', 'blob']:
  164. size = int.from_bytes(data[:2], byteorder='little')
  165. crc = int.from_bytes(data[4:8], byteorder='little')
  166. return {'value': [size, crc], 'size': size, 'crc': crc}
  167. if nvs_const.item_type[i_type] == 'blob_index':
  168. size = int.from_bytes(data[:4], byteorder='little')
  169. chunk_count = data[4]
  170. chunk_start = data[5]
  171. return {
  172. 'value': [size, chunk_count, chunk_start],
  173. 'size': size,
  174. 'chunk_count': chunk_count,
  175. 'chunk_start': chunk_start,
  176. }
  177. return {'value': None}
  178. def key_decode(data: bytearray) -> Optional[str]:
  179. decoded = ''
  180. for n in data.rstrip(b'\x00'):
  181. char = chr(n)
  182. if char.isascii():
  183. decoded += char
  184. else:
  185. return None
  186. return decoded
  187. self.raw = entry_data
  188. self.state = entry_state
  189. self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size
  190. self.index = index
  191. namespace = self.raw[0]
  192. entry_type = self.raw[1]
  193. span = self.raw[2]
  194. chunk_index = self.raw[3]
  195. crc = self.raw[4:8]
  196. key = self.raw[8:24]
  197. data = self.raw[24:32]
  198. raw_without_crc = self.raw[:4] + self.raw[8:32]
  199. self.metadata: Dict[str, Any] = {
  200. 'namespace': namespace,
  201. 'type': nvs_const.item_type.get(entry_type, f'0x{entry_type:02x}'),
  202. 'span': span,
  203. 'chunk_index': chunk_index,
  204. 'crc': {
  205. 'original': int.from_bytes(crc, byteorder='little'),
  206. 'computed': crc32(raw_without_crc, 0xFFFFFFFF),
  207. 'data_original': int.from_bytes(data[-4:], byteorder='little'),
  208. 'data_computed': 0,
  209. },
  210. }
  211. self.children: List['NVS_Entry'] = []
  212. self.key = key_decode(key)
  213. if self.key is None:
  214. self.data = None
  215. else:
  216. self.data = item_convert(entry_type, data)
  217. def dump_raw(self) -> str:
  218. hex_bytes = ''
  219. decoded = ''
  220. for i, c in enumerate(self.raw):
  221. middle_index = int(len(self.raw) / 2)
  222. if i == middle_index: # Add a space in the middle
  223. hex_bytes += ' '
  224. decoded += ' '
  225. hex_bytes += f'{c:02x} '
  226. decoded += chr(c) if chr(c).isprintable() else '.'
  227. return hex_bytes + ' ' + decoded
  228. def child_assign(self, entry: 'NVS_Entry') -> None:
  229. if not isinstance(entry, type(self)):
  230. raise ValueError('You can assign only NVS_Entry')
  231. self.children.append(entry)
  232. def compute_crc(self) -> None:
  233. if self.metadata['span'] == 1:
  234. return
  235. # Merge entries into one buffer
  236. children_data = bytearray()
  237. for entry in self.children:
  238. children_data += entry.raw
  239. if self.data:
  240. children_data = children_data[: self.data['size']] # Discard padding
  241. self.metadata['crc']['data_computed'] = crc32(children_data, 0xFFFFFFFF)
  242. def toJSON(self) -> Dict[str, Any]:
  243. return dict(
  244. raw=self.raw,
  245. state=self.state,
  246. is_empty=self.is_empty,
  247. index=self.index,
  248. metadata=self.metadata,
  249. children=self.children,
  250. key=self.key,
  251. data=self.data,
  252. )