summaryrefslogtreecommitdiffstats
path: root/lib/ansible/galaxy/dependency_resolution
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/galaxy/dependency_resolution')
-rw-r--r--lib/ansible/galaxy/dependency_resolution/__init__.py55
-rw-r--r--lib/ansible/galaxy/dependency_resolution/dataclasses.py573
-rw-r--r--lib/ansible/galaxy/dependency_resolution/errors.py19
-rw-r--r--lib/ansible/galaxy/dependency_resolution/providers.py548
-rw-r--r--lib/ansible/galaxy/dependency_resolution/reporters.py21
-rw-r--r--lib/ansible/galaxy/dependency_resolution/resolvers.py21
-rw-r--r--lib/ansible/galaxy/dependency_resolution/versioning.py70
7 files changed, 1307 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/dependency_resolution/__init__.py b/lib/ansible/galaxy/dependency_resolution/__init__.py
new file mode 100644
index 0000000..cfde7df
--- /dev/null
+++ b/lib/ansible/galaxy/dependency_resolution/__init__.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Dependency resolution machinery."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import typing as t
+
+if t.TYPE_CHECKING:
+ from ansible.galaxy.api import GalaxyAPI
+ from ansible.galaxy.collection.concrete_artifact_manager import (
+ ConcreteArtifactsManager,
+ )
+ from ansible.galaxy.dependency_resolution.dataclasses import (
+ Candidate,
+ Requirement,
+ )
+
+from ansible.galaxy.collection.galaxy_api_proxy import MultiGalaxyAPIProxy
+from ansible.galaxy.dependency_resolution.providers import CollectionDependencyProvider
+from ansible.galaxy.dependency_resolution.reporters import CollectionDependencyReporter
+from ansible.galaxy.dependency_resolution.resolvers import CollectionDependencyResolver
+
+
+def build_collection_dependency_resolver(
+ galaxy_apis, # type: t.Iterable[GalaxyAPI]
+ concrete_artifacts_manager, # type: ConcreteArtifactsManager
+ user_requirements, # type: t.Iterable[Requirement]
+ preferred_candidates=None, # type: t.Iterable[Candidate]
+ with_deps=True, # type: bool
+ with_pre_releases=False, # type: bool
+ upgrade=False, # type: bool
+ include_signatures=True, # type: bool
+ offline=False, # type: bool
+): # type: (...) -> CollectionDependencyResolver
+ """Return a collection dependency resolver.
+
+ The returned instance will have a ``resolve()`` method for
+ further consumption.
+ """
+ return CollectionDependencyResolver(
+ CollectionDependencyProvider(
+ apis=MultiGalaxyAPIProxy(galaxy_apis, concrete_artifacts_manager, offline=offline),
+ concrete_artifacts_manager=concrete_artifacts_manager,
+ user_requirements=user_requirements,
+ preferred_candidates=preferred_candidates,
+ with_deps=with_deps,
+ with_pre_releases=with_pre_releases,
+ upgrade=upgrade,
+ include_signatures=include_signatures,
+ ),
+ CollectionDependencyReporter(),
+ )
diff --git a/lib/ansible/galaxy/dependency_resolution/dataclasses.py b/lib/ansible/galaxy/dependency_resolution/dataclasses.py
new file mode 100644
index 0000000..16fd631
--- /dev/null
+++ b/lib/ansible/galaxy/dependency_resolution/dataclasses.py
@@ -0,0 +1,573 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Dependency structs."""
+# FIXME: add caching all over the place
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import typing as t
+
+from collections import namedtuple
+from collections.abc import MutableSequence, MutableMapping
+from glob import iglob
+from urllib.parse import urlparse
+from yaml import safe_load
+
+if t.TYPE_CHECKING:
+ from ansible.galaxy.collection.concrete_artifact_manager import (
+ ConcreteArtifactsManager,
+ )
+ Collection = t.TypeVar(
+ 'Collection',
+ 'Candidate', 'Requirement',
+ '_ComputedReqKindsMixin',
+ )
+
+
+from ansible.errors import AnsibleError
+from ansible.galaxy.api import GalaxyAPI
+from ansible.module_utils._text import to_bytes, to_native, to_text
+from ansible.module_utils.common.arg_spec import ArgumentSpecValidator
+from ansible.utils.collection_loader import AnsibleCollectionRef
+from ansible.utils.display import Display
+
+
+_ALLOW_CONCRETE_POINTER_IN_SOURCE = False # NOTE: This is a feature flag
+_GALAXY_YAML = b'galaxy.yml'
+_MANIFEST_JSON = b'MANIFEST.json'
+_SOURCE_METADATA_FILE = b'GALAXY.yml'
+
+display = Display()
+
+
+def get_validated_source_info(b_source_info_path, namespace, name, version):
+ source_info_path = to_text(b_source_info_path, errors='surrogate_or_strict')
+
+ if not os.path.isfile(b_source_info_path):
+ return None
+
+ try:
+ with open(b_source_info_path, mode='rb') as fd:
+ metadata = safe_load(fd)
+ except OSError as e:
+ display.warning(
+ f"Error getting collection source information at '{source_info_path}': {to_text(e, errors='surrogate_or_strict')}"
+ )
+ return None
+
+ if not isinstance(metadata, MutableMapping):
+ display.warning(f"Error getting collection source information at '{source_info_path}': expected a YAML dictionary")
+ return None
+
+ schema_errors = _validate_v1_source_info_schema(namespace, name, version, metadata)
+ if schema_errors:
+ display.warning(f"Ignoring source metadata file at {source_info_path} due to the following errors:")
+ display.warning("\n".join(schema_errors))
+ display.warning("Correct the source metadata file by reinstalling the collection.")
+ return None
+
+ return metadata
+
+
+def _validate_v1_source_info_schema(namespace, name, version, provided_arguments):
+ argument_spec_data = dict(
+ format_version=dict(choices=["1.0.0"]),
+ download_url=dict(),
+ version_url=dict(),
+ server=dict(),
+ signatures=dict(
+ type=list,
+ suboptions=dict(
+ signature=dict(),
+ pubkey_fingerprint=dict(),
+ signing_service=dict(),
+ pulp_created=dict(),
+ )
+ ),
+ name=dict(choices=[name]),
+ namespace=dict(choices=[namespace]),
+ version=dict(choices=[version]),
+ )
+
+ if not isinstance(provided_arguments, dict):
+ raise AnsibleError(
+ f'Invalid offline source info for {namespace}.{name}:{version}, expected a dict and got {type(provided_arguments)}'
+ )
+ validator = ArgumentSpecValidator(argument_spec_data)
+ validation_result = validator.validate(provided_arguments)
+
+ return validation_result.error_messages
+
+
+def _is_collection_src_dir(dir_path):
+ b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')
+ return os.path.isfile(os.path.join(b_dir_path, _GALAXY_YAML))
+
+
+def _is_installed_collection_dir(dir_path):
+ b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')
+ return os.path.isfile(os.path.join(b_dir_path, _MANIFEST_JSON))
+
+
+def _is_collection_dir(dir_path):
+ return (
+ _is_installed_collection_dir(dir_path) or
+ _is_collection_src_dir(dir_path)
+ )
+
+
+def _find_collections_in_subdirs(dir_path):
+ b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')
+
+ subdir_glob_pattern = os.path.join(
+ b_dir_path,
+ # b'*', # namespace is supposed to be top-level per spec
+ b'*', # collection name
+ )
+
+ for subdir in iglob(subdir_glob_pattern):
+ if os.path.isfile(os.path.join(subdir, _MANIFEST_JSON)):
+ yield subdir
+ elif os.path.isfile(os.path.join(subdir, _GALAXY_YAML)):
+ yield subdir
+
+
+def _is_collection_namespace_dir(tested_str):
+ return any(_find_collections_in_subdirs(tested_str))
+
+
+def _is_file_path(tested_str):
+ return os.path.isfile(to_bytes(tested_str, errors='surrogate_or_strict'))
+
+
+def _is_http_url(tested_str):
+ return urlparse(tested_str).scheme.lower() in {'http', 'https'}
+
+
+def _is_git_url(tested_str):
+ return tested_str.startswith(('git+', 'git@'))
+
+
+def _is_concrete_artifact_pointer(tested_str):
+ return any(
+ predicate(tested_str)
+ for predicate in (
+ # NOTE: Maintain the checks to be sorted from light to heavy:
+ _is_git_url,
+ _is_http_url,
+ _is_file_path,
+ _is_collection_dir,
+ _is_collection_namespace_dir,
+ )
+ )
+
+
+class _ComputedReqKindsMixin:
+
+ def __init__(self, *args, **kwargs):
+ if not self.may_have_offline_galaxy_info:
+ self._source_info = None
+ else:
+ info_path = self.construct_galaxy_info_path(to_bytes(self.src, errors='surrogate_or_strict'))
+
+ self._source_info = get_validated_source_info(
+ info_path,
+ self.namespace,
+ self.name,
+ self.ver
+ )
+
+ @classmethod
+ def from_dir_path_as_unknown( # type: ignore[misc]
+ cls, # type: t.Type[Collection]
+ dir_path, # type: bytes
+ art_mgr, # type: ConcreteArtifactsManager
+ ): # type: (...) -> Collection
+ """Make collection from an unspecified dir type.
+
+ This alternative constructor attempts to grab metadata from the
+ given path if it's a directory. If there's no metadata, it
+ falls back to guessing the FQCN based on the directory path and
+ sets the version to "*".
+
+ It raises a ValueError immediately if the input is not an
+ existing directory path.
+ """
+ if not os.path.isdir(dir_path):
+ raise ValueError(
+ "The collection directory '{path!s}' doesn't exist".
+ format(path=to_native(dir_path)),
+ )
+
+ try:
+ return cls.from_dir_path(dir_path, art_mgr)
+ except ValueError:
+ return cls.from_dir_path_implicit(dir_path)
+
+ @classmethod
+ def from_dir_path(cls, dir_path, art_mgr):
+ """Make collection from an directory with metadata."""
+ b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')
+ if not _is_collection_dir(b_dir_path):
+ display.warning(
+ u"Collection at '{path!s}' does not have a {manifest_json!s} "
+ u'file, nor has it {galaxy_yml!s}: cannot detect version.'.
+ format(
+ galaxy_yml=to_text(_GALAXY_YAML),
+ manifest_json=to_text(_MANIFEST_JSON),
+ path=to_text(dir_path, errors='surrogate_or_strict'),
+ ),
+ )
+ raise ValueError(
+ '`dir_path` argument must be an installed or a source'
+ ' collection directory.',
+ )
+
+ tmp_inst_req = cls(None, None, dir_path, 'dir', None)
+ req_version = art_mgr.get_direct_collection_version(tmp_inst_req)
+ try:
+ req_name = art_mgr.get_direct_collection_fqcn(tmp_inst_req)
+ except TypeError as err:
+ # Looks like installed/source dir but isn't: doesn't have valid metadata.
+ display.warning(
+ u"Collection at '{path!s}' has a {manifest_json!s} "
+ u"or {galaxy_yml!s} file but it contains invalid metadata.".
+ format(
+ galaxy_yml=to_text(_GALAXY_YAML),
+ manifest_json=to_text(_MANIFEST_JSON),
+ path=to_text(dir_path, errors='surrogate_or_strict'),
+ ),
+ )
+ raise ValueError(
+ "Collection at '{path!s}' has invalid metadata".
+ format(path=to_text(dir_path, errors='surrogate_or_strict'))
+ ) from err
+
+ return cls(req_name, req_version, dir_path, 'dir', None)
+
+ @classmethod
+ def from_dir_path_implicit( # type: ignore[misc]
+ cls, # type: t.Type[Collection]
+ dir_path, # type: bytes
+ ): # type: (...) -> Collection
+ """Construct a collection instance based on an arbitrary dir.
+
+ This alternative constructor infers the FQCN based on the parent
+ and current directory names. It also sets the version to "*"
+ regardless of whether any of known metadata files are present.
+ """
+ # There is no metadata, but it isn't required for a functional collection. Determine the namespace.name from the path.
+ u_dir_path = to_text(dir_path, errors='surrogate_or_strict')
+ path_list = u_dir_path.split(os.path.sep)
+ req_name = '.'.join(path_list[-2:])
+ return cls(req_name, '*', dir_path, 'dir', None) # type: ignore[call-arg]
+
+ @classmethod
+ def from_string(cls, collection_input, artifacts_manager, supplemental_signatures):
+ req = {}
+ if _is_concrete_artifact_pointer(collection_input):
+ # Arg is a file path or URL to a collection
+ req['name'] = collection_input
+ else:
+ req['name'], _sep, req['version'] = collection_input.partition(':')
+ if not req['version']:
+ del req['version']
+ req['signatures'] = supplemental_signatures
+
+ return cls.from_requirement_dict(req, artifacts_manager)
+
+ @classmethod
+ def from_requirement_dict(cls, collection_req, art_mgr, validate_signature_options=True):
+ req_name = collection_req.get('name', None)
+ req_version = collection_req.get('version', '*')
+ req_type = collection_req.get('type')
+ # TODO: decide how to deprecate the old src API behavior
+ req_source = collection_req.get('source', None)
+ req_signature_sources = collection_req.get('signatures', None)
+ if req_signature_sources is not None:
+ if validate_signature_options and art_mgr.keyring is None:
+ raise AnsibleError(
+ f"Signatures were provided to verify {req_name} but no keyring was configured."
+ )
+
+ if not isinstance(req_signature_sources, MutableSequence):
+ req_signature_sources = [req_signature_sources]
+ req_signature_sources = frozenset(req_signature_sources)
+
+ if req_type is None:
+ if ( # FIXME: decide on the future behavior:
+ _ALLOW_CONCRETE_POINTER_IN_SOURCE
+ and req_source is not None
+ and _is_concrete_artifact_pointer(req_source)
+ ):
+ src_path = req_source
+ elif (
+ req_name is not None
+ and AnsibleCollectionRef.is_valid_collection_name(req_name)
+ ):
+ req_type = 'galaxy'
+ elif (
+ req_name is not None
+ and _is_concrete_artifact_pointer(req_name)
+ ):
+ src_path, req_name = req_name, None
+ else:
+ dir_tip_tmpl = ( # NOTE: leading LFs are for concat
+ '\n\nTip: Make sure you are pointing to the right '
+ 'subdirectory — `{src!s}` looks like a directory '
+ 'but it is neither a collection, nor a namespace '
+ 'dir.'
+ )
+
+ if req_source is not None and os.path.isdir(req_source):
+ tip = dir_tip_tmpl.format(src=req_source)
+ elif req_name is not None and os.path.isdir(req_name):
+ tip = dir_tip_tmpl.format(src=req_name)
+ elif req_name:
+ tip = '\n\nCould not find {0}.'.format(req_name)
+ else:
+ tip = ''
+
+ raise AnsibleError( # NOTE: I'd prefer a ValueError instead
+ 'Neither the collection requirement entry key '
+ "'name', nor 'source' point to a concrete "
+ "resolvable collection artifact. Also 'name' is "
+ 'not an FQCN. A valid collection name must be in '
+ 'the format <namespace>.<collection>. Please make '
+ 'sure that the namespace and the collection name '
+ 'contain characters from [a-zA-Z0-9_] only.'
+ '{extra_tip!s}'.format(extra_tip=tip),
+ )
+
+ if req_type is None:
+ if _is_git_url(src_path):
+ req_type = 'git'
+ req_source = src_path
+ elif _is_http_url(src_path):
+ req_type = 'url'
+ req_source = src_path
+ elif _is_file_path(src_path):
+ req_type = 'file'
+ req_source = src_path
+ elif _is_collection_dir(src_path):
+ if _is_installed_collection_dir(src_path) and _is_collection_src_dir(src_path):
+ # Note that ``download`` requires a dir with a ``galaxy.yml`` and fails if it
+ # doesn't exist, but if a ``MANIFEST.json`` also exists, it would be used
+ # instead of the ``galaxy.yml``.
+ raise AnsibleError(
+ u"Collection requirement at '{path!s}' has both a {manifest_json!s} "
+ u"file and a {galaxy_yml!s}.\nThe requirement must either be an installed "
+ u"collection directory or a source collection directory, not both.".
+ format(
+ path=to_text(src_path, errors='surrogate_or_strict'),
+ manifest_json=to_text(_MANIFEST_JSON),
+ galaxy_yml=to_text(_GALAXY_YAML),
+ )
+ )
+ req_type = 'dir'
+ req_source = src_path
+ elif _is_collection_namespace_dir(src_path):
+ req_name = None # No name for a virtual req or "namespace."?
+ req_type = 'subdirs'
+ req_source = src_path
+ else:
+ raise AnsibleError( # NOTE: this is never supposed to be hit
+ 'Failed to automatically detect the collection '
+ 'requirement type.',
+ )
+
+ if req_type not in {'file', 'galaxy', 'git', 'url', 'dir', 'subdirs'}:
+ raise AnsibleError(
+ "The collection requirement entry key 'type' must be "
+ 'one of file, galaxy, git, dir, subdirs, or url.'
+ )
+
+ if req_name is None and req_type == 'galaxy':
+ raise AnsibleError(
+ 'Collections requirement entry should contain '
+ "the key 'name' if it's requested from a Galaxy-like "
+ 'index server.',
+ )
+
+ if req_type != 'galaxy' and req_source is None:
+ req_source, req_name = req_name, None
+
+ if (
+ req_type == 'galaxy' and
+ isinstance(req_source, GalaxyAPI) and
+ not _is_http_url(req_source.api_server)
+ ):
+ raise AnsibleError(
+ "Collections requirement 'source' entry should contain "
+ 'a valid Galaxy API URL but it does not: {not_url!s} '
+ 'is not an HTTP URL.'.
+ format(not_url=req_source.api_server),
+ )
+
+ tmp_inst_req = cls(req_name, req_version, req_source, req_type, req_signature_sources)
+
+ if req_type not in {'galaxy', 'subdirs'} and req_name is None:
+ req_name = art_mgr.get_direct_collection_fqcn(tmp_inst_req) # TODO: fix the cache key in artifacts manager?
+
+ if req_type not in {'galaxy', 'subdirs'} and req_version == '*':
+ req_version = art_mgr.get_direct_collection_version(tmp_inst_req)
+
+ return cls(
+ req_name, req_version,
+ req_source, req_type,
+ req_signature_sources,
+ )
+
+ def __repr__(self):
+ return (
+ '<{self!s} of type {coll_type!r} from {src!s}>'.
+ format(self=self, coll_type=self.type, src=self.src or 'Galaxy')
+ )
+
+ def __str__(self):
+ return to_native(self.__unicode__())
+
+ def __unicode__(self):
+ if self.fqcn is None:
+ return (
+ u'"virtual collection Git repo"' if self.is_scm
+ else u'"virtual collection namespace"'
+ )
+
+ return (
+ u'{fqcn!s}:{ver!s}'.
+ format(fqcn=to_text(self.fqcn), ver=to_text(self.ver))
+ )
+
+ @property
+ def may_have_offline_galaxy_info(self):
+ if self.fqcn is None:
+ # Virtual collection
+ return False
+ elif not self.is_dir or self.src is None or not _is_collection_dir(self.src):
+ # Not a dir or isn't on-disk
+ return False
+ return True
+
+ def construct_galaxy_info_path(self, b_collection_path):
+ if not self.may_have_offline_galaxy_info and not self.type == 'galaxy':
+ raise TypeError('Only installed collections from a Galaxy server have offline Galaxy info')
+
+ # Store Galaxy metadata adjacent to the namespace of the collection
+ # Chop off the last two parts of the path (/ns/coll) to get the dir containing the ns
+ b_src = to_bytes(b_collection_path, errors='surrogate_or_strict')
+ b_path_parts = b_src.split(to_bytes(os.path.sep))[0:-2]
+ b_metadata_dir = to_bytes(os.path.sep).join(b_path_parts)
+
+ # ns.coll-1.0.0.info
+ b_dir_name = to_bytes(f"{self.namespace}.{self.name}-{self.ver}.info", errors="surrogate_or_strict")
+
+ # collections/ansible_collections/ns.coll-1.0.0.info/GALAXY.yml
+ return os.path.join(b_metadata_dir, b_dir_name, _SOURCE_METADATA_FILE)
+
+ def _get_separate_ns_n_name(self): # FIXME: use LRU cache
+ return self.fqcn.split('.')
+
+ @property
+ def namespace(self):
+ if self.is_virtual:
+ raise TypeError('Virtual collections do not have a namespace')
+
+ return self._get_separate_ns_n_name()[0]
+
+ @property
+ def name(self):
+ if self.is_virtual:
+ raise TypeError('Virtual collections do not have a name')
+
+ return self._get_separate_ns_n_name()[-1]
+
+ @property
+ def canonical_package_id(self):
+ if not self.is_virtual:
+ return to_native(self.fqcn)
+
+ return (
+ '<virtual namespace from {src!s} of type {src_type!s}>'.
+ format(src=to_native(self.src), src_type=to_native(self.type))
+ )
+
+ @property
+ def is_virtual(self):
+ return self.is_scm or self.is_subdirs
+
+ @property
+ def is_file(self):
+ return self.type == 'file'
+
+ @property
+ def is_dir(self):
+ return self.type == 'dir'
+
+ @property
+ def namespace_collection_paths(self):
+ return [
+ to_native(path)
+ for path in _find_collections_in_subdirs(self.src)
+ ]
+
+ @property
+ def is_subdirs(self):
+ return self.type == 'subdirs'
+
+ @property
+ def is_url(self):
+ return self.type == 'url'
+
+ @property
+ def is_scm(self):
+ return self.type == 'git'
+
+ @property
+ def is_concrete_artifact(self):
+ return self.type in {'git', 'url', 'file', 'dir', 'subdirs'}
+
+ @property
+ def is_online_index_pointer(self):
+ return not self.is_concrete_artifact
+
+ @property
+ def source_info(self):
+ return self._source_info
+
+
+RequirementNamedTuple = namedtuple('Requirement', ('fqcn', 'ver', 'src', 'type', 'signature_sources')) # type: ignore[name-match]
+
+
+CandidateNamedTuple = namedtuple('Candidate', ('fqcn', 'ver', 'src', 'type', 'signatures')) # type: ignore[name-match]
+
+
+class Requirement(
+ _ComputedReqKindsMixin,
+ RequirementNamedTuple,
+):
+ """An abstract requirement request."""
+
+ def __new__(cls, *args, **kwargs):
+ self = RequirementNamedTuple.__new__(cls, *args, **kwargs)
+ return self
+
+ def __init__(self, *args, **kwargs):
+ super(Requirement, self).__init__()
+
+
+class Candidate(
+ _ComputedReqKindsMixin,
+ CandidateNamedTuple,
+):
+ """A concrete collection candidate with its version resolved."""
+
+ def __new__(cls, *args, **kwargs):
+ self = CandidateNamedTuple.__new__(cls, *args, **kwargs)
+ return self
+
+ def __init__(self, *args, **kwargs):
+ super(Candidate, self).__init__()
diff --git a/lib/ansible/galaxy/dependency_resolution/errors.py b/lib/ansible/galaxy/dependency_resolution/errors.py
new file mode 100644
index 0000000..ae3b439
--- /dev/null
+++ b/lib/ansible/galaxy/dependency_resolution/errors.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Dependency resolution exceptions."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+try:
+ from resolvelib.resolvers import (
+ ResolutionImpossible as CollectionDependencyResolutionImpossible,
+ InconsistentCandidate as CollectionDependencyInconsistentCandidate,
+ )
+except ImportError:
+ class CollectionDependencyResolutionImpossible(Exception): # type: ignore[no-redef]
+ pass
+
+ class CollectionDependencyInconsistentCandidate(Exception): # type: ignore[no-redef]
+ pass
diff --git a/lib/ansible/galaxy/dependency_resolution/providers.py b/lib/ansible/galaxy/dependency_resolution/providers.py
new file mode 100644
index 0000000..817a1eb
--- /dev/null
+++ b/lib/ansible/galaxy/dependency_resolution/providers.py
@@ -0,0 +1,548 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Requirement provider interfaces."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import functools
+import typing as t
+
+if t.TYPE_CHECKING:
+ from ansible.galaxy.collection.concrete_artifact_manager import (
+ ConcreteArtifactsManager,
+ )
+ from ansible.galaxy.collection.galaxy_api_proxy import MultiGalaxyAPIProxy
+ from ansible.galaxy.api import GalaxyAPI
+
+from ansible.galaxy.collection.gpg import get_signature_from_source
+from ansible.galaxy.dependency_resolution.dataclasses import (
+ Candidate,
+ Requirement,
+)
+from ansible.galaxy.dependency_resolution.versioning import (
+ is_pre_release,
+ meets_requirements,
+)
+from ansible.module_utils.six import string_types
+from ansible.utils.version import SemanticVersion, LooseVersion
+
+from collections.abc import Set
+
+try:
+ from resolvelib import AbstractProvider
+ from resolvelib import __version__ as resolvelib_version
+except ImportError:
+ class AbstractProvider: # type: ignore[no-redef]
+ pass
+
+ resolvelib_version = '0.0.0'
+
+
+# TODO: add python requirements to ansible-test's ansible-core distribution info and remove the hardcoded lowerbound/upperbound fallback
+RESOLVELIB_LOWERBOUND = SemanticVersion("0.5.3")
+RESOLVELIB_UPPERBOUND = SemanticVersion("0.9.0")
+RESOLVELIB_VERSION = SemanticVersion.from_loose_version(LooseVersion(resolvelib_version))
+
+
+class PinnedCandidateRequests(Set):
+ """Custom set class to store Candidate objects. Excludes the 'signatures' attribute when determining if a Candidate instance is in the set."""
+ CANDIDATE_ATTRS = ('fqcn', 'ver', 'src', 'type')
+
+ def __init__(self, candidates):
+ self._candidates = set(candidates)
+
+ def __iter__(self):
+ return iter(self._candidates)
+
+ def __contains__(self, value):
+ if not isinstance(value, Candidate):
+ raise ValueError(f"Expected a Candidate object but got {value!r}")
+ for candidate in self._candidates:
+ # Compare Candidate attributes excluding "signatures" since it is
+ # unrelated to whether or not a matching Candidate is user-requested.
+ # Candidate objects in the set are not expected to have signatures.
+ for attr in PinnedCandidateRequests.CANDIDATE_ATTRS:
+ if getattr(value, attr) != getattr(candidate, attr):
+ break
+ else:
+ return True
+ return False
+
+ def __len__(self):
+ return len(self._candidates)
+
+
+class CollectionDependencyProviderBase(AbstractProvider):
+ """Delegate providing a requirement interface for the resolver."""
+
+ def __init__(
+ self, # type: CollectionDependencyProviderBase
+ apis, # type: MultiGalaxyAPIProxy
+ concrete_artifacts_manager=None, # type: ConcreteArtifactsManager
+ user_requirements=None, # type: t.Iterable[Requirement]
+ preferred_candidates=None, # type: t.Iterable[Candidate]
+ with_deps=True, # type: bool
+ with_pre_releases=False, # type: bool
+ upgrade=False, # type: bool
+ include_signatures=True, # type: bool
+ ): # type: (...) -> None
+ r"""Initialize helper attributes.
+
+ :param api: An instance of the multiple Galaxy APIs wrapper.
+
+ :param concrete_artifacts_manager: An instance of the caching \
+ concrete artifacts manager.
+
+ :param with_deps: A flag specifying whether the resolver \
+ should attempt to pull-in the deps of the \
+ requested requirements. On by default.
+
+ :param with_pre_releases: A flag specifying whether the \
+ resolver should skip pre-releases. \
+ Off by default.
+
+ :param upgrade: A flag specifying whether the resolver should \
+ skip matching versions that are not upgrades. \
+ Off by default.
+
+ :param include_signatures: A flag to determine whether to retrieve \
+ signatures from the Galaxy APIs and \
+ include signatures in matching Candidates. \
+ On by default.
+ """
+ self._api_proxy = apis
+ self._make_req_from_dict = functools.partial(
+ Requirement.from_requirement_dict,
+ art_mgr=concrete_artifacts_manager,
+ )
+ self._pinned_candidate_requests = PinnedCandidateRequests(
+ # NOTE: User-provided signatures are supplemental, so signatures
+ # NOTE: are not used to determine if a candidate is user-requested
+ Candidate(req.fqcn, req.ver, req.src, req.type, None)
+ for req in (user_requirements or ())
+ if req.is_concrete_artifact or (
+ req.ver != '*' and
+ not req.ver.startswith(('<', '>', '!='))
+ )
+ )
+ self._preferred_candidates = set(preferred_candidates or ())
+ self._with_deps = with_deps
+ self._with_pre_releases = with_pre_releases
+ self._upgrade = upgrade
+ self._include_signatures = include_signatures
+
+ def _is_user_requested(self, candidate): # type: (Candidate) -> bool
+ """Check if the candidate is requested by the user."""
+ if candidate in self._pinned_candidate_requests:
+ return True
+
+ if candidate.is_online_index_pointer and candidate.src is not None:
+ # NOTE: Candidate is a namedtuple, it has a source server set
+ # NOTE: to a specific GalaxyAPI instance or `None`. When the
+ # NOTE: user runs
+ # NOTE:
+ # NOTE: $ ansible-galaxy collection install ns.coll
+ # NOTE:
+ # NOTE: then it's saved in `self._pinned_candidate_requests`
+ # NOTE: as `('ns.coll', '*', None, 'galaxy')` but then
+ # NOTE: `self.find_matches()` calls `self.is_satisfied_by()`
+ # NOTE: with Candidate instances bound to each specific
+ # NOTE: server available, those look like
+ # NOTE: `('ns.coll', '*', GalaxyAPI(...), 'galaxy')` and
+ # NOTE: wouldn't match the user requests saved in
+ # NOTE: `self._pinned_candidate_requests`. This is why we
+ # NOTE: normalize the collection to have `src=None` and try
+ # NOTE: again.
+ # NOTE:
+ # NOTE: When the user request comes from `requirements.yml`
+ # NOTE: with the `source:` set, it'll match the first check
+ # NOTE: but it still can have entries with `src=None` so this
+ # NOTE: normalized check is still necessary.
+ # NOTE:
+ # NOTE: User-provided signatures are supplemental, so signatures
+ # NOTE: are not used to determine if a candidate is user-requested
+ return Candidate(
+ candidate.fqcn, candidate.ver, None, candidate.type, None
+ ) in self._pinned_candidate_requests
+
+ return False
+
+ def identify(self, requirement_or_candidate):
+ # type: (t.Union[Candidate, Requirement]) -> str
+ """Given requirement or candidate, return an identifier for it.
+
+ This is used to identify a requirement or candidate, e.g.
+ whether two requirements should have their specifier parts
+ (version ranges or pins) merged, whether two candidates would
+ conflict with each other (because they have same name but
+ different versions).
+ """
+ return requirement_or_candidate.canonical_package_id
+
+ def get_preference(self, *args, **kwargs):
+ # type: (t.Any, t.Any) -> t.Union[float, int]
+ """Return sort key function return value for given requirement.
+
+ This result should be based on preference that is defined as
+ "I think this requirement should be resolved first".
+ The lower the return value is, the more preferred this
+ group of arguments is.
+
+ resolvelib >=0.5.3, <0.7.0
+
+ :param resolution: Currently pinned candidate, or ``None``.
+
+ :param candidates: A list of possible candidates.
+
+ :param information: A list of requirement information.
+
+ Each ``information`` instance is a named tuple with two entries:
+
+ * ``requirement`` specifies a requirement contributing to
+ the current candidate list
+
+ * ``parent`` specifies the candidate that provides
+ (dependend on) the requirement, or `None`
+ to indicate a root requirement.
+
+ resolvelib >=0.7.0, < 0.8.0
+
+ :param identifier: The value returned by ``identify()``.
+
+ :param resolutions: Mapping of identifier, candidate pairs.
+
+ :param candidates: Possible candidates for the identifer.
+ Mapping of identifier, list of candidate pairs.
+
+ :param information: Requirement information of each package.
+ Mapping of identifier, list of named tuple pairs.
+ The named tuples have the entries ``requirement`` and ``parent``.
+
+ resolvelib >=0.8.0, <= 0.8.1
+
+ :param identifier: The value returned by ``identify()``.
+
+ :param resolutions: Mapping of identifier, candidate pairs.
+
+ :param candidates: Possible candidates for the identifer.
+ Mapping of identifier, list of candidate pairs.
+
+ :param information: Requirement information of each package.
+ Mapping of identifier, list of named tuple pairs.
+ The named tuples have the entries ``requirement`` and ``parent``.
+
+ :param backtrack_causes: Sequence of requirement information that were
+ the requirements that caused the resolver to most recently backtrack.
+
+ The preference could depend on a various of issues, including
+ (not necessarily in this order):
+
+ * Is this package pinned in the current resolution result?
+
+ * How relaxed is the requirement? Stricter ones should
+ probably be worked on first? (I don't know, actually.)
+
+ * How many possibilities are there to satisfy this
+ requirement? Those with few left should likely be worked on
+ first, I guess?
+
+ * Are there any known conflicts for this requirement?
+ We should probably work on those with the most
+ known conflicts.
+
+ A sortable value should be returned (this will be used as the
+ `key` parameter of the built-in sorting function). The smaller
+ the value is, the more preferred this requirement is (i.e. the
+ sorting function is called with ``reverse=False``).
+ """
+ raise NotImplementedError
+
+ def _get_preference(self, candidates):
+ # type: (list[Candidate]) -> t.Union[float, int]
+ if any(
+ candidate in self._preferred_candidates
+ for candidate in candidates
+ ):
+ # NOTE: Prefer pre-installed candidates over newer versions
+ # NOTE: available from Galaxy or other sources.
+ return float('-inf')
+ return len(candidates)
+
+ def find_matches(self, *args, **kwargs):
+ # type: (t.Any, t.Any) -> list[Candidate]
+ r"""Find all possible candidates satisfying given requirements.
+
+ This tries to get candidates based on the requirements' types.
+
+ For concrete requirements (SCM, dir, namespace dir, local or
+ remote archives), the one-and-only match is returned
+
+ For a "named" requirement, Galaxy-compatible APIs are consulted
+ to find concrete candidates for this requirement. Of theres a
+ pre-installed candidate, it's prepended in front of others.
+
+ resolvelib >=0.5.3, <0.6.0
+
+ :param requirements: A collection of requirements which all of \
+ the returned candidates must match. \
+ All requirements are guaranteed to have \
+ the same identifier. \
+ The collection is never empty.
+
+ resolvelib >=0.6.0
+
+ :param identifier: The value returned by ``identify()``.
+
+ :param requirements: The requirements all returned candidates must satisfy.
+ Mapping of identifier, iterator of requirement pairs.
+
+ :param incompatibilities: Incompatible versions that must be excluded
+ from the returned list.
+
+ :returns: An iterable that orders candidates by preference, \
+ e.g. the most preferred candidate comes first.
+ """
+ raise NotImplementedError
+
+ def _find_matches(self, requirements):
+ # type: (list[Requirement]) -> list[Candidate]
+ # FIXME: The first requirement may be a Git repo followed by
+ # FIXME: its cloned tmp dir. Using only the first one creates
+ # FIXME: loops that prevent any further dependency exploration.
+ # FIXME: We need to figure out how to prevent this.
+ first_req = requirements[0]
+ fqcn = first_req.fqcn
+ # The fqcn is guaranteed to be the same
+ version_req = "A SemVer-compliant version or '*' is required. See https://semver.org to learn how to compose it correctly. "
+ version_req += "This is an issue with the collection."
+
+ # If we're upgrading collections, we can't calculate preinstalled_candidates until the latest matches are found.
+ # Otherwise, we can potentially avoid a Galaxy API call by doing this first.
+ preinstalled_candidates = set()
+ if not self._upgrade and first_req.type == 'galaxy':
+ preinstalled_candidates = {
+ candidate for candidate in self._preferred_candidates
+ if candidate.fqcn == fqcn and
+ all(self.is_satisfied_by(requirement, candidate) for requirement in requirements)
+ }
+ try:
+ coll_versions = [] if preinstalled_candidates else self._api_proxy.get_collection_versions(first_req) # type: t.Iterable[t.Tuple[str, GalaxyAPI]]
+ except TypeError as exc:
+ if first_req.is_concrete_artifact:
+ # Non hashable versions will cause a TypeError
+ raise ValueError(
+ f"Invalid version found for the collection '{first_req}'. {version_req}"
+ ) from exc
+ # Unexpected error from a Galaxy server
+ raise
+
+ if first_req.is_concrete_artifact:
+ # FIXME: do we assume that all the following artifacts are also concrete?
+ # FIXME: does using fqcn==None cause us problems here?
+
+ # Ensure the version found in the concrete artifact is SemVer-compliant
+ for version, req_src in coll_versions:
+ version_err = f"Invalid version found for the collection '{first_req}': {version} ({type(version)}). {version_req}"
+ # NOTE: The known cases causing the version to be a non-string object come from
+ # NOTE: the differences in how the YAML parser normalizes ambiguous values and
+ # NOTE: how the end-users sometimes expect them to be parsed. Unless the users
+ # NOTE: explicitly use the double quotes of one of the multiline string syntaxes
+ # NOTE: in the collection metadata file, PyYAML will parse a value containing
+ # NOTE: two dot-separated integers as `float`, a single integer as `int`, and 3+
+ # NOTE: integers as a `str`. In some cases, they may also use an empty value
+ # NOTE: which is normalized as `null` and turned into `None` in the Python-land.
+ # NOTE: Another known mistake is setting a minor part of the SemVer notation
+ # NOTE: skipping the "patch" bit like "1.0" which is assumed non-compliant even
+ # NOTE: after the conversion to string.
+ if not isinstance(version, string_types):
+ raise ValueError(version_err)
+ elif version != '*':
+ try:
+ SemanticVersion(version)
+ except ValueError as ex:
+ raise ValueError(version_err) from ex
+
+ return [
+ Candidate(fqcn, version, _none_src_server, first_req.type, None)
+ for version, _none_src_server in coll_versions
+ ]
+
+ latest_matches = []
+ signatures = []
+ extra_signature_sources = [] # type: list[str]
+ for version, src_server in coll_versions:
+ tmp_candidate = Candidate(fqcn, version, src_server, 'galaxy', None)
+
+ unsatisfied = False
+ for requirement in requirements:
+ unsatisfied |= not self.is_satisfied_by(requirement, tmp_candidate)
+ # FIXME
+ # unsatisfied |= not self.is_satisfied_by(requirement, tmp_candidate) or not (
+ # requirement.src is None or # if this is true for some candidates but not all it will break key param - Nonetype can't be compared to str
+ # or requirement.src == candidate.src
+ # )
+ if unsatisfied:
+ break
+ if not self._include_signatures:
+ continue
+
+ extra_signature_sources.extend(requirement.signature_sources or [])
+
+ if not unsatisfied:
+ if self._include_signatures:
+ signatures = src_server.get_collection_signatures(first_req.namespace, first_req.name, version)
+ for extra_source in extra_signature_sources:
+ signatures.append(get_signature_from_source(extra_source))
+ latest_matches.append(
+ Candidate(fqcn, version, src_server, 'galaxy', frozenset(signatures))
+ )
+
+ latest_matches.sort(
+ key=lambda candidate: (
+ SemanticVersion(candidate.ver), candidate.src,
+ ),
+ reverse=True, # prefer newer versions over older ones
+ )
+
+ if not preinstalled_candidates:
+ preinstalled_candidates = {
+ candidate for candidate in self._preferred_candidates
+ if candidate.fqcn == fqcn and
+ (
+ # check if an upgrade is necessary
+ all(self.is_satisfied_by(requirement, candidate) for requirement in requirements) and
+ (
+ not self._upgrade or
+ # check if an upgrade is preferred
+ all(SemanticVersion(latest.ver) <= SemanticVersion(candidate.ver) for latest in latest_matches)
+ )
+ )
+ }
+
+ return list(preinstalled_candidates) + latest_matches
+
+ def is_satisfied_by(self, requirement, candidate):
+ # type: (Requirement, Candidate) -> bool
+ r"""Whether the given requirement is satisfiable by a candidate.
+
+ :param requirement: A requirement that produced the `candidate`.
+
+ :param candidate: A pinned candidate supposedly matchine the \
+ `requirement` specifier. It is guaranteed to \
+ have been generated from the `requirement`.
+
+ :returns: Indication whether the `candidate` is a viable \
+ solution to the `requirement`.
+ """
+ # NOTE: Only allow pre-release candidates if we want pre-releases
+ # NOTE: or the req ver was an exact match with the pre-release
+ # NOTE: version. Another case where we'd want to allow
+ # NOTE: pre-releases is when there are several user requirements
+ # NOTE: and one of them is a pre-release that also matches a
+ # NOTE: transitive dependency of another requirement.
+ allow_pre_release = self._with_pre_releases or not (
+ requirement.ver == '*' or
+ requirement.ver.startswith('<') or
+ requirement.ver.startswith('>') or
+ requirement.ver.startswith('!=')
+ ) or self._is_user_requested(candidate)
+ if is_pre_release(candidate.ver) and not allow_pre_release:
+ return False
+
+ # NOTE: This is a set of Pipenv-inspired optimizations. Ref:
+ # https://github.com/sarugaku/passa/blob/2ac00f1/src/passa/models/providers.py#L58-L74
+ if (
+ requirement.is_virtual or
+ candidate.is_virtual or
+ requirement.ver == '*'
+ ):
+ return True
+
+ return meets_requirements(
+ version=candidate.ver,
+ requirements=requirement.ver,
+ )
+
+ def get_dependencies(self, candidate):
+ # type: (Candidate) -> list[Candidate]
+ r"""Get direct dependencies of a candidate.
+
+ :returns: A collection of requirements that `candidate` \
+ specifies as its dependencies.
+ """
+ # FIXME: If there's several galaxy servers set, there may be a
+ # FIXME: situation when the metadata of the same collection
+ # FIXME: differs. So how do we resolve this case? Priority?
+ # FIXME: Taking into account a pinned hash? Exploding on
+ # FIXME: any differences?
+ # NOTE: The underlying implmentation currently uses first found
+ req_map = self._api_proxy.get_collection_dependencies(candidate)
+
+ # NOTE: This guard expression MUST perform an early exit only
+ # NOTE: after the `get_collection_dependencies()` call because
+ # NOTE: internally it polulates the artifact URL of the candidate,
+ # NOTE: its SHA hash and the Galaxy API token. These are still
+ # NOTE: necessary with `--no-deps` because even with the disabled
+ # NOTE: dependency resolution the outer layer will still need to
+ # NOTE: know how to download and validate the artifact.
+ #
+ # NOTE: Virtual candidates should always return dependencies
+ # NOTE: because they are ephemeral and non-installable.
+ if not self._with_deps and not candidate.is_virtual:
+ return []
+
+ return [
+ self._make_req_from_dict({'name': dep_name, 'version': dep_req})
+ for dep_name, dep_req in req_map.items()
+ ]
+
+
+# Classes to handle resolvelib API changes between minor versions for 0.X
+class CollectionDependencyProvider050(CollectionDependencyProviderBase):
+ def find_matches(self, requirements): # type: ignore[override]
+ # type: (list[Requirement]) -> list[Candidate]
+ return self._find_matches(requirements)
+
+ def get_preference(self, resolution, candidates, information): # type: ignore[override]
+ # type: (t.Optional[Candidate], list[Candidate], list[t.NamedTuple]) -> t.Union[float, int]
+ return self._get_preference(candidates)
+
+
+class CollectionDependencyProvider060(CollectionDependencyProviderBase):
+ def find_matches(self, identifier, requirements, incompatibilities): # type: ignore[override]
+ # type: (str, t.Mapping[str, t.Iterator[Requirement]], t.Mapping[str, t.Iterator[Requirement]]) -> list[Candidate]
+ return [
+ match for match in self._find_matches(list(requirements[identifier]))
+ if not any(match.ver == incompat.ver for incompat in incompatibilities[identifier])
+ ]
+
+ def get_preference(self, resolution, candidates, information): # type: ignore[override]
+ # type: (t.Optional[Candidate], list[Candidate], list[t.NamedTuple]) -> t.Union[float, int]
+ return self._get_preference(candidates)
+
+
+class CollectionDependencyProvider070(CollectionDependencyProvider060):
+ def get_preference(self, identifier, resolutions, candidates, information): # type: ignore[override]
+ # type: (str, t.Mapping[str, Candidate], t.Mapping[str, t.Iterator[Candidate]], t.Iterator[t.NamedTuple]) -> t.Union[float, int]
+ return self._get_preference(list(candidates[identifier]))
+
+
+class CollectionDependencyProvider080(CollectionDependencyProvider060):
+ def get_preference(self, identifier, resolutions, candidates, information, backtrack_causes): # type: ignore[override]
+ # type: (str, t.Mapping[str, Candidate], t.Mapping[str, t.Iterator[Candidate]], t.Iterator[t.NamedTuple], t.Sequence) -> t.Union[float, int]
+ return self._get_preference(list(candidates[identifier]))
+
+
+def _get_provider(): # type () -> CollectionDependencyProviderBase
+ if RESOLVELIB_VERSION >= SemanticVersion("0.8.0"):
+ return CollectionDependencyProvider080
+ if RESOLVELIB_VERSION >= SemanticVersion("0.7.0"):
+ return CollectionDependencyProvider070
+ if RESOLVELIB_VERSION >= SemanticVersion("0.6.0"):
+ return CollectionDependencyProvider060
+ return CollectionDependencyProvider050
+
+
+CollectionDependencyProvider = _get_provider()
diff --git a/lib/ansible/galaxy/dependency_resolution/reporters.py b/lib/ansible/galaxy/dependency_resolution/reporters.py
new file mode 100644
index 0000000..69908b2
--- /dev/null
+++ b/lib/ansible/galaxy/dependency_resolution/reporters.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Requiement reporter implementations."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+try:
+ from resolvelib import BaseReporter
+except ImportError:
+ class BaseReporter: # type: ignore[no-redef]
+ pass
+
+
+class CollectionDependencyReporter(BaseReporter):
+ """A dependency reporter for Ansible Collections.
+
+ This is a proxy class allowing us to abstract away importing resolvelib
+ outside of the `ansible.galaxy.dependency_resolution` Python package.
+ """
diff --git a/lib/ansible/galaxy/dependency_resolution/resolvers.py b/lib/ansible/galaxy/dependency_resolution/resolvers.py
new file mode 100644
index 0000000..87ca38d
--- /dev/null
+++ b/lib/ansible/galaxy/dependency_resolution/resolvers.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Requirement resolver implementations."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+try:
+ from resolvelib import Resolver
+except ImportError:
+ class Resolver: # type: ignore[no-redef]
+ pass
+
+
+class CollectionDependencyResolver(Resolver):
+ """A dependency resolver for Ansible Collections.
+
+ This is a proxy class allowing us to abstract away importing resolvelib
+ outside of the `ansible.galaxy.dependency_resolution` Python package.
+ """
diff --git a/lib/ansible/galaxy/dependency_resolution/versioning.py b/lib/ansible/galaxy/dependency_resolution/versioning.py
new file mode 100644
index 0000000..93adce4
--- /dev/null
+++ b/lib/ansible/galaxy/dependency_resolution/versioning.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2019-2020, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Version comparison helpers."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import operator
+
+from ansible.module_utils.compat.version import LooseVersion
+from ansible.utils.version import SemanticVersion
+
+
+def is_pre_release(version):
+ # type: (str) -> bool
+ """Figure out if a given version is a pre-release."""
+ try:
+ return SemanticVersion(version).is_prerelease
+ except ValueError:
+ return False
+
+
+def meets_requirements(version, requirements):
+ # type: (str, str) -> bool
+ """Verify if a given version satisfies all the requirements.
+
+ Supported version identifiers are:
+ * '=='
+ * '!='
+ * '>'
+ * '>='
+ * '<'
+ * '<='
+ * '*'
+
+ Each requirement is delimited by ','.
+ """
+ op_map = {
+ '!=': operator.ne,
+ '==': operator.eq,
+ '=': operator.eq,
+ '>=': operator.ge,
+ '>': operator.gt,
+ '<=': operator.le,
+ '<': operator.lt,
+ }
+
+ for req in requirements.split(','):
+ op_pos = 2 if len(req) > 1 and req[1] == '=' else 1
+ op = op_map.get(req[:op_pos])
+
+ requirement = req[op_pos:]
+ if not op:
+ requirement = req
+ op = operator.eq
+
+ if requirement == '*' or version == '*':
+ continue
+
+ if not op(
+ SemanticVersion(version),
+ SemanticVersion.from_loose_version(LooseVersion(requirement)),
+ ):
+ break
+ else:
+ return True
+
+ # The loop was broken early, it does not meet all the requirements
+ return False