fs_object.py 15 KB


  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import os
  4. from datetime import datetime
  5. from typing import List, Optional, Tuple, Union
  6. from .entry import Entry
  7. from .exceptions import FatalError, WriteDirectoryException
  8. from .fat import FAT, Cluster
  9. from .fatfs_state import FATFSState
  10. from .long_filename_utils import (build_lfn_full_name, build_lfn_unique_entry_name_order,
  11. get_required_lfn_entries_count, split_name_to_lfn_entries,
  12. split_name_to_lfn_entry_blocks)
  13. from .utils import (DATETIME, INVALID_SFN_CHARS_PATTERN, MAX_EXT_SIZE, MAX_NAME_SIZE, FATDefaults,
  14. build_lfn_short_entry_name, build_name, lfn_checksum, required_clusters_count,
  15. split_content_into_sectors, split_to_name_and_extension)
  16. class File:
  17. """
  18. The class File provides API to write into the files. It represents file in the FS.
  19. """
  20. ATTR_ARCHIVE: int = 0x20
  21. ENTITY_TYPE: int = ATTR_ARCHIVE
  22. def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
  23. self.name: str = name
  24. self.extension: str = extension
  25. self.fatfs_state: FATFSState = fatfs_state
  26. self.fat: FAT = fat
  27. self.size: int = 0
  28. self._first_cluster: Optional[Cluster] = None
  29. self._entry: Entry = entry
  30. @property
  31. def entry(self) -> Entry:
  32. return self._entry
  33. @property
  34. def first_cluster(self) -> Optional[Cluster]:
  35. return self._first_cluster
  36. @first_cluster.setter
  37. def first_cluster(self, value: Cluster) -> None:
  38. self._first_cluster = value
  39. def name_equals(self, name: str, extension: str) -> bool:
  40. equals_: bool = build_name(name, extension) == build_name(self.name, self.extension)
  41. return equals_
  42. def write(self, content: bytes) -> None:
  43. self.entry.update_content_size(len(content))
  44. # we assume that the correct amount of clusters is allocated
  45. current_cluster = self._first_cluster
  46. for content_part in split_content_into_sectors(content, self.fatfs_state.boot_sector_state.sector_size):
  47. content_as_list = content_part
  48. if current_cluster is None:
  49. raise FatalError('No free space left!')
  50. address: int = current_cluster.cluster_data_address
  51. self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list
  52. current_cluster = current_cluster.next_cluster
  53. class Directory:
  54. """
  55. The Directory class provides API to add files and directories into the directory
  56. and to find the file according to path and write it.
  57. """
  58. ATTR_DIRECTORY: int = 0x10
  59. ATTR_ARCHIVE: int = 0x20
  60. ENTITY_TYPE: int = ATTR_DIRECTORY
  61. CURRENT_DIRECTORY = '.'
  62. PARENT_DIRECTORY = '..'
  63. def __init__(self,
  64. name,
  65. fat,
  66. fatfs_state,
  67. entry=None,
  68. cluster=None,
  69. size=None,
  70. extension='',
  71. parent=None):
  72. # type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
  73. self.name: str = name
  74. self.fatfs_state: FATFSState = fatfs_state
  75. self.extension: str = extension
  76. self.fat: FAT = fat
  77. self.size: int = size or self.fatfs_state.boot_sector_state.sector_size
  78. # if directory is root its parent is itself
  79. self.parent: Directory = parent or self
  80. self._first_cluster: Cluster = cluster
  81. # entries will be initialized after the cluster allocation
  82. self.entries: List[Entry] = []
  83. self.entities: List[Union[File, Directory]] = [] # type: ignore
  84. self._entry = entry # currently not in use (will use later for e.g. modification time, etc.)
  85. @property
  86. def is_root(self) -> bool:
  87. return self.parent is self
  88. @property
  89. def first_cluster(self) -> Cluster:
  90. return self._first_cluster
  91. @first_cluster.setter
  92. def first_cluster(self, value: Cluster) -> None:
  93. self._first_cluster = value
  94. def name_equals(self, name: str, extension: str) -> bool:
  95. equals_: bool = build_name(name, extension) == build_name(self.name, self.extension)
  96. return equals_
  97. @property
  98. def entries_count(self) -> int:
  99. entries_count_: int = self.size // FATDefaults.ENTRY_SIZE
  100. return entries_count_
  101. def create_entries(self, cluster: Cluster) -> List[Entry]:
  102. return [Entry(entry_id=i,
  103. parent_dir_entries_address=cluster.cluster_data_address,
  104. fatfs_state=self.fatfs_state)
  105. for i in range(self.entries_count)]
  106. def init_directory(self) -> None:
  107. self.entries = self.create_entries(self._first_cluster)
  108. # the root directory doesn't contain link to itself nor the parent
  109. if self.is_root:
  110. return
  111. # if the directory is not root we initialize the reference to itself and to the parent directory
  112. for dir_id, name_ in ((self, self.CURRENT_DIRECTORY), (self.parent, self.PARENT_DIRECTORY)):
  113. new_dir_: Entry = self.find_free_entry() or self.chain_directory()
  114. new_dir_.allocate_entry(first_cluster_id=dir_id.first_cluster.id,
  115. entity_name=name_,
  116. entity_extension='',
  117. entity_type=dir_id.ENTITY_TYPE)
  118. def lookup_entity(self, object_name: str, extension: str): # type: ignore
  119. for entity in self.entities:
  120. if build_name(entity.name, entity.extension) == build_name(object_name, extension):
  121. return entity
  122. return None
  123. @staticmethod
  124. def _is_end_of_path(path_as_list: List[str]) -> bool:
  125. """
  126. :param path_as_list: path split into the list
  127. :returns: True if the file is the leaf of the directory tree, False otherwise
  128. The method is part of the base of recursion,
  129. determines if the path is target file or directory in the tree folder structure.
  130. """
  131. return len(path_as_list) == 1
  132. def recursive_search(self, path_as_list, current_dir): # type: ignore
  133. name, extension = split_to_name_and_extension(path_as_list[0])
  134. next_obj = current_dir.lookup_entity(name, extension)
  135. if next_obj is None:
  136. raise FileNotFoundError('No such file or directory!')
  137. if self._is_end_of_path(path_as_list) and next_obj.name_equals(name, extension):
  138. return next_obj
  139. return self.recursive_search(path_as_list[1:], next_obj)
  140. def find_free_entry(self) -> Optional[Entry]:
  141. for entry in self.entries:
  142. if entry.is_empty:
  143. return entry
  144. return None
  145. def _extend_directory(self) -> None:
  146. current: Cluster = self.first_cluster
  147. while current.next_cluster is not None:
  148. current = current.next_cluster
  149. new_cluster: Cluster = self.fat.find_free_cluster()
  150. current.set_in_fat(new_cluster.id)
  151. assert current is not new_cluster
  152. current.next_cluster = new_cluster
  153. self.entries += self.create_entries(new_cluster)
  154. def chain_directory(self) -> Entry:
  155. """
  156. :returns: First free entry
  157. The method adds new Cluster to the Directory and returns first free entry.
  158. """
  159. self._extend_directory()
  160. free_entry: Entry = self.find_free_entry()
  161. if free_entry is None:
  162. raise FatalError('No more space left!')
  163. return free_entry
  164. @staticmethod
  165. def allocate_long_name_object(free_entry,
  166. name,
  167. extension,
  168. target_dir,
  169. free_cluster_id,
  170. entity_type,
  171. date,
  172. time):
  173. # type: (Entry, str, str, Directory, int, int, DATETIME, DATETIME) -> Entry
  174. lfn_full_name: str = build_lfn_full_name(name, extension)
  175. lfn_unique_entry_order: int = build_lfn_unique_entry_name_order(target_dir.entities, name)
  176. lfn_short_entry_name: str = build_lfn_short_entry_name(name, extension, lfn_unique_entry_order)
  177. checksum: int = lfn_checksum(lfn_short_entry_name)
  178. entries_count: int = get_required_lfn_entries_count(lfn_full_name)
  179. # entries in long file name entries chain starts with the last entry
  180. split_names_reversed = list(reversed(list(enumerate(split_name_to_lfn_entries(lfn_full_name, entries_count)))))
  181. for i, name_split_to_entry in split_names_reversed:
  182. order: int = i + 1
  183. blocks_: List[bytes] = split_name_to_lfn_entry_blocks(name_split_to_entry)
  184. lfn_names: List[bytes] = list(map(lambda x: x.lower(), blocks_))
  185. free_entry.allocate_entry(first_cluster_id=free_cluster_id,
  186. entity_name=name,
  187. entity_extension=extension,
  188. entity_type=entity_type,
  189. lfn_order=order,
  190. lfn_names=lfn_names,
  191. lfn_checksum_=checksum,
  192. lfn_is_last=order == entries_count)
  193. free_entry = target_dir.find_free_entry() or target_dir.chain_directory()
  194. free_entry.allocate_entry(first_cluster_id=free_cluster_id,
  195. entity_name=lfn_short_entry_name[:MAX_NAME_SIZE],
  196. entity_extension=lfn_short_entry_name[MAX_NAME_SIZE:],
  197. entity_type=entity_type,
  198. lfn_order=Entry.SHORT_ENTRY_LN,
  199. date=date,
  200. time=time)
  201. return free_entry
  202. @staticmethod
  203. def _is_valid_sfn(name: str, extension: str) -> bool:
  204. if INVALID_SFN_CHARS_PATTERN.search(name) or INVALID_SFN_CHARS_PATTERN.search(name):
  205. return False
  206. ret: bool = len(name) <= MAX_NAME_SIZE and len(extension) <= MAX_EXT_SIZE
  207. return ret
  208. def allocate_object(self,
  209. name,
  210. entity_type,
  211. object_timestamp_,
  212. path_from_root=None,
  213. extension='',
  214. is_empty=False):
  215. # type: (str, int, datetime, Optional[List[str]], str, bool) -> Tuple[Cluster, Entry, Directory]
  216. """
  217. Method finds the target directory in the path
  218. and allocates cluster (both the record in FAT and cluster in the data region)
  219. and entry in the specified directory
  220. """
  221. free_cluster: Optional[Cluster] = None
  222. free_cluster_id = 0x00
  223. if not is_empty:
  224. free_cluster = self.fat.find_free_cluster()
  225. free_cluster_id = free_cluster.id
  226. target_dir: Directory = self if not path_from_root else self.recursive_search(path_from_root, self)
  227. free_entry: Entry = target_dir.find_free_entry() or target_dir.chain_directory()
  228. fatfs_date_ = (object_timestamp_.year, object_timestamp_.month, object_timestamp_.day)
  229. fatfs_time_ = (object_timestamp_.hour, object_timestamp_.minute, object_timestamp_.second)
  230. if not self.fatfs_state.long_names_enabled or self._is_valid_sfn(name, extension):
  231. free_entry.allocate_entry(first_cluster_id=free_cluster_id,
  232. entity_name=name,
  233. entity_extension=extension,
  234. date=fatfs_date_,
  235. time=fatfs_time_,
  236. fits_short=True,
  237. entity_type=entity_type)
  238. return free_cluster, free_entry, target_dir
  239. return free_cluster, self.allocate_long_name_object(free_entry=free_entry,
  240. name=name,
  241. extension=extension,
  242. target_dir=target_dir,
  243. free_cluster_id=free_cluster_id,
  244. entity_type=entity_type,
  245. date=fatfs_date_,
  246. time=fatfs_time_), target_dir
  247. def new_file(self,
  248. name: str,
  249. extension: str,
  250. path_from_root: Optional[List[str]],
  251. object_timestamp_: datetime,
  252. is_empty: bool) -> None:
  253. free_cluster, free_entry, target_dir = self.allocate_object(name=name,
  254. extension=extension,
  255. entity_type=Directory.ATTR_ARCHIVE,
  256. path_from_root=path_from_root,
  257. object_timestamp_=object_timestamp_,
  258. is_empty=is_empty)
  259. file: File = File(name=name,
  260. fat=self.fat,
  261. extension=extension,
  262. fatfs_state=self.fatfs_state,
  263. entry=free_entry)
  264. file.first_cluster = free_cluster
  265. target_dir.entities.append(file)
  266. def new_directory(self, name, parent, path_from_root, object_timestamp_):
  267. # type: (str, Directory, Optional[List[str]], datetime) -> None
  268. free_cluster, free_entry, target_dir = self.allocate_object(name=name,
  269. entity_type=Directory.ATTR_DIRECTORY,
  270. path_from_root=path_from_root,
  271. object_timestamp_=object_timestamp_)
  272. directory: Directory = Directory(name=name,
  273. fat=self.fat,
  274. parent=parent,
  275. fatfs_state=self.fatfs_state,
  276. entry=free_entry)
  277. directory.first_cluster = free_cluster
  278. directory.init_directory()
  279. target_dir.entities.append(directory)
  280. def write_to_file(self, path: List[str], content: bytes) -> None:
  281. """
  282. Writes to file existing in the directory structure.
  283. :param path: path split into the list
  284. :param content: content as a string to be written into a file
  285. :returns: None
  286. :raises WriteDirectoryException: raised is the target object for writing is a directory
  287. """
  288. entity_to_write: Entry = self.recursive_search(path, self)
  289. if isinstance(entity_to_write, File):
  290. clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.boot_sector_state.sector_size,
  291. content=content)
  292. self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
  293. entity_to_write.write(content)
  294. else:
  295. raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')