summaryrefslogtreecommitdiffstats
path: root/third_party/python/setuptools/pkg_resources/_vendor/importlib_resources/_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/setuptools/pkg_resources/_vendor/importlib_resources/_common.py')
-rw-r--r--third_party/python/setuptools/pkg_resources/_vendor/importlib_resources/_common.py207
1 files changed, 207 insertions, 0 deletions
diff --git a/third_party/python/setuptools/pkg_resources/_vendor/importlib_resources/_common.py b/third_party/python/setuptools/pkg_resources/_vendor/importlib_resources/_common.py
new file mode 100644
index 0000000000..3c6de1cfb2
--- /dev/null
+++ b/third_party/python/setuptools/pkg_resources/_vendor/importlib_resources/_common.py
@@ -0,0 +1,207 @@
+import os
+import pathlib
+import tempfile
+import functools
+import contextlib
+import types
+import importlib
+import inspect
+import warnings
+import itertools
+
+from typing import Union, Optional, cast
+from .abc import ResourceReader, Traversable
+
+from ._compat import wrap_spec
+
+Package = Union[types.ModuleType, str]
+Anchor = Package
+
+
+def package_to_anchor(func):
+ """
+ Replace 'package' parameter as 'anchor' and warn about the change.
+
+ Other errors should fall through.
+
+ >>> files('a', 'b')
+ Traceback (most recent call last):
+ TypeError: files() takes from 0 to 1 positional arguments but 2 were given
+ """
+ undefined = object()
+
+ @functools.wraps(func)
+ def wrapper(anchor=undefined, package=undefined):
+ if package is not undefined:
+ if anchor is not undefined:
+ return func(anchor, package)
+ warnings.warn(
+ "First parameter to files is renamed to 'anchor'",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return func(package)
+ elif anchor is undefined:
+ return func()
+ return func(anchor)
+
+ return wrapper
+
+
+@package_to_anchor
+def files(anchor: Optional[Anchor] = None) -> Traversable:
+ """
+ Get a Traversable resource for an anchor.
+ """
+ return from_package(resolve(anchor))
+
+
+def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
+ """
+ Return the package's loader if it's a ResourceReader.
+ """
+ # We can't use
+ # a issubclass() check here because apparently abc.'s __subclasscheck__()
+ # hook wants to create a weak reference to the object, but
+ # zipimport.zipimporter does not support weak references, resulting in a
+ # TypeError. That seems terrible.
+ spec = package.__spec__
+ reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
+ if reader is None:
+ return None
+ return reader(spec.name) # type: ignore
+
+
+@functools.singledispatch
+def resolve(cand: Optional[Anchor]) -> types.ModuleType:
+ return cast(types.ModuleType, cand)
+
+
+@resolve.register
+def _(cand: str) -> types.ModuleType:
+ return importlib.import_module(cand)
+
+
+@resolve.register
+def _(cand: None) -> types.ModuleType:
+ return resolve(_infer_caller().f_globals['__name__'])
+
+
+def _infer_caller():
+ """
+ Walk the stack and find the frame of the first caller not in this module.
+ """
+
+ def is_this_file(frame_info):
+ return frame_info.filename == __file__
+
+ def is_wrapper(frame_info):
+ return frame_info.function == 'wrapper'
+
+ not_this_file = itertools.filterfalse(is_this_file, inspect.stack())
+ # also exclude 'wrapper' due to singledispatch in the call stack
+ callers = itertools.filterfalse(is_wrapper, not_this_file)
+ return next(callers).frame
+
+
+def from_package(package: types.ModuleType):
+ """
+ Return a Traversable object for the given package.
+
+ """
+ spec = wrap_spec(package)
+ reader = spec.loader.get_resource_reader(spec.name)
+ return reader.files()
+
+
+@contextlib.contextmanager
+def _tempfile(
+ reader,
+ suffix='',
+ # gh-93353: Keep a reference to call os.remove() in late Python
+ # finalization.
+ *,
+ _os_remove=os.remove,
+):
+ # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
+ # blocks due to the need to close the temporary file to work on Windows
+ # properly.
+ fd, raw_path = tempfile.mkstemp(suffix=suffix)
+ try:
+ try:
+ os.write(fd, reader())
+ finally:
+ os.close(fd)
+ del reader
+ yield pathlib.Path(raw_path)
+ finally:
+ try:
+ _os_remove(raw_path)
+ except FileNotFoundError:
+ pass
+
+
+def _temp_file(path):
+ return _tempfile(path.read_bytes, suffix=path.name)
+
+
+def _is_present_dir(path: Traversable) -> bool:
+ """
+ Some Traversables implement ``is_dir()`` to raise an
+ exception (i.e. ``FileNotFoundError``) when the
+ directory doesn't exist. This function wraps that call
+ to always return a boolean and only return True
+ if there's a dir and it exists.
+ """
+ with contextlib.suppress(FileNotFoundError):
+ return path.is_dir()
+ return False
+
+
+@functools.singledispatch
+def as_file(path):
+ """
+ Given a Traversable object, return that object as a
+ path on the local file system in a context manager.
+ """
+ return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
+
+
+@as_file.register(pathlib.Path)
+@contextlib.contextmanager
+def _(path):
+ """
+ Degenerate behavior for pathlib.Path objects.
+ """
+ yield path
+
+
+@contextlib.contextmanager
+def _temp_path(dir: tempfile.TemporaryDirectory):
+ """
+ Wrap tempfile.TemporyDirectory to return a pathlib object.
+ """
+ with dir as result:
+ yield pathlib.Path(result)
+
+
+@contextlib.contextmanager
+def _temp_dir(path):
+ """
+ Given a traversable dir, recursively replicate the whole tree
+ to the file system in a context manager.
+ """
+ assert path.is_dir()
+ with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
+ yield _write_contents(temp_dir, path)
+
+
+def _write_contents(target, source):
+ child = target.joinpath(source.name)
+ if source.is_dir():
+ child.mkdir()
+ for item in source.iterdir():
+ _write_contents(child, item)
+ else:
+ child.write_bytes(source.read_bytes())
+ return child