diff options
Diffstat (limited to 'lib/ansible/galaxy/dependency_resolution/dataclasses.py')
-rw-r--r-- | lib/ansible/galaxy/dependency_resolution/dataclasses.py | 573 |
1 files changed, 573 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/dependency_resolution/dataclasses.py b/lib/ansible/galaxy/dependency_resolution/dataclasses.py new file mode 100644 index 0000000..16fd631 --- /dev/null +++ b/lib/ansible/galaxy/dependency_resolution/dataclasses.py @@ -0,0 +1,573 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2020-2021, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +"""Dependency structs.""" +# FIXME: add caching all over the place + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import typing as t + +from collections import namedtuple +from collections.abc import MutableSequence, MutableMapping +from glob import iglob +from urllib.parse import urlparse +from yaml import safe_load + +if t.TYPE_CHECKING: + from ansible.galaxy.collection.concrete_artifact_manager import ( + ConcreteArtifactsManager, + ) + Collection = t.TypeVar( + 'Collection', + 'Candidate', 'Requirement', + '_ComputedReqKindsMixin', + ) + + +from ansible.errors import AnsibleError +from ansible.galaxy.api import GalaxyAPI +from ansible.module_utils._text import to_bytes, to_native, to_text +from ansible.module_utils.common.arg_spec import ArgumentSpecValidator +from ansible.utils.collection_loader import AnsibleCollectionRef +from ansible.utils.display import Display + + +_ALLOW_CONCRETE_POINTER_IN_SOURCE = False # NOTE: This is a feature flag +_GALAXY_YAML = b'galaxy.yml' +_MANIFEST_JSON = b'MANIFEST.json' +_SOURCE_METADATA_FILE = b'GALAXY.yml' + +display = Display() + + +def get_validated_source_info(b_source_info_path, namespace, name, version): + source_info_path = to_text(b_source_info_path, errors='surrogate_or_strict') + + if not os.path.isfile(b_source_info_path): + return None + + try: + with open(b_source_info_path, mode='rb') as fd: + metadata = safe_load(fd) + except OSError as e: + display.warning( + f"Error getting collection source information at '{source_info_path}': {to_text(e, errors='surrogate_or_strict')}" + ) + return None + + if not isinstance(metadata, MutableMapping): + display.warning(f"Error getting collection source information at '{source_info_path}': expected a YAML dictionary") + return None + + schema_errors = _validate_v1_source_info_schema(namespace, name, version, metadata) + if schema_errors: + display.warning(f"Ignoring source metadata file at {source_info_path} due to the following errors:") + display.warning("\n".join(schema_errors)) + display.warning("Correct the source metadata file by reinstalling the collection.") + return None + + return metadata + + +def _validate_v1_source_info_schema(namespace, name, version, provided_arguments): + argument_spec_data = dict( + format_version=dict(choices=["1.0.0"]), + download_url=dict(), + version_url=dict(), + server=dict(), + signatures=dict( + type=list, + suboptions=dict( + signature=dict(), + pubkey_fingerprint=dict(), + signing_service=dict(), + pulp_created=dict(), + ) + ), + name=dict(choices=[name]), + namespace=dict(choices=[namespace]), + version=dict(choices=[version]), + ) + + if not isinstance(provided_arguments, dict): + raise AnsibleError( + f'Invalid offline source info for {namespace}.{name}:{version}, expected a dict and got {type(provided_arguments)}' + ) + validator = ArgumentSpecValidator(argument_spec_data) + validation_result = validator.validate(provided_arguments) + + return validation_result.error_messages + + +def _is_collection_src_dir(dir_path): + b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict') + return os.path.isfile(os.path.join(b_dir_path, _GALAXY_YAML)) + + +def _is_installed_collection_dir(dir_path): + b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict') + return os.path.isfile(os.path.join(b_dir_path, _MANIFEST_JSON)) + + +def _is_collection_dir(dir_path): + return ( + _is_installed_collection_dir(dir_path) or + _is_collection_src_dir(dir_path) + ) + + +def _find_collections_in_subdirs(dir_path): + b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict') + + subdir_glob_pattern = os.path.join( + b_dir_path, + # b'*', # namespace is supposed to be top-level per spec + b'*', # collection name + ) + + for subdir in iglob(subdir_glob_pattern): + if os.path.isfile(os.path.join(subdir, _MANIFEST_JSON)): + yield subdir + elif os.path.isfile(os.path.join(subdir, _GALAXY_YAML)): + yield subdir + + +def _is_collection_namespace_dir(tested_str): + return any(_find_collections_in_subdirs(tested_str)) + + +def _is_file_path(tested_str): + return os.path.isfile(to_bytes(tested_str, errors='surrogate_or_strict')) + + +def _is_http_url(tested_str): + return urlparse(tested_str).scheme.lower() in {'http', 'https'} + + +def _is_git_url(tested_str): + return tested_str.startswith(('git+', 'git@')) + + +def _is_concrete_artifact_pointer(tested_str): + return any( + predicate(tested_str) + for predicate in ( + # NOTE: Maintain the checks to be sorted from light to heavy: + _is_git_url, + _is_http_url, + _is_file_path, + _is_collection_dir, + _is_collection_namespace_dir, + ) + ) + + +class _ComputedReqKindsMixin: + + def __init__(self, *args, **kwargs): + if not self.may_have_offline_galaxy_info: + self._source_info = None + else: + info_path = self.construct_galaxy_info_path(to_bytes(self.src, errors='surrogate_or_strict')) + + self._source_info = get_validated_source_info( + info_path, + self.namespace, + self.name, + self.ver + ) + + @classmethod + def from_dir_path_as_unknown( # type: ignore[misc] + cls, # type: t.Type[Collection] + dir_path, # type: bytes + art_mgr, # type: ConcreteArtifactsManager + ): # type: (...) -> Collection + """Make collection from an unspecified dir type. + + This alternative constructor attempts to grab metadata from the + given path if it's a directory. If there's no metadata, it + falls back to guessing the FQCN based on the directory path and + sets the version to "*". + + It raises a ValueError immediately if the input is not an + existing directory path. + """ + if not os.path.isdir(dir_path): + raise ValueError( + "The collection directory '{path!s}' doesn't exist". + format(path=to_native(dir_path)), + ) + + try: + return cls.from_dir_path(dir_path, art_mgr) + except ValueError: + return cls.from_dir_path_implicit(dir_path) + + @classmethod + def from_dir_path(cls, dir_path, art_mgr): + """Make collection from an directory with metadata.""" + b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict') + if not _is_collection_dir(b_dir_path): + display.warning( + u"Collection at '{path!s}' does not have a {manifest_json!s} " + u'file, nor has it {galaxy_yml!s}: cannot detect version.'. + format( + galaxy_yml=to_text(_GALAXY_YAML), + manifest_json=to_text(_MANIFEST_JSON), + path=to_text(dir_path, errors='surrogate_or_strict'), + ), + ) + raise ValueError( + '`dir_path` argument must be an installed or a source' + ' collection directory.', + ) + + tmp_inst_req = cls(None, None, dir_path, 'dir', None) + req_version = art_mgr.get_direct_collection_version(tmp_inst_req) + try: + req_name = art_mgr.get_direct_collection_fqcn(tmp_inst_req) + except TypeError as err: + # Looks like installed/source dir but isn't: doesn't have valid metadata. + display.warning( + u"Collection at '{path!s}' has a {manifest_json!s} " + u"or {galaxy_yml!s} file but it contains invalid metadata.". + format( + galaxy_yml=to_text(_GALAXY_YAML), + manifest_json=to_text(_MANIFEST_JSON), + path=to_text(dir_path, errors='surrogate_or_strict'), + ), + ) + raise ValueError( + "Collection at '{path!s}' has invalid metadata". + format(path=to_text(dir_path, errors='surrogate_or_strict')) + ) from err + + return cls(req_name, req_version, dir_path, 'dir', None) + + @classmethod + def from_dir_path_implicit( # type: ignore[misc] + cls, # type: t.Type[Collection] + dir_path, # type: bytes + ): # type: (...) -> Collection + """Construct a collection instance based on an arbitrary dir. + + This alternative constructor infers the FQCN based on the parent + and current directory names. It also sets the version to "*" + regardless of whether any of known metadata files are present. + """ + # There is no metadata, but it isn't required for a functional collection. Determine the namespace.name from the path. + u_dir_path = to_text(dir_path, errors='surrogate_or_strict') + path_list = u_dir_path.split(os.path.sep) + req_name = '.'.join(path_list[-2:]) + return cls(req_name, '*', dir_path, 'dir', None) # type: ignore[call-arg] + + @classmethod + def from_string(cls, collection_input, artifacts_manager, supplemental_signatures): + req = {} + if _is_concrete_artifact_pointer(collection_input): + # Arg is a file path or URL to a collection + req['name'] = collection_input + else: + req['name'], _sep, req['version'] = collection_input.partition(':') + if not req['version']: + del req['version'] + req['signatures'] = supplemental_signatures + + return cls.from_requirement_dict(req, artifacts_manager) + + @classmethod + def from_requirement_dict(cls, collection_req, art_mgr, validate_signature_options=True): + req_name = collection_req.get('name', None) + req_version = collection_req.get('version', '*') + req_type = collection_req.get('type') + # TODO: decide how to deprecate the old src API behavior + req_source = collection_req.get('source', None) + req_signature_sources = collection_req.get('signatures', None) + if req_signature_sources is not None: + if validate_signature_options and art_mgr.keyring is None: + raise AnsibleError( + f"Signatures were provided to verify {req_name} but no keyring was configured." + ) + + if not isinstance(req_signature_sources, MutableSequence): + req_signature_sources = [req_signature_sources] + req_signature_sources = frozenset(req_signature_sources) + + if req_type is None: + if ( # FIXME: decide on the future behavior: + _ALLOW_CONCRETE_POINTER_IN_SOURCE + and req_source is not None + and _is_concrete_artifact_pointer(req_source) + ): + src_path = req_source + elif ( + req_name is not None + and AnsibleCollectionRef.is_valid_collection_name(req_name) + ): + req_type = 'galaxy' + elif ( + req_name is not None + and _is_concrete_artifact_pointer(req_name) + ): + src_path, req_name = req_name, None + else: + dir_tip_tmpl = ( # NOTE: leading LFs are for concat + '\n\nTip: Make sure you are pointing to the right ' + 'subdirectory — `{src!s}` looks like a directory ' + 'but it is neither a collection, nor a namespace ' + 'dir.' + ) + + if req_source is not None and os.path.isdir(req_source): + tip = dir_tip_tmpl.format(src=req_source) + elif req_name is not None and os.path.isdir(req_name): + tip = dir_tip_tmpl.format(src=req_name) + elif req_name: + tip = '\n\nCould not find {0}.'.format(req_name) + else: + tip = '' + + raise AnsibleError( # NOTE: I'd prefer a ValueError instead + 'Neither the collection requirement entry key ' + "'name', nor 'source' point to a concrete " + "resolvable collection artifact. Also 'name' is " + 'not an FQCN. A valid collection name must be in ' + 'the format <namespace>.<collection>. Please make ' + 'sure that the namespace and the collection name ' + 'contain characters from [a-zA-Z0-9_] only.' + '{extra_tip!s}'.format(extra_tip=tip), + ) + + if req_type is None: + if _is_git_url(src_path): + req_type = 'git' + req_source = src_path + elif _is_http_url(src_path): + req_type = 'url' + req_source = src_path + elif _is_file_path(src_path): + req_type = 'file' + req_source = src_path + elif _is_collection_dir(src_path): + if _is_installed_collection_dir(src_path) and _is_collection_src_dir(src_path): + # Note that ``download`` requires a dir with a ``galaxy.yml`` and fails if it + # doesn't exist, but if a ``MANIFEST.json`` also exists, it would be used + # instead of the ``galaxy.yml``. + raise AnsibleError( + u"Collection requirement at '{path!s}' has both a {manifest_json!s} " + u"file and a {galaxy_yml!s}.\nThe requirement must either be an installed " + u"collection directory or a source collection directory, not both.". + format( + path=to_text(src_path, errors='surrogate_or_strict'), + manifest_json=to_text(_MANIFEST_JSON), + galaxy_yml=to_text(_GALAXY_YAML), + ) + ) + req_type = 'dir' + req_source = src_path + elif _is_collection_namespace_dir(src_path): + req_name = None # No name for a virtual req or "namespace."? + req_type = 'subdirs' + req_source = src_path + else: + raise AnsibleError( # NOTE: this is never supposed to be hit + 'Failed to automatically detect the collection ' + 'requirement type.', + ) + + if req_type not in {'file', 'galaxy', 'git', 'url', 'dir', 'subdirs'}: + raise AnsibleError( + "The collection requirement entry key 'type' must be " + 'one of file, galaxy, git, dir, subdirs, or url.' + ) + + if req_name is None and req_type == 'galaxy': + raise AnsibleError( + 'Collections requirement entry should contain ' + "the key 'name' if it's requested from a Galaxy-like " + 'index server.', + ) + + if req_type != 'galaxy' and req_source is None: + req_source, req_name = req_name, None + + if ( + req_type == 'galaxy' and + isinstance(req_source, GalaxyAPI) and + not _is_http_url(req_source.api_server) + ): + raise AnsibleError( + "Collections requirement 'source' entry should contain " + 'a valid Galaxy API URL but it does not: {not_url!s} ' + 'is not an HTTP URL.'. + format(not_url=req_source.api_server), + ) + + tmp_inst_req = cls(req_name, req_version, req_source, req_type, req_signature_sources) + + if req_type not in {'galaxy', 'subdirs'} and req_name is None: + req_name = art_mgr.get_direct_collection_fqcn(tmp_inst_req) # TODO: fix the cache key in artifacts manager? + + if req_type not in {'galaxy', 'subdirs'} and req_version == '*': + req_version = art_mgr.get_direct_collection_version(tmp_inst_req) + + return cls( + req_name, req_version, + req_source, req_type, + req_signature_sources, + ) + + def __repr__(self): + return ( + '<{self!s} of type {coll_type!r} from {src!s}>'. + format(self=self, coll_type=self.type, src=self.src or 'Galaxy') + ) + + def __str__(self): + return to_native(self.__unicode__()) + + def __unicode__(self): + if self.fqcn is None: + return ( + u'"virtual collection Git repo"' if self.is_scm + else u'"virtual collection namespace"' + ) + + return ( + u'{fqcn!s}:{ver!s}'. + format(fqcn=to_text(self.fqcn), ver=to_text(self.ver)) + ) + + @property + def may_have_offline_galaxy_info(self): + if self.fqcn is None: + # Virtual collection + return False + elif not self.is_dir or self.src is None or not _is_collection_dir(self.src): + # Not a dir or isn't on-disk + return False + return True + + def construct_galaxy_info_path(self, b_collection_path): + if not self.may_have_offline_galaxy_info and not self.type == 'galaxy': + raise TypeError('Only installed collections from a Galaxy server have offline Galaxy info') + + # Store Galaxy metadata adjacent to the namespace of the collection + # Chop off the last two parts of the path (/ns/coll) to get the dir containing the ns + b_src = to_bytes(b_collection_path, errors='surrogate_or_strict') + b_path_parts = b_src.split(to_bytes(os.path.sep))[0:-2] + b_metadata_dir = to_bytes(os.path.sep).join(b_path_parts) + + # ns.coll-1.0.0.info + b_dir_name = to_bytes(f"{self.namespace}.{self.name}-{self.ver}.info", errors="surrogate_or_strict") + + # collections/ansible_collections/ns.coll-1.0.0.info/GALAXY.yml + return os.path.join(b_metadata_dir, b_dir_name, _SOURCE_METADATA_FILE) + + def _get_separate_ns_n_name(self): # FIXME: use LRU cache + return self.fqcn.split('.') + + @property + def namespace(self): + if self.is_virtual: + raise TypeError('Virtual collections do not have a namespace') + + return self._get_separate_ns_n_name()[0] + + @property + def name(self): + if self.is_virtual: + raise TypeError('Virtual collections do not have a name') + + return self._get_separate_ns_n_name()[-1] + + @property + def canonical_package_id(self): + if not self.is_virtual: + return to_native(self.fqcn) + + return ( + '<virtual namespace from {src!s} of type {src_type!s}>'. + format(src=to_native(self.src), src_type=to_native(self.type)) + ) + + @property + def is_virtual(self): + return self.is_scm or self.is_subdirs + + @property + def is_file(self): + return self.type == 'file' + + @property + def is_dir(self): + return self.type == 'dir' + + @property + def namespace_collection_paths(self): + return [ + to_native(path) + for path in _find_collections_in_subdirs(self.src) + ] + + @property + def is_subdirs(self): + return self.type == 'subdirs' + + @property + def is_url(self): + return self.type == 'url' + + @property + def is_scm(self): + return self.type == 'git' + + @property + def is_concrete_artifact(self): + return self.type in {'git', 'url', 'file', 'dir', 'subdirs'} + + @property + def is_online_index_pointer(self): + return not self.is_concrete_artifact + + @property + def source_info(self): + return self._source_info + + +RequirementNamedTuple = namedtuple('Requirement', ('fqcn', 'ver', 'src', 'type', 'signature_sources')) # type: ignore[name-match] + + +CandidateNamedTuple = namedtuple('Candidate', ('fqcn', 'ver', 'src', 'type', 'signatures')) # type: ignore[name-match] + + +class Requirement( + _ComputedReqKindsMixin, + RequirementNamedTuple, +): + """An abstract requirement request.""" + + def __new__(cls, *args, **kwargs): + self = RequirementNamedTuple.__new__(cls, *args, **kwargs) + return self + + def __init__(self, *args, **kwargs): + super(Requirement, self).__init__() + + +class Candidate( + _ComputedReqKindsMixin, + CandidateNamedTuple, +): + """A concrete collection candidate with its version resolved.""" + + def __new__(cls, *args, **kwargs): + self = CandidateNamedTuple.__new__(cls, *args, **kwargs) + return self + + def __init__(self, *args, **kwargs): + super(Candidate, self).__init__() |