fs_object.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import os
  4. from typing import List, Optional, Tuple
  5. from .entry import Entry
  6. from .exceptions import FatalError, WriteDirectoryException
  7. from .fat import FAT, Cluster
  8. from .fatfs_state import FATFSState
  9. from .utils import required_clusters_count, split_content_into_sectors, split_to_name_and_extension
  10. class File:
  11. """
  12. The class File provides API to write into the files. It represents file in the FS.
  13. """
  14. ATTR_ARCHIVE = 0x20
  15. ENTITY_TYPE = ATTR_ARCHIVE
  16. def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
  17. self.name = name
  18. self.extension = extension
  19. self.fatfs_state = fatfs_state
  20. self.fat = fat
  21. self.size = 0
  22. self._first_cluster = None
  23. self._entry = entry
  24. @property
  25. def entry(self) -> Entry:
  26. return self._entry
  27. @property
  28. def first_cluster(self) -> Optional[Cluster]:
  29. return self._first_cluster
  30. @first_cluster.setter
  31. def first_cluster(self, value: Cluster) -> None:
  32. self._first_cluster = value
  33. def name_equals(self, name: str, extension: str) -> bool:
  34. return self.name == name and self.extension == extension
  35. def write(self, content: bytes) -> None:
  36. self.entry.update_content_size(len(content))
  37. # we assume that the correct amount of clusters is allocated
  38. current_cluster = self._first_cluster
  39. for content_part in split_content_into_sectors(content, self.fatfs_state.sector_size):
  40. content_as_list = content_part
  41. if current_cluster is None:
  42. raise FatalError('No free space left!')
  43. address = current_cluster.cluster_data_address
  44. self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list
  45. current_cluster = current_cluster.next_cluster
  46. class Directory:
  47. """
  48. The Directory class provides API to add files and directories into the directory
  49. and to find the file according to path and write it.
  50. """
  51. ATTR_DIRECTORY = 0x10
  52. ATTR_ARCHIVE = 0x20
  53. ENTITY_TYPE = ATTR_DIRECTORY
  54. def __init__(self,
  55. name,
  56. fat,
  57. fatfs_state,
  58. entry=None,
  59. cluster=None,
  60. size=None,
  61. extension='',
  62. parent=None):
  63. # type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
  64. self.name = name
  65. self.fatfs_state = fatfs_state
  66. self.extension = extension
  67. self.fat = fat
  68. self.size = size or self.fatfs_state.sector_size
  69. # if directory is root its parent is itself
  70. self.parent: Directory = parent or self
  71. self._first_cluster = cluster
  72. # entries will be initialized after the cluster allocation
  73. self.entries: List[Entry] = []
  74. self.entities = [] # type: ignore
  75. self._entry = entry # currently not in use (will use later for e.g. modification time, etc.)
  76. @property
  77. def is_root(self) -> bool:
  78. return self.parent is self
  79. @property
  80. def first_cluster(self) -> Cluster:
  81. return self._first_cluster
  82. @first_cluster.setter
  83. def first_cluster(self, value: Cluster) -> None:
  84. self._first_cluster = value
  85. def name_equals(self, name: str, extension: str) -> bool:
  86. return self.name == name and self.extension == extension
  87. def create_entries(self, cluster: Cluster) -> list:
  88. return [Entry(entry_id=i,
  89. parent_dir_entries_address=cluster.cluster_data_address,
  90. fatfs_state=self.fatfs_state)
  91. for i in range(self.size // self.fatfs_state.entry_size)]
  92. def init_directory(self) -> None:
  93. self.entries = self.create_entries(self._first_cluster)
  94. if not self.is_root:
  95. # the root directory doesn't contain link to itself nor the parent
  96. free_entry1 = self.find_free_entry() or self.chain_directory()
  97. free_entry1.allocate_entry(first_cluster_id=self.first_cluster.id,
  98. entity_name='.',
  99. entity_extension='',
  100. entity_type=self.ENTITY_TYPE)
  101. self.first_cluster = self._first_cluster
  102. free_entry2 = self.find_free_entry() or self.chain_directory()
  103. free_entry2.allocate_entry(first_cluster_id=self.parent.first_cluster.id,
  104. entity_name='..',
  105. entity_extension='',
  106. entity_type=self.parent.ENTITY_TYPE)
  107. self.parent.first_cluster = self.parent.first_cluster
  108. def lookup_entity(self, object_name: str, extension: str): # type: ignore
  109. for entity in self.entities:
  110. if entity.name == object_name and entity.extension == extension:
  111. return entity
  112. return None
  113. def recursive_search(self, path_as_list, current_dir): # type: ignore
  114. name, extension = split_to_name_and_extension(path_as_list[0])
  115. next_obj = current_dir.lookup_entity(name, extension)
  116. if next_obj is None:
  117. raise FileNotFoundError('No such file or directory!')
  118. if len(path_as_list) == 1 and next_obj.name_equals(name, extension):
  119. return next_obj
  120. return self.recursive_search(path_as_list[1:], next_obj)
  121. def find_free_entry(self) -> Optional[Entry]:
  122. for entry in self.entries:
  123. if entry.is_empty:
  124. return entry
  125. return None
  126. def _extend_directory(self) -> None:
  127. current = self.first_cluster
  128. while current.next_cluster is not None:
  129. current = current.next_cluster
  130. new_cluster = self.fat.find_free_cluster()
  131. current.set_in_fat(new_cluster.id)
  132. current.next_cluster = new_cluster
  133. self.entries += self.create_entries(new_cluster)
  134. def chain_directory(self) -> Entry:
  135. self._extend_directory()
  136. free_entry = self.find_free_entry()
  137. if free_entry is None:
  138. raise FatalError('No more space left!')
  139. return free_entry
  140. def allocate_object(self,
  141. name,
  142. entity_type,
  143. path_from_root=None,
  144. extension=''):
  145. # type: (str, int, Optional[List[str]], str) -> Tuple[Cluster, Entry, Directory]
  146. """
  147. Method finds the target directory in the path
  148. and allocates cluster (both the record in FAT and cluster in the data region)
  149. and entry in the specified directory
  150. """
  151. free_cluster = self.fat.find_free_cluster()
  152. target_dir = self if not path_from_root else self.recursive_search(path_from_root, self)
  153. free_entry = target_dir.find_free_entry() or target_dir.chain_directory()
  154. free_entry.allocate_entry(first_cluster_id=free_cluster.id,
  155. entity_name=name,
  156. entity_extension=extension,
  157. entity_type=entity_type)
  158. return free_cluster, free_entry, target_dir
  159. def new_file(self, name: str, extension: str, path_from_root: Optional[List[str]]) -> None:
  160. free_cluster, free_entry, target_dir = self.allocate_object(name=name,
  161. extension=extension,
  162. entity_type=Directory.ATTR_ARCHIVE,
  163. path_from_root=path_from_root)
  164. file = File(name, fat=self.fat, extension=extension, fatfs_state=self.fatfs_state, entry=free_entry)
  165. file.first_cluster = free_cluster
  166. target_dir.entities.append(file)
  167. def new_directory(self, name, parent, path_from_root):
  168. # type: (str, Directory, Optional[List[str]]) -> None
  169. free_cluster, free_entry, target_dir = self.allocate_object(name=name,
  170. entity_type=Directory.ATTR_DIRECTORY,
  171. path_from_root=path_from_root)
  172. directory = Directory(name=name, fat=self.fat, parent=parent, fatfs_state=self.fatfs_state, entry=free_entry)
  173. directory.first_cluster = free_cluster
  174. directory.init_directory()
  175. target_dir.entities.append(directory)
  176. def write_to_file(self, path: List[str], content: bytes) -> None:
  177. """
  178. Writes to file existing in the directory structure.
  179. :param path: path split into the list
  180. :param content: content as a string to be written into a file
  181. :returns: None
  182. :raises WriteDirectoryException: raised is the target object for writing is a directory
  183. """
  184. entity_to_write = self.recursive_search(path, self)
  185. if isinstance(entity_to_write, File):
  186. clusters_cnt = required_clusters_count(cluster_size=self.fatfs_state.sector_size, content=content)
  187. self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
  188. entity_to_write.write(content)
  189. else:
  190. raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')