path: root/third_party/python/pip/pip/_internal/resolution/resolvelib/
diff options
Diffstat (limited to 'third_party/python/pip/pip/_internal/resolution/resolvelib/')
1 files changed, 731 insertions, 0 deletions
diff --git a/third_party/python/pip/pip/_internal/resolution/resolvelib/ b/third_party/python/pip/pip/_internal/resolution/resolvelib/
new file mode 100644
index 0000000000..a4c24b52a1
--- /dev/null
+++ b/third_party/python/pip/pip/_internal/resolution/resolvelib/
@@ -0,0 +1,731 @@
+import contextlib
+import functools
+import logging
+from typing import (
+ Dict,
+ FrozenSet,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ NamedTuple,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ TypeVar,
+ cast,
+from pip._vendor.packaging.requirements import InvalidRequirement
+from pip._vendor.packaging.specifiers import SpecifierSet
+from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
+from pip._vendor.resolvelib import ResolutionImpossible
+from pip._internal.cache import CacheEntry, WheelCache
+from pip._internal.exceptions import (
+ DistributionNotFound,
+ InstallationError,
+ MetadataInconsistent,
+ UnsupportedPythonVersion,
+ UnsupportedWheel,
+from pip._internal.index.package_finder import PackageFinder
+from pip._internal.metadata import BaseDistribution, get_default_environment
+from import Link
+from pip._internal.models.wheel import Wheel
+from pip._internal.operations.prepare import RequirementPreparer
+from pip._internal.req.constructors import install_req_from_link_and_ireq
+from pip._internal.req.req_install import (
+ InstallRequirement,
+ check_invalid_constraint_type,
+from pip._internal.resolution.base import InstallRequirementProvider
+from pip._internal.utils.compatibility_tags import get_supported
+from pip._internal.utils.hashes import Hashes
+from pip._internal.utils.packaging import get_requirement
+from pip._internal.utils.virtualenv import running_under_virtualenv
+from .base import Candidate, CandidateVersion, Constraint, Requirement
+from .candidates import (
+ AlreadyInstalledCandidate,
+ BaseCandidate,
+ EditableCandidate,
+ ExtrasCandidate,
+ LinkCandidate,
+ RequiresPythonCandidate,
+ as_base_candidate,
+from .found_candidates import FoundCandidates, IndexCandidateInfo
+from .requirements import (
+ ExplicitRequirement,
+ RequiresPythonRequirement,
+ SpecifierRequirement,
+ UnsatisfiableRequirement,
+ from typing import Protocol
+ class ConflictCause(Protocol):
+ requirement: RequiresPythonRequirement
+ parent: Candidate
+logger = logging.getLogger(__name__)
+C = TypeVar("C")
+Cache = Dict[Link, C]
+class CollectedRootRequirements(NamedTuple):
+ requirements: List[Requirement]
+ constraints: Dict[str, Constraint]
+ user_requested: Dict[str, int]
+class Factory:
+ def __init__(
+ self,
+ finder: PackageFinder,
+ preparer: RequirementPreparer,
+ make_install_req: InstallRequirementProvider,
+ wheel_cache: Optional[WheelCache],
+ use_user_site: bool,
+ force_reinstall: bool,
+ ignore_installed: bool,
+ ignore_requires_python: bool,
+ py_version_info: Optional[Tuple[int, ...]] = None,
+ ) -> None:
+ self._finder = finder
+ self.preparer = preparer
+ self._wheel_cache = wheel_cache
+ self._python_candidate = RequiresPythonCandidate(py_version_info)
+ self._make_install_req_from_spec = make_install_req
+ self._use_user_site = use_user_site
+ self._force_reinstall = force_reinstall
+ self._ignore_requires_python = ignore_requires_python
+ self._build_failures: Cache[InstallationError] = {}
+ self._link_candidate_cache: Cache[LinkCandidate] = {}
+ self._editable_candidate_cache: Cache[EditableCandidate] = {}
+ self._installed_candidate_cache: Dict[str, AlreadyInstalledCandidate] = {}
+ self._extras_candidate_cache: Dict[
+ Tuple[int, FrozenSet[str]], ExtrasCandidate
+ ] = {}
+ if not ignore_installed:
+ env = get_default_environment()
+ self._installed_dists = {
+ dist.canonical_name: dist
+ for dist in env.iter_installed_distributions(local_only=False)
+ }
+ else:
+ self._installed_dists = {}
+ @property
+ def force_reinstall(self) -> bool:
+ return self._force_reinstall
+ def _fail_if_link_is_unsupported_wheel(self, link: Link) -> None:
+ if not link.is_wheel:
+ return
+ wheel = Wheel(link.filename)
+ if wheel.supported(self._finder.target_python.get_tags()):
+ return
+ msg = f"{link.filename} is not a supported wheel on this platform."
+ raise UnsupportedWheel(msg)
+ def _make_extras_candidate(
+ self, base: BaseCandidate, extras: FrozenSet[str]
+ ) -> ExtrasCandidate:
+ cache_key = (id(base), extras)
+ try:
+ candidate = self._extras_candidate_cache[cache_key]
+ except KeyError:
+ candidate = ExtrasCandidate(base, extras)
+ self._extras_candidate_cache[cache_key] = candidate
+ return candidate
+ def _make_candidate_from_dist(
+ self,
+ dist: BaseDistribution,
+ extras: FrozenSet[str],
+ template: InstallRequirement,
+ ) -> Candidate:
+ try:
+ base = self._installed_candidate_cache[dist.canonical_name]
+ except KeyError:
+ base = AlreadyInstalledCandidate(dist, template, factory=self)
+ self._installed_candidate_cache[dist.canonical_name] = base
+ if not extras:
+ return base
+ return self._make_extras_candidate(base, extras)
+ def _make_candidate_from_link(
+ self,
+ link: Link,
+ extras: FrozenSet[str],
+ template: InstallRequirement,
+ name: Optional[NormalizedName],
+ version: Optional[CandidateVersion],
+ ) -> Optional[Candidate]:
+ # TODO: Check already installed candidate, and use it if the link and
+ # editable flag match.
+ if link in self._build_failures:
+ # We already tried this candidate before, and it does not build.
+ # Don't bother trying again.
+ return None
+ if template.editable:
+ if link not in self._editable_candidate_cache:
+ try:
+ self._editable_candidate_cache[link] = EditableCandidate(
+ link,
+ template,
+ factory=self,
+ name=name,
+ version=version,
+ )
+ except MetadataInconsistent as e:
+ "Discarding [blue underline]%s[/]: [yellow]%s[reset]",
+ link,
+ e,
+ extra={"markup": True},
+ )
+ self._build_failures[link] = e
+ return None
+ base: BaseCandidate = self._editable_candidate_cache[link]
+ else:
+ if link not in self._link_candidate_cache:
+ try:
+ self._link_candidate_cache[link] = LinkCandidate(
+ link,
+ template,
+ factory=self,
+ name=name,
+ version=version,
+ )
+ except MetadataInconsistent as e:
+ "Discarding [blue underline]%s[/]: [yellow]%s[reset]",
+ link,
+ e,
+ extra={"markup": True},
+ )
+ self._build_failures[link] = e
+ return None
+ base = self._link_candidate_cache[link]
+ if not extras:
+ return base
+ return self._make_extras_candidate(base, extras)
+ def _iter_found_candidates(
+ self,
+ ireqs: Sequence[InstallRequirement],
+ specifier: SpecifierSet,
+ hashes: Hashes,
+ prefers_installed: bool,
+ incompatible_ids: Set[int],
+ ) -> Iterable[Candidate]:
+ if not ireqs:
+ return ()
+ # The InstallRequirement implementation requires us to give it a
+ # "template". Here we just choose the first requirement to represent
+ # all of them.
+ # Hopefully the Project model can correct this mismatch in the future.
+ template = ireqs[0]
+ assert template.req, "Candidates found on index must be PEP 508"
+ name = canonicalize_name(
+ extras: FrozenSet[str] = frozenset()
+ for ireq in ireqs:
+ assert ireq.req, "Candidates found on index must be PEP 508"
+ specifier &= ireq.req.specifier
+ hashes &= ireq.hashes(trust_internet=False)
+ extras |= frozenset(ireq.extras)
+ def _get_installed_candidate() -> Optional[Candidate]:
+ """Get the candidate for the currently-installed version."""
+ # If --force-reinstall is set, we want the version from the index
+ # instead, so we "pretend" there is nothing installed.
+ if self._force_reinstall:
+ return None
+ try:
+ installed_dist = self._installed_dists[name]
+ except KeyError:
+ return None
+ # Don't use the installed distribution if its version does not fit
+ # the current dependency graph.
+ if not specifier.contains(installed_dist.version, prereleases=True):
+ return None
+ candidate = self._make_candidate_from_dist(
+ dist=installed_dist,
+ extras=extras,
+ template=template,
+ )
+ # The candidate is a known incompatibility. Don't use it.
+ if id(candidate) in incompatible_ids:
+ return None
+ return candidate
+ def iter_index_candidate_infos() -> Iterator[IndexCandidateInfo]:
+ result = self._finder.find_best_candidate(
+ project_name=name,
+ specifier=specifier,
+ hashes=hashes,
+ )
+ icans = list(result.iter_applicable())
+ # PEP 592: Yanked releases are ignored unless the specifier
+ # explicitly pins a version (via '==' or '===') that can be
+ # solely satisfied by a yanked release.
+ all_yanked = all( for ican in icans)
+ def is_pinned(specifier: SpecifierSet) -> bool:
+ for sp in specifier:
+ if sp.operator == "===":
+ return True
+ if sp.operator != "==":
+ continue
+ if sp.version.endswith(".*"):
+ continue
+ return True
+ return False
+ pinned = is_pinned(specifier)
+ # PackageFinder returns earlier versions first, so we reverse.
+ for ican in reversed(icans):
+ if not (all_yanked and pinned) and
+ continue
+ func = functools.partial(
+ self._make_candidate_from_link,
+ extras=extras,
+ template=template,
+ name=name,
+ version=ican.version,
+ )
+ yield ican.version, func
+ return FoundCandidates(
+ iter_index_candidate_infos,
+ _get_installed_candidate(),
+ prefers_installed,
+ incompatible_ids,
+ )
+ def _iter_explicit_candidates_from_base(
+ self,
+ base_requirements: Iterable[Requirement],
+ extras: FrozenSet[str],
+ ) -> Iterator[Candidate]:
+ """Produce explicit candidates from the base given an extra-ed package.
+ :param base_requirements: Requirements known to the resolver. The
+ requirements are guaranteed to not have extras.
+ :param extras: The extras to inject into the explicit requirements'
+ candidates.
+ """
+ for req in base_requirements:
+ lookup_cand, _ = req.get_candidate_lookup()
+ if lookup_cand is None: # Not explicit.
+ continue
+ # We've stripped extras from the identifier, and should always
+ # get a BaseCandidate here, unless there's a bug elsewhere.
+ base_cand = as_base_candidate(lookup_cand)
+ assert base_cand is not None, "no extras here"
+ yield self._make_extras_candidate(base_cand, extras)
+ def _iter_candidates_from_constraints(
+ self,
+ identifier: str,
+ constraint: Constraint,
+ template: InstallRequirement,
+ ) -> Iterator[Candidate]:
+ """Produce explicit candidates from constraints.
+ This creates "fake" InstallRequirement objects that are basically clones
+ of what "should" be the template, but with original_link set to link.
+ """
+ for link in constraint.links:
+ self._fail_if_link_is_unsupported_wheel(link)
+ candidate = self._make_candidate_from_link(
+ link,
+ extras=frozenset(),
+ template=install_req_from_link_and_ireq(link, template),
+ name=canonicalize_name(identifier),
+ version=None,
+ )
+ if candidate:
+ yield candidate
+ def find_candidates(
+ self,
+ identifier: str,
+ requirements: Mapping[str, Iterable[Requirement]],
+ incompatibilities: Mapping[str, Iterator[Candidate]],
+ constraint: Constraint,
+ prefers_installed: bool,
+ ) -> Iterable[Candidate]:
+ # Collect basic lookup information from the requirements.
+ explicit_candidates: Set[Candidate] = set()
+ ireqs: List[InstallRequirement] = []
+ for req in requirements[identifier]:
+ cand, ireq = req.get_candidate_lookup()
+ if cand is not None:
+ explicit_candidates.add(cand)
+ if ireq is not None:
+ ireqs.append(ireq)
+ # If the current identifier contains extras, add explicit candidates
+ # from entries from extra-less identifier.
+ with contextlib.suppress(InvalidRequirement):
+ parsed_requirement = get_requirement(identifier)
+ explicit_candidates.update(
+ self._iter_explicit_candidates_from_base(
+ requirements.get(, ()),
+ frozenset(parsed_requirement.extras),
+ ),
+ )
+ # Add explicit candidates from constraints. We only do this if there are
+ # known ireqs, which represent requirements not already explicit. If
+ # there are no ireqs, we're constraining already-explicit requirements,
+ # which is handled later when we return the explicit candidates.
+ if ireqs:
+ try:
+ explicit_candidates.update(
+ self._iter_candidates_from_constraints(
+ identifier,
+ constraint,
+ template=ireqs[0],
+ ),
+ )
+ except UnsupportedWheel:
+ # If we're constrained to install a wheel incompatible with the
+ # target architecture, no candidates will ever be valid.
+ return ()
+ # Since we cache all the candidates, incompatibility identification
+ # can be made quicker by comparing only the id() values.
+ incompat_ids = {id(c) for c in incompatibilities.get(identifier, ())}
+ # If none of the requirements want an explicit candidate, we can ask
+ # the finder for candidates.
+ if not explicit_candidates:
+ return self._iter_found_candidates(
+ ireqs,
+ constraint.specifier,
+ constraint.hashes,
+ prefers_installed,
+ incompat_ids,
+ )
+ return (
+ c
+ for c in explicit_candidates
+ if id(c) not in incompat_ids
+ and constraint.is_satisfied_by(c)
+ and all(req.is_satisfied_by(c) for req in requirements[identifier])
+ )
+ def _make_requirement_from_install_req(
+ self, ireq: InstallRequirement, requested_extras: Iterable[str]
+ ) -> Optional[Requirement]:
+ if not ireq.match_markers(requested_extras):
+ "Ignoring %s: markers '%s' don't match your environment",
+ ireq.markers,
+ )
+ return None
+ if not
+ return SpecifierRequirement(ireq)
+ self._fail_if_link_is_unsupported_wheel(
+ cand = self._make_candidate_from_link(
+ extras=frozenset(ireq.extras),
+ template=ireq,
+ name=canonicalize_name( if else None,
+ version=None,
+ )
+ if cand is None:
+ # There's no way we can satisfy a URL requirement if the underlying
+ # candidate fails to build. An unnamed URL must be user-supplied, so
+ # we fail eagerly. If the URL is named, an unsatisfiable requirement
+ # can make the resolver do the right thing, either backtrack (and
+ # maybe find some other requirement that's buildable) or raise a
+ # ResolutionImpossible eventually.
+ if not
+ raise self._build_failures[]
+ return UnsatisfiableRequirement(canonicalize_name(
+ return self.make_requirement_from_candidate(cand)
+ def collect_root_requirements(
+ self, root_ireqs: List[InstallRequirement]
+ ) -> CollectedRootRequirements:
+ collected = CollectedRootRequirements([], {}, {})
+ for i, ireq in enumerate(root_ireqs):
+ if ireq.constraint:
+ # Ensure we only accept valid constraints
+ problem = check_invalid_constraint_type(ireq)
+ if problem:
+ raise InstallationError(problem)
+ if not ireq.match_markers():
+ continue
+ assert, "Constraint must be named"
+ name = canonicalize_name(
+ if name in collected.constraints:
+ collected.constraints[name] &= ireq
+ else:
+ collected.constraints[name] = Constraint.from_ireq(ireq)
+ else:
+ req = self._make_requirement_from_install_req(
+ ireq,
+ requested_extras=(),
+ )
+ if req is None:
+ continue
+ if ireq.user_supplied and not in collected.user_requested:
+ collected.user_requested[] = i
+ collected.requirements.append(req)
+ return collected
+ def make_requirement_from_candidate(
+ self, candidate: Candidate
+ ) -> ExplicitRequirement:
+ return ExplicitRequirement(candidate)
+ def make_requirement_from_spec(
+ self,
+ specifier: str,
+ comes_from: Optional[InstallRequirement],
+ requested_extras: Iterable[str] = (),
+ ) -> Optional[Requirement]:
+ ireq = self._make_install_req_from_spec(specifier, comes_from)
+ return self._make_requirement_from_install_req(ireq, requested_extras)
+ def make_requires_python_requirement(
+ self,
+ specifier: SpecifierSet,
+ ) -> Optional[Requirement]:
+ if self._ignore_requires_python:
+ return None
+ # Don't bother creating a dependency for an empty Requires-Python.
+ if not str(specifier):
+ return None
+ return RequiresPythonRequirement(specifier, self._python_candidate)
+ def get_wheel_cache_entry(
+ self, link: Link, name: Optional[str]
+ ) -> Optional[CacheEntry]:
+ """Look up the link in the wheel cache.
+ If ``preparer.require_hashes`` is True, don't use the wheel cache,
+ because cached wheels, always built locally, have different hashes
+ than the files downloaded from the index server and thus throw false
+ hash mismatches. Furthermore, cached wheels at present have
+ nondeterministic contents due to file modification times.
+ """
+ if self._wheel_cache is None or self.preparer.require_hashes:
+ return None
+ return self._wheel_cache.get_cache_entry(
+ link=link,
+ package_name=name,
+ supported_tags=get_supported(),
+ )
+ def get_dist_to_uninstall(self, candidate: Candidate) -> Optional[BaseDistribution]:
+ # TODO: Are there more cases this needs to return True? Editable?
+ dist = self._installed_dists.get(candidate.project_name)
+ if dist is None: # Not installed, no uninstallation required.
+ return None
+ # We're installing into global site. The current installation must
+ # be uninstalled, no matter it's in global or user site, because the
+ # user site installation has precedence over global.
+ if not self._use_user_site:
+ return dist
+ # We're installing into user site. Remove the user site installation.
+ if dist.in_usersite:
+ return dist
+ # We're installing into user site, but the installed incompatible
+ # package is in global site. We can't uninstall that, and would let
+ # the new user installation to "shadow" it. But shadowing won't work
+ # in virtual environments, so we error out.
+ if running_under_virtualenv() and dist.in_site_packages:
+ message = (
+ f"Will not install to the user site because it will lack "
+ f"sys.path precedence to {dist.raw_name} in {dist.location}"
+ )
+ raise InstallationError(message)
+ return None
+ def _report_requires_python_error(
+ self, causes: Sequence["ConflictCause"]
+ ) -> UnsupportedPythonVersion:
+ assert causes, "Requires-Python error reported with no cause"
+ version = self._python_candidate.version
+ if len(causes) == 1:
+ specifier = str(causes[0].requirement.specifier)
+ message = (
+ f"Package {causes[0]!r} requires a different "
+ f"Python: {version} not in {specifier!r}"
+ )
+ return UnsupportedPythonVersion(message)
+ message = f"Packages require a different Python. {version} not in:"
+ for cause in causes:
+ package = cause.parent.format_for_error()
+ specifier = str(cause.requirement.specifier)
+ message += f"\n{specifier!r} (required by {package})"
+ return UnsupportedPythonVersion(message)
+ def _report_single_requirement_conflict(
+ self, req: Requirement, parent: Optional[Candidate]
+ ) -> DistributionNotFound:
+ if parent is None:
+ req_disp = str(req)
+ else:
+ req_disp = f"{req} (from {})"
+ cands = self._finder.find_all_candidates(req.project_name)
+ skipped_by_requires_python = self._finder.requires_python_skipped_reasons()
+ versions = [str(v) for v in sorted({c.version for c in cands})]
+ if skipped_by_requires_python:
+ logger.critical(
+ "Ignored the following versions that require a different python "
+ "version: %s",
+ "; ".join(skipped_by_requires_python) or "none",
+ )
+ logger.critical(
+ "Could not find a version that satisfies the requirement %s "
+ "(from versions: %s)",
+ req_disp,
+ ", ".join(versions) or "none",
+ )
+ if str(req) == "requirements.txt":
+ "HINT: You are attempting to install a package literally "
+ 'named "requirements.txt" (which cannot exist). Consider '
+ "using the '-r' flag to install the packages listed in "
+ "requirements.txt"
+ )
+ return DistributionNotFound(f"No matching distribution found for {req}")
+ def get_installation_error(
+ self,
+ e: "ResolutionImpossible[Requirement, Candidate]",
+ constraints: Dict[str, Constraint],
+ ) -> InstallationError:
+ assert e.causes, "Installation error reported with no cause"
+ # If one of the things we can't solve is "we need Python X.Y",
+ # that is what we report.
+ requires_python_causes = [
+ cause
+ for cause in e.causes
+ if isinstance(cause.requirement, RequiresPythonRequirement)
+ and not cause.requirement.is_satisfied_by(self._python_candidate)
+ ]
+ if requires_python_causes:
+ # The comprehension above makes sure all Requirement instances are
+ # RequiresPythonRequirement, so let's cast for convenience.
+ return self._report_requires_python_error(
+ cast("Sequence[ConflictCause]", requires_python_causes),
+ )
+ # Otherwise, we have a set of causes which can't all be satisfied
+ # at once.
+ # The simplest case is when we have *one* cause that can't be
+ # satisfied. We just report that case.
+ if len(e.causes) == 1:
+ req, parent = e.causes[0]
+ if not in constraints:
+ return self._report_single_requirement_conflict(req, parent)
+ # OK, we now have a list of requirements that can't all be
+ # satisfied at once.
+ # A couple of formatting helpers
+ def text_join(parts: List[str]) -> str:
+ if len(parts) == 1:
+ return parts[0]
+ return ", ".join(parts[:-1]) + " and " + parts[-1]
+ def describe_trigger(parent: Candidate) -> str:
+ ireq = parent.get_install_requirement()
+ if not ireq or not ireq.comes_from:
+ return f"{}=={parent.version}"
+ if isinstance(ireq.comes_from, InstallRequirement):
+ return str(
+ return str(ireq.comes_from)
+ triggers = set()
+ for req, parent in e.causes:
+ if parent is None:
+ # This is a root requirement, so we can report it directly
+ trigger = req.format_for_error()
+ else:
+ trigger = describe_trigger(parent)
+ triggers.add(trigger)
+ if triggers:
+ info = text_join(sorted(triggers))
+ else:
+ info = "the requested packages"
+ msg = (
+ "Cannot install {} because these package versions "
+ "have conflicting dependencies.".format(info)
+ )
+ logger.critical(msg)
+ msg = "\nThe conflict is caused by:"
+ relevant_constraints = set()
+ for req, parent in e.causes:
+ if in constraints:
+ relevant_constraints.add(
+ msg = msg + "\n "
+ if parent:
+ msg = msg + f"{} {parent.version} depends on "
+ else:
+ msg = msg + "The user requested "
+ msg = msg + req.format_for_error()
+ for key in relevant_constraints:
+ spec = constraints[key].specifier
+ msg += f"\n The user requested (constraint) {key}{spec}"
+ msg = (
+ msg
+ + "\n\n"
+ + "To fix this you could try to:\n"
+ + "1. loosen the range of package versions you've specified\n"
+ + "2. remove package versions to allow pip attempt to solve "
+ + "the dependency conflict\n"
+ )
+ return DistributionNotFound(
+ "ResolutionImpossible: for help visit "
+ ""
+ "#dealing-with-dependency-conflicts"
+ )