| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
- # SPDX-License-Identifier: Apache-2.0
- import os
- from datetime import datetime
- from typing import List, Optional, Tuple, Union
- from .entry import Entry
- from .exceptions import FatalError, WriteDirectoryException
- from .fat import FAT, Cluster
- from .fatfs_state import FATFSState
- from .long_filename_utils import (build_lfn_full_name, build_lfn_unique_entry_name_order,
- get_required_lfn_entries_count, split_name_to_lfn_entries,
- split_name_to_lfn_entry_blocks)
- from .utils import (DATETIME, INVALID_SFN_CHARS_PATTERN, MAX_EXT_SIZE, MAX_NAME_SIZE, FATDefaults,
- build_lfn_short_entry_name, build_name, lfn_checksum, required_clusters_count,
- split_content_into_sectors, split_to_name_and_extension)
- class File:
- """
- The class File provides API to write into the files. It represents file in the FS.
- """
- ATTR_ARCHIVE: int = 0x20
- ENTITY_TYPE: int = ATTR_ARCHIVE
- def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
- self.name: str = name
- self.extension: str = extension
- self.fatfs_state: FATFSState = fatfs_state
- self.fat: FAT = fat
- self.size: int = 0
- self._first_cluster: Optional[Cluster] = None
- self._entry: Entry = entry
- @property
- def entry(self) -> Entry:
- return self._entry
- @property
- def first_cluster(self) -> Optional[Cluster]:
- return self._first_cluster
- @first_cluster.setter
- def first_cluster(self, value: Cluster) -> None:
- self._first_cluster = value
- def name_equals(self, name: str, extension: str) -> bool:
- equals_: bool = build_name(name, extension) == build_name(self.name, self.extension)
- return equals_
- def write(self, content: bytes) -> None:
- self.entry.update_content_size(len(content))
- # we assume that the correct amount of clusters is allocated
- current_cluster = self._first_cluster
- for content_part in split_content_into_sectors(content, self.fatfs_state.boot_sector_state.sector_size):
- content_as_list = content_part
- if current_cluster is None:
- raise FatalError('No free space left!')
- address: int = current_cluster.cluster_data_address
- self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list
- current_cluster = current_cluster.next_cluster
- class Directory:
- """
- The Directory class provides API to add files and directories into the directory
- and to find the file according to path and write it.
- """
- ATTR_DIRECTORY: int = 0x10
- ATTR_ARCHIVE: int = 0x20
- ENTITY_TYPE: int = ATTR_DIRECTORY
- CURRENT_DIRECTORY = '.'
- PARENT_DIRECTORY = '..'
- def __init__(self,
- name,
- fat,
- fatfs_state,
- entry=None,
- cluster=None,
- size=None,
- extension='',
- parent=None):
- # type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
- self.name: str = name
- self.fatfs_state: FATFSState = fatfs_state
- self.extension: str = extension
- self.fat: FAT = fat
- self.size: int = size or self.fatfs_state.boot_sector_state.sector_size
- # if directory is root its parent is itself
- self.parent: Directory = parent or self
- self._first_cluster: Cluster = cluster
- # entries will be initialized after the cluster allocation
- self.entries: List[Entry] = []
- self.entities: List[Union[File, Directory]] = [] # type: ignore
- self._entry = entry # currently not in use (will use later for e.g. modification time, etc.)
- @property
- def is_root(self) -> bool:
- return self.parent is self
- @property
- def first_cluster(self) -> Cluster:
- return self._first_cluster
- @first_cluster.setter
- def first_cluster(self, value: Cluster) -> None:
- self._first_cluster = value
- def name_equals(self, name: str, extension: str) -> bool:
- equals_: bool = build_name(name, extension) == build_name(self.name, self.extension)
- return equals_
- @property
- def entries_count(self) -> int:
- entries_count_: int = self.size // FATDefaults.ENTRY_SIZE
- return entries_count_
- def create_entries(self, cluster: Cluster) -> List[Entry]:
- return [Entry(entry_id=i,
- parent_dir_entries_address=cluster.cluster_data_address,
- fatfs_state=self.fatfs_state)
- for i in range(self.entries_count)]
- def init_directory(self) -> None:
- self.entries = self.create_entries(self._first_cluster)
- # the root directory doesn't contain link to itself nor the parent
- if self.is_root:
- return
- # if the directory is not root we initialize the reference to itself and to the parent directory
- for dir_id, name_ in ((self, self.CURRENT_DIRECTORY), (self.parent, self.PARENT_DIRECTORY)):
- new_dir_: Entry = self.find_free_entry() or self.chain_directory()
- new_dir_.allocate_entry(first_cluster_id=dir_id.first_cluster.id,
- entity_name=name_,
- entity_extension='',
- entity_type=dir_id.ENTITY_TYPE)
- def lookup_entity(self, object_name: str, extension: str): # type: ignore
- for entity in self.entities:
- if build_name(entity.name, entity.extension) == build_name(object_name, extension):
- return entity
- return None
- @staticmethod
- def _is_end_of_path(path_as_list: List[str]) -> bool:
- """
- :param path_as_list: path split into the list
- :returns: True if the file is the leaf of the directory tree, False otherwise
- The method is part of the base of recursion,
- determines if the path is target file or directory in the tree folder structure.
- """
- return len(path_as_list) == 1
- def recursive_search(self, path_as_list, current_dir): # type: ignore
- name, extension = split_to_name_and_extension(path_as_list[0])
- next_obj = current_dir.lookup_entity(name, extension)
- if next_obj is None:
- raise FileNotFoundError('No such file or directory!')
- if self._is_end_of_path(path_as_list) and next_obj.name_equals(name, extension):
- return next_obj
- return self.recursive_search(path_as_list[1:], next_obj)
- def find_free_entry(self) -> Optional[Entry]:
- for entry in self.entries:
- if entry.is_empty:
- return entry
- return None
- def _extend_directory(self) -> None:
- current: Cluster = self.first_cluster
- while current.next_cluster is not None:
- current = current.next_cluster
- new_cluster: Cluster = self.fat.find_free_cluster()
- current.set_in_fat(new_cluster.id)
- assert current is not new_cluster
- current.next_cluster = new_cluster
- self.entries += self.create_entries(new_cluster)
- def chain_directory(self) -> Entry:
- """
- :returns: First free entry
- The method adds new Cluster to the Directory and returns first free entry.
- """
- self._extend_directory()
- free_entry: Entry = self.find_free_entry()
- if free_entry is None:
- raise FatalError('No more space left!')
- return free_entry
- @staticmethod
- def allocate_long_name_object(free_entry,
- name,
- extension,
- target_dir,
- free_cluster_id,
- entity_type,
- date,
- time):
- # type: (Entry, str, str, Directory, int, int, DATETIME, DATETIME) -> Entry
- lfn_full_name: str = build_lfn_full_name(name, extension)
- lfn_unique_entry_order: int = build_lfn_unique_entry_name_order(target_dir.entities, name)
- lfn_short_entry_name: str = build_lfn_short_entry_name(name, extension, lfn_unique_entry_order)
- checksum: int = lfn_checksum(lfn_short_entry_name)
- entries_count: int = get_required_lfn_entries_count(lfn_full_name)
- # entries in long file name entries chain starts with the last entry
- split_names_reversed = list(reversed(list(enumerate(split_name_to_lfn_entries(lfn_full_name, entries_count)))))
- for i, name_split_to_entry in split_names_reversed:
- order: int = i + 1
- blocks_: List[bytes] = split_name_to_lfn_entry_blocks(name_split_to_entry)
- lfn_names: List[bytes] = list(map(lambda x: x.lower(), blocks_))
- free_entry.allocate_entry(first_cluster_id=free_cluster_id,
- entity_name=name,
- entity_extension=extension,
- entity_type=entity_type,
- lfn_order=order,
- lfn_names=lfn_names,
- lfn_checksum_=checksum,
- lfn_is_last=order == entries_count)
- free_entry = target_dir.find_free_entry() or target_dir.chain_directory()
- free_entry.allocate_entry(first_cluster_id=free_cluster_id,
- entity_name=lfn_short_entry_name[:MAX_NAME_SIZE],
- entity_extension=lfn_short_entry_name[MAX_NAME_SIZE:],
- entity_type=entity_type,
- lfn_order=Entry.SHORT_ENTRY_LN,
- date=date,
- time=time)
- return free_entry
- @staticmethod
- def _is_valid_sfn(name: str, extension: str) -> bool:
- if INVALID_SFN_CHARS_PATTERN.search(name) or INVALID_SFN_CHARS_PATTERN.search(name):
- return False
- ret: bool = len(name) <= MAX_NAME_SIZE and len(extension) <= MAX_EXT_SIZE
- return ret
- def allocate_object(self,
- name,
- entity_type,
- object_timestamp_,
- path_from_root=None,
- extension='',
- is_empty=False):
- # type: (str, int, datetime, Optional[List[str]], str, bool) -> Tuple[Cluster, Entry, Directory]
- """
- Method finds the target directory in the path
- and allocates cluster (both the record in FAT and cluster in the data region)
- and entry in the specified directory
- """
- free_cluster: Optional[Cluster] = None
- free_cluster_id = 0x00
- if not is_empty:
- free_cluster = self.fat.find_free_cluster()
- free_cluster_id = free_cluster.id
- target_dir: Directory = self if not path_from_root else self.recursive_search(path_from_root, self)
- free_entry: Entry = target_dir.find_free_entry() or target_dir.chain_directory()
- fatfs_date_ = (object_timestamp_.year, object_timestamp_.month, object_timestamp_.day)
- fatfs_time_ = (object_timestamp_.hour, object_timestamp_.minute, object_timestamp_.second)
- if not self.fatfs_state.long_names_enabled or self._is_valid_sfn(name, extension):
- free_entry.allocate_entry(first_cluster_id=free_cluster_id,
- entity_name=name,
- entity_extension=extension,
- date=fatfs_date_,
- time=fatfs_time_,
- fits_short=True,
- entity_type=entity_type)
- return free_cluster, free_entry, target_dir
- return free_cluster, self.allocate_long_name_object(free_entry=free_entry,
- name=name,
- extension=extension,
- target_dir=target_dir,
- free_cluster_id=free_cluster_id,
- entity_type=entity_type,
- date=fatfs_date_,
- time=fatfs_time_), target_dir
- def new_file(self,
- name: str,
- extension: str,
- path_from_root: Optional[List[str]],
- object_timestamp_: datetime,
- is_empty: bool) -> None:
- free_cluster, free_entry, target_dir = self.allocate_object(name=name,
- extension=extension,
- entity_type=Directory.ATTR_ARCHIVE,
- path_from_root=path_from_root,
- object_timestamp_=object_timestamp_,
- is_empty=is_empty)
- file: File = File(name=name,
- fat=self.fat,
- extension=extension,
- fatfs_state=self.fatfs_state,
- entry=free_entry)
- file.first_cluster = free_cluster
- target_dir.entities.append(file)
- def new_directory(self, name, parent, path_from_root, object_timestamp_):
- # type: (str, Directory, Optional[List[str]], datetime) -> None
- free_cluster, free_entry, target_dir = self.allocate_object(name=name,
- entity_type=Directory.ATTR_DIRECTORY,
- path_from_root=path_from_root,
- object_timestamp_=object_timestamp_)
- directory: Directory = Directory(name=name,
- fat=self.fat,
- parent=parent,
- fatfs_state=self.fatfs_state,
- entry=free_entry)
- directory.first_cluster = free_cluster
- directory.init_directory()
- target_dir.entities.append(directory)
- def write_to_file(self, path: List[str], content: bytes) -> None:
- """
- Writes to file existing in the directory structure.
- :param path: path split into the list
- :param content: content as a string to be written into a file
- :returns: None
- :raises WriteDirectoryException: raised is the target object for writing is a directory
- """
- entity_to_write: Entry = self.recursive_search(path, self)
- if isinstance(entity_to_write, File):
- clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.boot_sector_state.sector_size,
- content=content)
- self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
- entity_to_write.write(content)
- else:
- raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')
|