fs_object.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import os
  4. from datetime import datetime
  5. from typing import List, Optional, Tuple, Union
  6. from .entry import Entry
  7. from .exceptions import FatalError, WriteDirectoryException
  8. from .fat import FAT, Cluster
  9. from .fatfs_state import FATFSState
  10. from .long_filename_utils import (build_lfn_full_name, build_lfn_unique_entry_name_order,
  11. get_required_lfn_entries_count, split_name_to_lfn_entries,
  12. split_name_to_lfn_entry_blocks)
  13. from .utils import (DATETIME, MAX_EXT_SIZE, MAX_NAME_SIZE, FATDefaults, build_lfn_short_entry_name, lfn_checksum,
  14. required_clusters_count, split_content_into_sectors, split_to_name_and_extension)
  15. class File:
  16. """
  17. The class File provides API to write into the files. It represents file in the FS.
  18. """
  19. ATTR_ARCHIVE: int = 0x20
  20. ENTITY_TYPE: int = ATTR_ARCHIVE
  21. def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
  22. self.name: str = name
  23. self.extension: str = extension
  24. self.fatfs_state: FATFSState = fatfs_state
  25. self.fat: FAT = fat
  26. self.size: int = 0
  27. self._first_cluster: Optional[Cluster] = None
  28. self._entry: Entry = entry
  29. @property
  30. def entry(self) -> Entry:
  31. return self._entry
  32. @property
  33. def first_cluster(self) -> Optional[Cluster]:
  34. return self._first_cluster
  35. @first_cluster.setter
  36. def first_cluster(self, value: Cluster) -> None:
  37. self._first_cluster = value
  38. def name_equals(self, name: str, extension: str) -> bool:
  39. return self.name == name and self.extension == extension
  40. def write(self, content: bytes) -> None:
  41. self.entry.update_content_size(len(content))
  42. # we assume that the correct amount of clusters is allocated
  43. current_cluster = self._first_cluster
  44. for content_part in split_content_into_sectors(content, self.fatfs_state.boot_sector_state.sector_size):
  45. content_as_list = content_part
  46. if current_cluster is None:
  47. raise FatalError('No free space left!')
  48. address: int = current_cluster.cluster_data_address
  49. self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list
  50. current_cluster = current_cluster.next_cluster
  51. class Directory:
  52. """
  53. The Directory class provides API to add files and directories into the directory
  54. and to find the file according to path and write it.
  55. """
  56. ATTR_DIRECTORY: int = 0x10
  57. ATTR_ARCHIVE: int = 0x20
  58. ENTITY_TYPE: int = ATTR_DIRECTORY
  59. CURRENT_DIRECTORY = '.'
  60. PARENT_DIRECTORY = '..'
  61. def __init__(self,
  62. name,
  63. fat,
  64. fatfs_state,
  65. entry=None,
  66. cluster=None,
  67. size=None,
  68. extension='',
  69. parent=None):
  70. # type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
  71. self.name: str = name
  72. self.fatfs_state: FATFSState = fatfs_state
  73. self.extension: str = extension
  74. self.fat: FAT = fat
  75. self.size: int = size or self.fatfs_state.boot_sector_state.sector_size
  76. # if directory is root its parent is itself
  77. self.parent: Directory = parent or self
  78. self._first_cluster: Cluster = cluster
  79. # entries will be initialized after the cluster allocation
  80. self.entries: List[Entry] = []
  81. self.entities: List[Union[File, Directory]] = [] # type: ignore
  82. self._entry = entry # currently not in use (will use later for e.g. modification time, etc.)
  83. @property
  84. def is_root(self) -> bool:
  85. return self.parent is self
  86. @property
  87. def first_cluster(self) -> Cluster:
  88. return self._first_cluster
  89. @first_cluster.setter
  90. def first_cluster(self, value: Cluster) -> None:
  91. self._first_cluster = value
  92. def name_equals(self, name: str, extension: str) -> bool:
  93. return self.name == name and self.extension == extension
  94. @property
  95. def entries_count(self) -> int:
  96. entries_count_: int = self.size // FATDefaults.ENTRY_SIZE
  97. return entries_count_
  98. def create_entries(self, cluster: Cluster) -> List[Entry]:
  99. return [Entry(entry_id=i,
  100. parent_dir_entries_address=cluster.cluster_data_address,
  101. fatfs_state=self.fatfs_state)
  102. for i in range(self.entries_count)]
  103. def init_directory(self) -> None:
  104. self.entries = self.create_entries(self._first_cluster)
  105. # the root directory doesn't contain link to itself nor the parent
  106. if self.is_root:
  107. return
  108. # if the directory is not root we initialize the reference to itself and to the parent directory
  109. for dir_id, name_ in ((self, self.CURRENT_DIRECTORY), (self.parent, self.PARENT_DIRECTORY)):
  110. new_dir_: Entry = self.find_free_entry() or self.chain_directory()
  111. new_dir_.allocate_entry(first_cluster_id=dir_id.first_cluster.id,
  112. entity_name=name_,
  113. entity_extension='',
  114. entity_type=dir_id.ENTITY_TYPE)
  115. def lookup_entity(self, object_name: str, extension: str): # type: ignore
  116. for entity in self.entities:
  117. if entity.name == object_name and entity.extension == extension:
  118. return entity
  119. return None
  120. @staticmethod
  121. def _if_end_of_path(path_as_list: List[str]) -> bool:
  122. """
  123. :param path_as_list: path split into the list
  124. :returns: True if the file is the leaf of the directory tree, False otherwise
  125. The method is part of the base of recursion,
  126. determines if the path is target file or directory in the tree folder structure.
  127. """
  128. return len(path_as_list) == 1
  129. def recursive_search(self, path_as_list, current_dir): # type: ignore
  130. name, extension = split_to_name_and_extension(path_as_list[0])
  131. next_obj = current_dir.lookup_entity(name, extension)
  132. if next_obj is None:
  133. raise FileNotFoundError('No such file or directory!')
  134. if self._if_end_of_path(path_as_list) and next_obj.name_equals(name, extension):
  135. return next_obj
  136. return self.recursive_search(path_as_list[1:], next_obj)
  137. def find_free_entry(self) -> Optional[Entry]:
  138. for entry in self.entries:
  139. if entry.is_empty:
  140. return entry
  141. return None
  142. def _extend_directory(self) -> None:
  143. current: Cluster = self.first_cluster
  144. while current.next_cluster is not None:
  145. current = current.next_cluster
  146. new_cluster: Cluster = self.fat.find_free_cluster()
  147. current.set_in_fat(new_cluster.id)
  148. assert current is not new_cluster
  149. current.next_cluster = new_cluster
  150. self.entries += self.create_entries(new_cluster)
  151. def chain_directory(self) -> Entry:
  152. """
  153. :returns: First free entry
  154. The method adds new Cluster to the Directory and returns first free entry.
  155. """
  156. self._extend_directory()
  157. free_entry: Entry = self.find_free_entry()
  158. if free_entry is None:
  159. raise FatalError('No more space left!')
  160. return free_entry
  161. @staticmethod
  162. def allocate_long_name_object(free_entry,
  163. name,
  164. extension,
  165. target_dir,
  166. free_cluster,
  167. entity_type,
  168. date,
  169. time):
  170. # type: (Entry, str, str, Directory, Cluster, int, DATETIME, DATETIME) -> Tuple[Cluster, Entry, Directory]
  171. lfn_full_name: str = build_lfn_full_name(name, extension)
  172. lfn_unique_entry_order: int = build_lfn_unique_entry_name_order(target_dir.entities, name)
  173. lfn_short_entry_name: str = build_lfn_short_entry_name(name, extension, lfn_unique_entry_order)
  174. checksum: int = lfn_checksum(lfn_short_entry_name)
  175. entries_count: int = get_required_lfn_entries_count(lfn_full_name)
  176. # entries in long file name entries chain starts with the last entry
  177. split_names_reversed = reversed(list(enumerate(split_name_to_lfn_entries(lfn_full_name, entries_count))))
  178. for i, name_split_to_entry in split_names_reversed:
  179. order: int = i + 1
  180. lfn_names: List[bytes] = list(
  181. map(lambda x: x.lower(), split_name_to_lfn_entry_blocks(name_split_to_entry))) # type: ignore
  182. free_entry.allocate_entry(first_cluster_id=free_cluster.id,
  183. entity_name=name,
  184. entity_extension=extension,
  185. entity_type=entity_type,
  186. lfn_order=order,
  187. lfn_names=lfn_names,
  188. lfn_checksum_=checksum,
  189. lfn_is_last=order == entries_count)
  190. free_entry = target_dir.find_free_entry() or target_dir.chain_directory()
  191. free_entry.allocate_entry(first_cluster_id=free_cluster.id,
  192. entity_name=lfn_short_entry_name[:MAX_NAME_SIZE],
  193. entity_extension=lfn_short_entry_name[MAX_NAME_SIZE:],
  194. entity_type=entity_type,
  195. lfn_order=Entry.SHORT_ENTRY_LN,
  196. date=date,
  197. time=time)
  198. return free_cluster, free_entry, target_dir
  199. def allocate_object(self,
  200. name,
  201. entity_type,
  202. object_timestamp_,
  203. path_from_root=None,
  204. extension=''):
  205. # type: (str, int, datetime, Optional[List[str]], str) -> Tuple[Cluster, Entry, Directory]
  206. """
  207. Method finds the target directory in the path
  208. and allocates cluster (both the record in FAT and cluster in the data region)
  209. and entry in the specified directory
  210. """
  211. free_cluster: Cluster = self.fat.find_free_cluster()
  212. target_dir: Directory = self if not path_from_root else self.recursive_search(path_from_root, self)
  213. free_entry: Entry = target_dir.find_free_entry() or target_dir.chain_directory()
  214. name_fits_short_struct: bool = len(name) <= MAX_NAME_SIZE and len(extension) <= MAX_EXT_SIZE
  215. fatfs_date_ = (object_timestamp_.year, object_timestamp_.month, object_timestamp_.day)
  216. fatfs_time_ = (object_timestamp_.hour, object_timestamp_.minute, object_timestamp_.second)
  217. if not self.fatfs_state.long_names_enabled or name_fits_short_struct:
  218. free_entry.allocate_entry(first_cluster_id=free_cluster.id,
  219. entity_name=name,
  220. entity_extension=extension,
  221. date=fatfs_date_,
  222. time=fatfs_time_,
  223. entity_type=entity_type)
  224. return free_cluster, free_entry, target_dir
  225. return self.allocate_long_name_object(free_entry=free_entry,
  226. name=name,
  227. extension=extension,
  228. target_dir=target_dir,
  229. free_cluster=free_cluster,
  230. entity_type=entity_type,
  231. date=fatfs_date_,
  232. time=fatfs_time_)
  233. def new_file(self,
  234. name: str,
  235. extension: str,
  236. path_from_root: Optional[List[str]],
  237. object_timestamp_: datetime) -> None:
  238. free_cluster, free_entry, target_dir = self.allocate_object(name=name,
  239. extension=extension,
  240. entity_type=Directory.ATTR_ARCHIVE,
  241. path_from_root=path_from_root,
  242. object_timestamp_=object_timestamp_)
  243. file: File = File(name=name,
  244. fat=self.fat,
  245. extension=extension,
  246. fatfs_state=self.fatfs_state,
  247. entry=free_entry)
  248. file.first_cluster = free_cluster
  249. target_dir.entities.append(file)
  250. def new_directory(self, name, parent, path_from_root, object_timestamp_):
  251. # type: (str, Directory, Optional[List[str]], datetime) -> None
  252. free_cluster, free_entry, target_dir = self.allocate_object(name=name,
  253. entity_type=Directory.ATTR_DIRECTORY,
  254. path_from_root=path_from_root,
  255. object_timestamp_=object_timestamp_)
  256. directory: Directory = Directory(name=name,
  257. fat=self.fat,
  258. parent=parent,
  259. fatfs_state=self.fatfs_state,
  260. entry=free_entry)
  261. directory.first_cluster = free_cluster
  262. directory.init_directory()
  263. target_dir.entities.append(directory)
  264. def write_to_file(self, path: List[str], content: bytes) -> None:
  265. """
  266. Writes to file existing in the directory structure.
  267. :param path: path split into the list
  268. :param content: content as a string to be written into a file
  269. :returns: None
  270. :raises WriteDirectoryException: raised is the target object for writing is a directory
  271. """
  272. entity_to_write: Entry = self.recursive_search(path, self)
  273. if isinstance(entity_to_write, File):
  274. clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.boot_sector_state.sector_size,
  275. content=content)
  276. self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
  277. entity_to_write.write(content)
  278. else:
  279. raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')