diff options
Diffstat (limited to 'third_party/python/importlib_metadata/importlib_metadata/__init__.py')
-rw-r--r-- | third_party/python/importlib_metadata/importlib_metadata/__init__.py | 904 |
1 files changed, 904 insertions, 0 deletions
diff --git a/third_party/python/importlib_metadata/importlib_metadata/__init__.py b/third_party/python/importlib_metadata/importlib_metadata/__init__.py new file mode 100644 index 0000000000..9a36a8e621 --- /dev/null +++ b/third_party/python/importlib_metadata/importlib_metadata/__init__.py @@ -0,0 +1,904 @@ +import os +import re +import abc +import csv +import sys +import zipp +import email +import pathlib +import operator +import textwrap +import warnings +import functools +import itertools +import posixpath +import collections + +from . import _adapters, _meta, _py39compat +from ._collections import FreezableDefaultDict, Pair +from ._compat import ( + NullFinder, + install, + pypy_partial, +) +from ._functools import method_cache, pass_none +from ._itertools import always_iterable, unique_everseen +from ._meta import PackageMetadata, SimplePath + +from contextlib import suppress +from importlib import import_module +from importlib.abc import MetaPathFinder +from itertools import starmap +from typing import List, Mapping, Optional + + +__all__ = [ + 'Distribution', + 'DistributionFinder', + 'PackageMetadata', + 'PackageNotFoundError', + 'distribution', + 'distributions', + 'entry_points', + 'files', + 'metadata', + 'packages_distributions', + 'requires', + 'version', +] + + +class PackageNotFoundError(ModuleNotFoundError): + """The package was not found.""" + + def __str__(self): + return f"No package metadata was found for {self.name}" + + @property + def name(self): + (name,) = self.args + return name + + +class Sectioned: + """ + A simple entry point config parser for performance + + >>> for item in Sectioned.read(Sectioned._sample): + ... print(item) + Pair(name='sec1', value='# comments ignored') + Pair(name='sec1', value='a = 1') + Pair(name='sec1', value='b = 2') + Pair(name='sec2', value='a = 2') + + >>> res = Sectioned.section_pairs(Sectioned._sample) + >>> item = next(res) + >>> item.name + 'sec1' + >>> item.value + Pair(name='a', value='1') + >>> item = next(res) + >>> item.value + Pair(name='b', value='2') + >>> item = next(res) + >>> item.name + 'sec2' + >>> item.value + Pair(name='a', value='2') + >>> list(res) + [] + """ + + _sample = textwrap.dedent( + """ + [sec1] + # comments ignored + a = 1 + b = 2 + + [sec2] + a = 2 + """ + ).lstrip() + + @classmethod + def section_pairs(cls, text): + return ( + section._replace(value=Pair.parse(section.value)) + for section in cls.read(text, filter_=cls.valid) + if section.name is not None + ) + + @staticmethod + def read(text, filter_=None): + lines = filter(filter_, map(str.strip, text.splitlines())) + name = None + for value in lines: + section_match = value.startswith('[') and value.endswith(']') + if section_match: + name = value.strip('[]') + continue + yield Pair(name, value) + + @staticmethod + def valid(line): + return line and not line.startswith('#') + + +class DeprecatedTuple: + """ + Provide subscript item access for backward compatibility. + + >>> recwarn = getfixture('recwarn') + >>> ep = EntryPoint(name='name', value='value', group='group') + >>> ep[:] + ('name', 'value', 'group') + >>> ep[0] + 'name' + >>> len(recwarn) + 1 + """ + + # Do not remove prior to 2023-05-01 or Python 3.13 + _warn = functools.partial( + warnings.warn, + "EntryPoint tuple interface is deprecated. Access members by name.", + DeprecationWarning, + stacklevel=pypy_partial(2), + ) + + def __getitem__(self, item): + self._warn() + return self._key()[item] + + +class EntryPoint(DeprecatedTuple): + """An entry point as defined by Python packaging conventions. + + See `the packaging docs on entry points + <https://packaging.python.org/specifications/entry-points/>`_ + for more information. + + >>> ep = EntryPoint( + ... name=None, group=None, value='package.module:attr [extra1, extra2]') + >>> ep.module + 'package.module' + >>> ep.attr + 'attr' + >>> ep.extras + ['extra1', 'extra2'] + """ + + pattern = re.compile( + r'(?P<module>[\w.]+)\s*' + r'(:\s*(?P<attr>[\w.]+)\s*)?' + r'((?P<extras>\[.*\])\s*)?$' + ) + """ + A regular expression describing the syntax for an entry point, + which might look like: + + - module + - package.module + - package.module:attribute + - package.module:object.attribute + - package.module:attr [extra1, extra2] + + Other combinations are possible as well. + + The expression is lenient about whitespace around the ':', + following the attr, and following any extras. + """ + + name: str + value: str + group: str + + dist: Optional['Distribution'] = None + + def __init__(self, name, value, group): + vars(self).update(name=name, value=value, group=group) + + def load(self): + """Load the entry point from its definition. If only a module + is indicated by the value, return that module. Otherwise, + return the named object. + """ + match = self.pattern.match(self.value) + module = import_module(match.group('module')) + attrs = filter(None, (match.group('attr') or '').split('.')) + return functools.reduce(getattr, attrs, module) + + @property + def module(self): + match = self.pattern.match(self.value) + return match.group('module') + + @property + def attr(self): + match = self.pattern.match(self.value) + return match.group('attr') + + @property + def extras(self): + match = self.pattern.match(self.value) + return re.findall(r'\w+', match.group('extras') or '') + + def _for(self, dist): + vars(self).update(dist=dist) + return self + + def matches(self, **params): + """ + EntryPoint matches the given parameters. + + >>> ep = EntryPoint(group='foo', name='bar', value='bing:bong [extra1, extra2]') + >>> ep.matches(group='foo') + True + >>> ep.matches(name='bar', value='bing:bong [extra1, extra2]') + True + >>> ep.matches(group='foo', name='other') + False + >>> ep.matches() + True + >>> ep.matches(extras=['extra1', 'extra2']) + True + >>> ep.matches(module='bing') + True + >>> ep.matches(attr='bong') + True + """ + attrs = (getattr(self, param) for param in params) + return all(map(operator.eq, params.values(), attrs)) + + def _key(self): + return self.name, self.value, self.group + + def __lt__(self, other): + return self._key() < other._key() + + def __eq__(self, other): + return self._key() == other._key() + + def __setattr__(self, name, value): + raise AttributeError("EntryPoint objects are immutable.") + + def __repr__(self): + return ( + f'EntryPoint(name={self.name!r}, value={self.value!r}, ' + f'group={self.group!r})' + ) + + def __hash__(self): + return hash(self._key()) + + +class EntryPoints(tuple): + """ + An immutable collection of selectable EntryPoint objects. + """ + + __slots__ = () + + def __getitem__(self, name): # -> EntryPoint: + """ + Get the EntryPoint in self matching name. + """ + try: + return next(iter(self.select(name=name))) + except StopIteration: + raise KeyError(name) + + def select(self, **params): + """ + Select entry points from self that match the + given parameters (typically group and/or name). + """ + return EntryPoints(ep for ep in self if _py39compat.ep_matches(ep, **params)) + + @property + def names(self): + """ + Return the set of all names of all entry points. + """ + return {ep.name for ep in self} + + @property + def groups(self): + """ + Return the set of all groups of all entry points. + """ + return {ep.group for ep in self} + + @classmethod + def _from_text_for(cls, text, dist): + return cls(ep._for(dist) for ep in cls._from_text(text)) + + @staticmethod + def _from_text(text): + return ( + EntryPoint(name=item.value.name, value=item.value.value, group=item.name) + for item in Sectioned.section_pairs(text or '') + ) + + +class PackagePath(pathlib.PurePosixPath): + """A reference to a path in a package""" + + def read_text(self, encoding='utf-8'): + with self.locate().open(encoding=encoding) as stream: + return stream.read() + + def read_binary(self): + with self.locate().open('rb') as stream: + return stream.read() + + def locate(self): + """Return a path-like object for this path""" + return self.dist.locate_file(self) + + +class FileHash: + def __init__(self, spec): + self.mode, _, self.value = spec.partition('=') + + def __repr__(self): + return f'<FileHash mode: {self.mode} value: {self.value}>' + + +class Distribution(metaclass=abc.ABCMeta): + """A Python distribution package.""" + + @abc.abstractmethod + def read_text(self, filename): + """Attempt to load metadata file given by the name. + + :param filename: The name of the file in the distribution info. + :return: The text if found, otherwise None. + """ + + @abc.abstractmethod + def locate_file(self, path): + """ + Given a path to a file in this distribution, return a path + to it. + """ + + @classmethod + def from_name(cls, name: str): + """Return the Distribution for the given package name. + + :param name: The name of the distribution package to search for. + :return: The Distribution instance (or subclass thereof) for the named + package, if found. + :raises PackageNotFoundError: When the named package's distribution + metadata cannot be found. + :raises ValueError: When an invalid value is supplied for name. + """ + if not name: + raise ValueError("A distribution name is required.") + try: + return next(cls.discover(name=name)) + except StopIteration: + raise PackageNotFoundError(name) + + @classmethod + def discover(cls, **kwargs): + """Return an iterable of Distribution objects for all packages. + + Pass a ``context`` or pass keyword arguments for constructing + a context. + + :context: A ``DistributionFinder.Context`` object. + :return: Iterable of Distribution objects for all packages. + """ + context = kwargs.pop('context', None) + if context and kwargs: + raise ValueError("cannot accept context and kwargs") + context = context or DistributionFinder.Context(**kwargs) + return itertools.chain.from_iterable( + resolver(context) for resolver in cls._discover_resolvers() + ) + + @staticmethod + def at(path): + """Return a Distribution for the indicated metadata path + + :param path: a string or path-like object + :return: a concrete Distribution instance for the path + """ + return PathDistribution(pathlib.Path(path)) + + @staticmethod + def _discover_resolvers(): + """Search the meta_path for resolvers.""" + declared = ( + getattr(finder, 'find_distributions', None) for finder in sys.meta_path + ) + return filter(None, declared) + + @property + def metadata(self) -> _meta.PackageMetadata: + """Return the parsed metadata for this Distribution. + + The returned object will have keys that name the various bits of + metadata. See PEP 566 for details. + """ + text = ( + self.read_text('METADATA') + or self.read_text('PKG-INFO') + # This last clause is here to support old egg-info files. Its + # effect is to just end up using the PathDistribution's self._path + # (which points to the egg-info file) attribute unchanged. + or self.read_text('') + ) + return _adapters.Message(email.message_from_string(text)) + + @property + def name(self): + """Return the 'Name' metadata for the distribution package.""" + return self.metadata['Name'] + + @property + def _normalized_name(self): + """Return a normalized version of the name.""" + return Prepared.normalize(self.name) + + @property + def version(self): + """Return the 'Version' metadata for the distribution package.""" + return self.metadata['Version'] + + @property + def entry_points(self): + return EntryPoints._from_text_for(self.read_text('entry_points.txt'), self) + + @property + def files(self): + """Files in this distribution. + + :return: List of PackagePath for this distribution or None + + Result is `None` if the metadata file that enumerates files + (i.e. RECORD for dist-info or SOURCES.txt for egg-info) is + missing. + Result may be empty if the metadata exists but is empty. + """ + + def make_file(name, hash=None, size_str=None): + result = PackagePath(name) + result.hash = FileHash(hash) if hash else None + result.size = int(size_str) if size_str else None + result.dist = self + return result + + @pass_none + def make_files(lines): + return list(starmap(make_file, csv.reader(lines))) + + return make_files(self._read_files_distinfo() or self._read_files_egginfo()) + + def _read_files_distinfo(self): + """ + Read the lines of RECORD + """ + text = self.read_text('RECORD') + return text and text.splitlines() + + def _read_files_egginfo(self): + """ + SOURCES.txt might contain literal commas, so wrap each line + in quotes. + """ + text = self.read_text('SOURCES.txt') + return text and map('"{}"'.format, text.splitlines()) + + @property + def requires(self): + """Generated requirements specified for this Distribution""" + reqs = self._read_dist_info_reqs() or self._read_egg_info_reqs() + return reqs and list(reqs) + + def _read_dist_info_reqs(self): + return self.metadata.get_all('Requires-Dist') + + def _read_egg_info_reqs(self): + source = self.read_text('requires.txt') + return pass_none(self._deps_from_requires_text)(source) + + @classmethod + def _deps_from_requires_text(cls, source): + return cls._convert_egg_info_reqs_to_simple_reqs(Sectioned.read(source)) + + @staticmethod + def _convert_egg_info_reqs_to_simple_reqs(sections): + """ + Historically, setuptools would solicit and store 'extra' + requirements, including those with environment markers, + in separate sections. More modern tools expect each + dependency to be defined separately, with any relevant + extras and environment markers attached directly to that + requirement. This method converts the former to the + latter. See _test_deps_from_requires_text for an example. + """ + + def make_condition(name): + return name and f'extra == "{name}"' + + def quoted_marker(section): + section = section or '' + extra, sep, markers = section.partition(':') + if extra and markers: + markers = f'({markers})' + conditions = list(filter(None, [markers, make_condition(extra)])) + return '; ' + ' and '.join(conditions) if conditions else '' + + def url_req_space(req): + """ + PEP 508 requires a space between the url_spec and the quoted_marker. + Ref python/importlib_metadata#357. + """ + # '@' is uniquely indicative of a url_req. + return ' ' * ('@' in req) + + for section in sections: + space = url_req_space(section.value) + yield section.value + space + quoted_marker(section.name) + + +class DistributionFinder(MetaPathFinder): + """ + A MetaPathFinder capable of discovering installed distributions. + """ + + class Context: + """ + Keyword arguments presented by the caller to + ``distributions()`` or ``Distribution.discover()`` + to narrow the scope of a search for distributions + in all DistributionFinders. + + Each DistributionFinder may expect any parameters + and should attempt to honor the canonical + parameters defined below when appropriate. + """ + + name = None + """ + Specific name for which a distribution finder should match. + A name of ``None`` matches all distributions. + """ + + def __init__(self, **kwargs): + vars(self).update(kwargs) + + @property + def path(self): + """ + The sequence of directory path that a distribution finder + should search. + + Typically refers to Python installed package paths such as + "site-packages" directories and defaults to ``sys.path``. + """ + return vars(self).get('path', sys.path) + + @abc.abstractmethod + def find_distributions(self, context=Context()): + """ + Find distributions. + + Return an iterable of all Distribution instances capable of + loading the metadata for packages matching the ``context``, + a DistributionFinder.Context instance. + """ + + +class FastPath: + """ + Micro-optimized class for searching a path for + children. + + >>> FastPath('').children() + ['...'] + """ + + @functools.lru_cache() # type: ignore + def __new__(cls, root): + return super().__new__(cls) + + def __init__(self, root): + self.root = root + + def joinpath(self, child): + return pathlib.Path(self.root, child) + + def children(self): + with suppress(Exception): + return os.listdir(self.root or '.') + with suppress(Exception): + return self.zip_children() + return [] + + def zip_children(self): + zip_path = zipp.Path(self.root) + names = zip_path.root.namelist() + self.joinpath = zip_path.joinpath + + return dict.fromkeys(child.split(posixpath.sep, 1)[0] for child in names) + + def search(self, name): + return self.lookup(self.mtime).search(name) + + @property + def mtime(self): + with suppress(OSError): + return os.stat(self.root).st_mtime + self.lookup.cache_clear() + + @method_cache + def lookup(self, mtime): + return Lookup(self) + + +class Lookup: + def __init__(self, path: FastPath): + base = os.path.basename(path.root).lower() + base_is_egg = base.endswith(".egg") + self.infos = FreezableDefaultDict(list) + self.eggs = FreezableDefaultDict(list) + + for child in path.children(): + low = child.lower() + if low.endswith((".dist-info", ".egg-info")): + # rpartition is faster than splitext and suitable for this purpose. + name = low.rpartition(".")[0].partition("-")[0] + normalized = Prepared.normalize(name) + self.infos[normalized].append(path.joinpath(child)) + elif base_is_egg and low == "egg-info": + name = base.rpartition(".")[0].partition("-")[0] + legacy_normalized = Prepared.legacy_normalize(name) + self.eggs[legacy_normalized].append(path.joinpath(child)) + + self.infos.freeze() + self.eggs.freeze() + + def search(self, prepared): + infos = ( + self.infos[prepared.normalized] + if prepared + else itertools.chain.from_iterable(self.infos.values()) + ) + eggs = ( + self.eggs[prepared.legacy_normalized] + if prepared + else itertools.chain.from_iterable(self.eggs.values()) + ) + return itertools.chain(infos, eggs) + + +class Prepared: + """ + A prepared search for metadata on a possibly-named package. + """ + + normalized = None + legacy_normalized = None + + def __init__(self, name): + self.name = name + if name is None: + return + self.normalized = self.normalize(name) + self.legacy_normalized = self.legacy_normalize(name) + + @staticmethod + def normalize(name): + """ + PEP 503 normalization plus dashes as underscores. + """ + return re.sub(r"[-_.]+", "-", name).lower().replace('-', '_') + + @staticmethod + def legacy_normalize(name): + """ + Normalize the package name as found in the convention in + older packaging tools versions and specs. + """ + return name.lower().replace('-', '_') + + def __bool__(self): + return bool(self.name) + + +@install +class MetadataPathFinder(NullFinder, DistributionFinder): + """A degenerate finder for distribution packages on the file system. + + This finder supplies only a find_distributions() method for versions + of Python that do not have a PathFinder find_distributions(). + """ + + def find_distributions(self, context=DistributionFinder.Context()): + """ + Find distributions. + + Return an iterable of all Distribution instances capable of + loading the metadata for packages matching ``context.name`` + (or all names if ``None`` indicated) along the paths in the list + of directories ``context.path``. + """ + found = self._search_paths(context.name, context.path) + return map(PathDistribution, found) + + @classmethod + def _search_paths(cls, name, paths): + """Find metadata directories in paths heuristically.""" + prepared = Prepared(name) + return itertools.chain.from_iterable( + path.search(prepared) for path in map(FastPath, paths) + ) + + def invalidate_caches(cls): + FastPath.__new__.cache_clear() + + +class PathDistribution(Distribution): + def __init__(self, path: SimplePath): + """Construct a distribution. + + :param path: SimplePath indicating the metadata directory. + """ + self._path = path + + def read_text(self, filename): + with suppress( + FileNotFoundError, + IsADirectoryError, + KeyError, + NotADirectoryError, + PermissionError, + ): + return self._path.joinpath(filename).read_text(encoding='utf-8') + + read_text.__doc__ = Distribution.read_text.__doc__ + + def locate_file(self, path): + return self._path.parent / path + + @property + def _normalized_name(self): + """ + Performance optimization: where possible, resolve the + normalized name from the file system path. + """ + stem = os.path.basename(str(self._path)) + return ( + pass_none(Prepared.normalize)(self._name_from_stem(stem)) + or super()._normalized_name + ) + + @staticmethod + def _name_from_stem(stem): + """ + >>> PathDistribution._name_from_stem('foo-3.0.egg-info') + 'foo' + >>> PathDistribution._name_from_stem('CherryPy-3.0.dist-info') + 'CherryPy' + >>> PathDistribution._name_from_stem('face.egg-info') + 'face' + >>> PathDistribution._name_from_stem('foo.bar') + """ + filename, ext = os.path.splitext(stem) + if ext not in ('.dist-info', '.egg-info'): + return + name, sep, rest = filename.partition('-') + return name + + +def distribution(distribution_name): + """Get the ``Distribution`` instance for the named package. + + :param distribution_name: The name of the distribution package as a string. + :return: A ``Distribution`` instance (or subclass thereof). + """ + return Distribution.from_name(distribution_name) + + +def distributions(**kwargs): + """Get all ``Distribution`` instances in the current environment. + + :return: An iterable of ``Distribution`` instances. + """ + return Distribution.discover(**kwargs) + + +def metadata(distribution_name) -> _meta.PackageMetadata: + """Get the metadata for the named package. + + :param distribution_name: The name of the distribution package to query. + :return: A PackageMetadata containing the parsed metadata. + """ + return Distribution.from_name(distribution_name).metadata + + +def version(distribution_name): + """Get the version string for the named package. + + :param distribution_name: The name of the distribution package to query. + :return: The version string for the package as defined in the package's + "Version" metadata key. + """ + return distribution(distribution_name).version + + +_unique = functools.partial( + unique_everseen, + key=_py39compat.normalized_name, +) +""" +Wrapper for ``distributions`` to return unique distributions by name. +""" + + +def entry_points(**params) -> EntryPoints: + """Return EntryPoint objects for all installed packages. + + Pass selection parameters (group or name) to filter the + result to entry points matching those properties (see + EntryPoints.select()). + + :return: EntryPoints for all installed packages. + """ + eps = itertools.chain.from_iterable( + dist.entry_points for dist in _unique(distributions()) + ) + return EntryPoints(eps).select(**params) + + +def files(distribution_name): + """Return a list of files for the named package. + + :param distribution_name: The name of the distribution package to query. + :return: List of files composing the distribution. + """ + return distribution(distribution_name).files + + +def requires(distribution_name): + """ + Return a list of requirements for the named package. + + :return: An iterator of requirements, suitable for + packaging.requirement.Requirement. + """ + return distribution(distribution_name).requires + + +def packages_distributions() -> Mapping[str, List[str]]: + """ + Return a mapping of top-level packages to their + distributions. + + >>> import collections.abc + >>> pkgs = packages_distributions() + >>> all(isinstance(dist, collections.abc.Sequence) for dist in pkgs.values()) + True + """ + pkg_to_dist = collections.defaultdict(list) + for dist in distributions(): + for pkg in _top_level_declared(dist) or _top_level_inferred(dist): + pkg_to_dist[pkg].append(dist.metadata['Name']) + return dict(pkg_to_dist) + + +def _top_level_declared(dist): + return (dist.read_text('top_level.txt') or '').split() + + +def _top_level_inferred(dist): + return { + f.parts[0] if len(f.parts) > 1 else f.with_suffix('').name + for f in always_iterable(dist.files) + if f.suffix == ".py" + } |