resources.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. import os
  2. import tempfile
  3. from . import abc as resources_abc
  4. from contextlib import contextmanager, suppress
  5. from importlib import import_module
  6. from importlib.abc import ResourceLoader
  7. from io import BytesIO, TextIOWrapper
  8. from pathlib import Path
  9. from types import ModuleType
  10. from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
  11. from typing import cast
  12. from typing.io import BinaryIO, TextIO
  13. from zipimport import ZipImportError
# Names exported by ``from <module> import *``: the two type aliases defined
# just below, followed by the public resource-access functions.
__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]


# A package may be given either as a string or as a module object.
Package = Union[str, ModuleType]
# A resource may be given either as a string or as a path-like object.
Resource = Union[str, os.PathLike]
  27. def _get_package(package) -> ModuleType:
  28. """Take a package name or module object and return the module.
  29. If a name, the module is imported. If the passed or imported module
  30. object is not a package, raise an exception.
  31. """
  32. if hasattr(package, '__spec__'):
  33. if package.__spec__.submodule_search_locations is None:
  34. raise TypeError('{!r} is not a package'.format(
  35. package.__spec__.name))
  36. else:
  37. return package
  38. else:
  39. module = import_module(package)
  40. if module.__spec__.submodule_search_locations is None:
  41. raise TypeError('{!r} is not a package'.format(package))
  42. else:
  43. return module
  44. def _normalize_path(path) -> str:
  45. """Normalize a path by ensuring it is a string.
  46. If the resulting string contains path separators, an exception is raised.
  47. """
  48. parent, file_name = os.path.split(path)
  49. if parent:
  50. raise ValueError('{!r} must be only a file name'.format(path))
  51. else:
  52. return file_name
  53. def _get_resource_reader(
  54. package: ModuleType) -> Optional[resources_abc.ResourceReader]:
  55. # Return the package's loader if it's a ResourceReader. We can't use
  56. # a issubclass() check here because apparently abc.'s __subclasscheck__()
  57. # hook wants to create a weak reference to the object, but
  58. # zipimport.zipimporter does not support weak references, resulting in a
  59. # TypeError. That seems terrible.
  60. spec = package.__spec__
  61. if hasattr(spec.loader, 'get_resource_reader'):
  62. return cast(resources_abc.ResourceReader,
  63. spec.loader.get_resource_reader(spec.name))
  64. return None
  65. def _check_location(package):
  66. if package.__spec__.origin is None or not package.__spec__.has_location:
  67. raise FileNotFoundError(f'Package has no location {package!r}')
  68. def open_binary(package: Package, resource: Resource) -> BinaryIO:
  69. """Return a file-like object opened for binary reading of the resource."""
  70. resource = _normalize_path(resource)
  71. package = _get_package(package)
  72. reader = _get_resource_reader(package)
  73. if reader is not None:
  74. return reader.open_resource(resource)
  75. _check_location(package)
  76. absolute_package_path = os.path.abspath(package.__spec__.origin)
  77. package_path = os.path.dirname(absolute_package_path)
  78. full_path = os.path.join(package_path, resource)
  79. try:
  80. return open(full_path, mode='rb')
  81. except OSError:
  82. # Just assume the loader is a resource loader; all the relevant
  83. # importlib.machinery loaders are and an AttributeError for
  84. # get_data() will make it clear what is needed from the loader.
  85. loader = cast(ResourceLoader, package.__spec__.loader)
  86. data = None
  87. if hasattr(package.__spec__.loader, 'get_data'):
  88. with suppress(OSError):
  89. data = loader.get_data(full_path)
  90. if data is None:
  91. package_name = package.__spec__.name
  92. message = '{!r} resource not found in {!r}'.format(
  93. resource, package_name)
  94. raise FileNotFoundError(message)
  95. else:
  96. return BytesIO(data)
  97. def open_text(package: Package,
  98. resource: Resource,
  99. encoding: str = 'utf-8',
  100. errors: str = 'strict') -> TextIO:
  101. """Return a file-like object opened for text reading of the resource."""
  102. resource = _normalize_path(resource)
  103. package = _get_package(package)
  104. reader = _get_resource_reader(package)
  105. if reader is not None:
  106. return TextIOWrapper(reader.open_resource(resource), encoding, errors)
  107. _check_location(package)
  108. absolute_package_path = os.path.abspath(package.__spec__.origin)
  109. package_path = os.path.dirname(absolute_package_path)
  110. full_path = os.path.join(package_path, resource)
  111. try:
  112. return open(full_path, mode='r', encoding=encoding, errors=errors)
  113. except OSError:
  114. # Just assume the loader is a resource loader; all the relevant
  115. # importlib.machinery loaders are and an AttributeError for
  116. # get_data() will make it clear what is needed from the loader.
  117. loader = cast(ResourceLoader, package.__spec__.loader)
  118. data = None
  119. if hasattr(package.__spec__.loader, 'get_data'):
  120. with suppress(OSError):
  121. data = loader.get_data(full_path)
  122. if data is None:
  123. package_name = package.__spec__.name
  124. message = '{!r} resource not found in {!r}'.format(
  125. resource, package_name)
  126. raise FileNotFoundError(message)
  127. else:
  128. return TextIOWrapper(BytesIO(data), encoding, errors)
  129. def read_binary(package: Package, resource: Resource) -> bytes:
  130. """Return the binary contents of the resource."""
  131. resource = _normalize_path(resource)
  132. package = _get_package(package)
  133. with open_binary(package, resource) as fp:
  134. return fp.read()
  135. def read_text(package: Package,
  136. resource: Resource,
  137. encoding: str = 'utf-8',
  138. errors: str = 'strict') -> str:
  139. """Return the decoded string of the resource.
  140. The decoding-related arguments have the same semantics as those of
  141. bytes.decode().
  142. """
  143. resource = _normalize_path(resource)
  144. package = _get_package(package)
  145. with open_text(package, resource, encoding, errors) as fp:
  146. return fp.read()
  147. @contextmanager
  148. def path(package: Package, resource: Resource) -> Iterator[Path]:
  149. """A context manager providing a file path object to the resource.
  150. If the resource does not already exist on its own on the file system,
  151. a temporary file will be created. If the file was created, the file
  152. will be deleted upon exiting the context manager (no exception is
  153. raised if the file was deleted prior to the context manager
  154. exiting).
  155. """
  156. resource = _normalize_path(resource)
  157. package = _get_package(package)
  158. reader = _get_resource_reader(package)
  159. if reader is not None:
  160. try:
  161. yield Path(reader.resource_path(resource))
  162. return
  163. except FileNotFoundError:
  164. pass
  165. else:
  166. _check_location(package)
  167. # Fall-through for both the lack of resource_path() *and* if
  168. # resource_path() raises FileNotFoundError.
  169. package_directory = Path(package.__spec__.origin).parent
  170. file_path = package_directory / resource
  171. if file_path.exists():
  172. yield file_path
  173. else:
  174. with open_binary(package, resource) as fp:
  175. data = fp.read()
  176. # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
  177. # blocks due to the need to close the temporary file to work on
  178. # Windows properly.
  179. fd, raw_path = tempfile.mkstemp()
  180. try:
  181. os.write(fd, data)
  182. os.close(fd)
  183. yield Path(raw_path)
  184. finally:
  185. try:
  186. os.remove(raw_path)
  187. except FileNotFoundError:
  188. pass
  189. def is_resource(package: Package, name: str) -> bool:
  190. """True if 'name' is a resource inside 'package'.
  191. Directories are *not* resources.
  192. """
  193. package = _get_package(package)
  194. _normalize_path(name)
  195. reader = _get_resource_reader(package)
  196. if reader is not None:
  197. return reader.is_resource(name)
  198. try:
  199. package_contents = set(contents(package))
  200. except (NotADirectoryError, FileNotFoundError):
  201. return False
  202. if name not in package_contents:
  203. return False
  204. # Just because the given file_name lives as an entry in the package's
  205. # contents doesn't necessarily mean it's a resource. Directories are not
  206. # resources, so let's try to find out if it's a directory or not.
  207. path = Path(package.__spec__.origin).parent / name
  208. return path.is_file()
  209. def contents(package: Package) -> Iterable[str]:
  210. """Return an iterable of entries in 'package'.
  211. Note that not all entries are resources. Specifically, directories are
  212. not considered resources. Use `is_resource()` on each entry returned here
  213. to check if it is a resource or not.
  214. """
  215. package = _get_package(package)
  216. reader = _get_resource_reader(package)
  217. if reader is not None:
  218. return reader.contents()
  219. # Is the package a namespace package? By definition, namespace packages
  220. # cannot have resources. We could use _check_location() and catch the
  221. # exception, but that's extra work, so just inline the check.
  222. elif package.__spec__.origin is None or not package.__spec__.has_location:
  223. return ()
  224. else:
  225. package_directory = Path(package.__spec__.origin).parent
  226. return os.listdir(package_directory)
  227. # Private implementation of ResourceReader and get_resource_reader() called
  228. # from zipimport.c. Don't use these directly! We're implementing these in
  229. # Python because 1) it's easier, 2) zipimport may get rewritten in Python
  230. # itself at some point, so doing this all in C would be difficult and a waste of
  231. # effort.
  232. class _ZipImportResourceReader(resources_abc.ResourceReader):
  233. """Private class used to support ZipImport.get_resource_reader().
  234. This class is allowed to reference all the innards and private parts of
  235. the zipimporter.
  236. """
  237. def __init__(self, zipimporter, fullname):
  238. self.zipimporter = zipimporter
  239. self.fullname = fullname
  240. def open_resource(self, resource):
  241. fullname_as_path = self.fullname.replace('.', '/')
  242. path = f'{fullname_as_path}/{resource}'
  243. try:
  244. return BytesIO(self.zipimporter.get_data(path))
  245. except OSError:
  246. raise FileNotFoundError(path)
  247. def resource_path(self, resource):
  248. # All resources are in the zip file, so there is no path to the file.
  249. # Raising FileNotFoundError tells the higher level API to extract the
  250. # binary data and create a temporary file.
  251. raise FileNotFoundError
  252. def is_resource(self, name):
  253. # Maybe we could do better, but if we can get the data, it's a
  254. # resource. Otherwise it isn't.
  255. fullname_as_path = self.fullname.replace('.', '/')
  256. path = f'{fullname_as_path}/{name}'
  257. try:
  258. self.zipimporter.get_data(path)
  259. except OSError:
  260. return False
  261. return True
  262. def contents(self):
  263. # This is a bit convoluted, because fullname will be a module path,
  264. # but _files is a list of file names relative to the top of the
  265. # archive's namespace. We want to compare file paths to find all the
  266. # names of things inside the module represented by fullname. So we
  267. # turn the module path of fullname into a file path relative to the
  268. # top of the archive, and then we iterate through _files looking for
  269. # names inside that "directory".
  270. fullname_path = Path(self.zipimporter.get_filename(self.fullname))
  271. relative_path = fullname_path.relative_to(self.zipimporter.archive)
  272. # Don't forget that fullname names a package, so its path will include
  273. # __init__.py, which we want to ignore.
  274. assert relative_path.name == '__init__.py'
  275. package_path = relative_path.parent
  276. subdirs_seen = set()
  277. for filename in self.zipimporter._files:
  278. try:
  279. relative = Path(filename).relative_to(package_path)
  280. except ValueError:
  281. continue
  282. # If the path of the file (which is relative to the top of the zip
  283. # namespace), relative to the package given when the resource
  284. # reader was created, has a parent, then it's a name in a
  285. # subdirectory and thus we skip it.
  286. parent_name = relative.parent.name
  287. if len(parent_name) == 0:
  288. yield relative.name
  289. elif parent_name not in subdirs_seen:
  290. subdirs_seen.add(parent_name)
  291. yield parent_name
  292. # Called from zipimport.c
  293. def _zipimport_get_resource_reader(zipimporter, fullname):
  294. try:
  295. if not zipimporter.is_package(fullname):
  296. return None
  297. except ZipImportError:
  298. return None
  299. return _ZipImportResourceReader(zipimporter, fullname)