summaryrefslogtreecommitdiffstats
path: root/lib/ansible/galaxy/collection
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 16:04:21 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 16:04:21 +0000
commit8a754e0858d922e955e71b253c139e071ecec432 (patch)
tree527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/galaxy/collection
parentInitial commit. (diff)
downloadansible-core-8a754e0858d922e955e71b253c139e071ecec432.tar.xz
ansible-core-8a754e0858d922e955e71b253c139e071ecec432.zip
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/galaxy/collection')
-rw-r--r--lib/ansible/galaxy/collection/__init__.py1836
-rw-r--r--lib/ansible/galaxy/collection/concrete_artifact_manager.py755
-rw-r--r--lib/ansible/galaxy/collection/galaxy_api_proxy.py216
-rw-r--r--lib/ansible/galaxy/collection/gpg.py282
4 files changed, 3089 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/collection/__init__.py b/lib/ansible/galaxy/collection/__init__.py
new file mode 100644
index 0000000..7a144c0
--- /dev/null
+++ b/lib/ansible/galaxy/collection/__init__.py
@@ -0,0 +1,1836 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2019-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Installed collections management package."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import errno
+import fnmatch
+import functools
+import json
+import os
+import queue
+import re
+import shutil
+import stat
+import sys
+import tarfile
+import tempfile
+import textwrap
+import threading
+import time
+import typing as t
+
+from collections import namedtuple
+from contextlib import contextmanager
+from dataclasses import dataclass, fields as dc_fields
+from hashlib import sha256
+from io import BytesIO
+from importlib.metadata import distribution
+from itertools import chain
+
+try:
+ from packaging.requirements import Requirement as PkgReq
+except ImportError:
+ class PkgReq: # type: ignore[no-redef]
+ pass
+
+ HAS_PACKAGING = False
+else:
+ HAS_PACKAGING = True
+
+try:
+ from distlib.manifest import Manifest # type: ignore[import]
+ from distlib import DistlibException # type: ignore[import]
+except ImportError:
+ HAS_DISTLIB = False
+else:
+ HAS_DISTLIB = True
+
+if t.TYPE_CHECKING:
+ from ansible.galaxy.collection.concrete_artifact_manager import (
+ ConcreteArtifactsManager,
+ )
+
+ ManifestKeysType = t.Literal[
+ 'collection_info', 'file_manifest_file', 'format',
+ ]
+ FileMetaKeysType = t.Literal[
+ 'name',
+ 'ftype',
+ 'chksum_type',
+ 'chksum_sha256',
+ 'format',
+ ]
+ CollectionInfoKeysType = t.Literal[
+ # collection meta:
+ 'namespace', 'name', 'version',
+ 'authors', 'readme',
+ 'tags', 'description',
+ 'license', 'license_file',
+ 'dependencies',
+ 'repository', 'documentation',
+ 'homepage', 'issues',
+
+ # files meta:
+ FileMetaKeysType,
+ ]
+ ManifestValueType = t.Dict[CollectionInfoKeysType, t.Union[int, str, t.List[str], t.Dict[str, str], None]]
+ CollectionManifestType = t.Dict[ManifestKeysType, ManifestValueType]
+ FileManifestEntryType = t.Dict[FileMetaKeysType, t.Union[str, int, None]]
+ FilesManifestType = t.Dict[t.Literal['files', 'format'], t.Union[t.List[FileManifestEntryType], int]]
+
+import ansible.constants as C
+from ansible.errors import AnsibleError
+from ansible.galaxy.api import GalaxyAPI
+from ansible.galaxy.collection.concrete_artifact_manager import (
+ _consume_file,
+ _download_file,
+ _get_json_from_installed_dir,
+ _get_meta_from_src_dir,
+ _tarfile_extract,
+)
+from ansible.galaxy.collection.galaxy_api_proxy import MultiGalaxyAPIProxy
+from ansible.galaxy.collection.gpg import (
+ run_gpg_verify,
+ parse_gpg_errors,
+ get_signature_from_source,
+ GPG_ERROR_MAP,
+)
+try:
+ from ansible.galaxy.dependency_resolution import (
+ build_collection_dependency_resolver,
+ )
+ from ansible.galaxy.dependency_resolution.errors import (
+ CollectionDependencyResolutionImpossible,
+ CollectionDependencyInconsistentCandidate,
+ )
+ from ansible.galaxy.dependency_resolution.providers import (
+ RESOLVELIB_VERSION,
+ RESOLVELIB_LOWERBOUND,
+ RESOLVELIB_UPPERBOUND,
+ )
+except ImportError:
+ HAS_RESOLVELIB = False
+else:
+ HAS_RESOLVELIB = True
+
+from ansible.galaxy.dependency_resolution.dataclasses import (
+ Candidate, Requirement, _is_installed_collection_dir,
+)
+from ansible.galaxy.dependency_resolution.versioning import meets_requirements
+from ansible.plugins.loader import get_all_plugin_loaders
+from ansible.module_utils.six import raise_from
+from ansible.module_utils._text import to_bytes, to_native, to_text
+from ansible.module_utils.common.collections import is_sequence
+from ansible.module_utils.common.yaml import yaml_dump
+from ansible.utils.collection_loader import AnsibleCollectionRef
+from ansible.utils.display import Display
+from ansible.utils.hashing import secure_hash, secure_hash_s
+from ansible.utils.sentinel import Sentinel
+
+
+display = Display()
+
+MANIFEST_FORMAT = 1
+MANIFEST_FILENAME = 'MANIFEST.json'
+
+ModifiedContent = namedtuple('ModifiedContent', ['filename', 'expected', 'installed'])
+
+SIGNATURE_COUNT_RE = r"^(?P<strict>\+)?(?:(?P<count>\d+)|(?P<all>all))$"
+
+
+@dataclass
+class ManifestControl:
+ directives: list[str] = None
+ omit_default_directives: bool = False
+
+ def __post_init__(self):
+ # Allow a dict representing this dataclass to be splatted directly.
+ # Requires attrs to have a default value, so anything with a default
+ # of None is swapped for its, potentially mutable, default
+ for field in dc_fields(self):
+ if getattr(self, field.name) is None:
+ super().__setattr__(field.name, field.type())
+
+
+class CollectionSignatureError(Exception):
+ def __init__(self, reasons=None, stdout=None, rc=None, ignore=False):
+ self.reasons = reasons
+ self.stdout = stdout
+ self.rc = rc
+ self.ignore = ignore
+
+ self._reason_wrapper = None
+
+ def _report_unexpected(self, collection_name):
+ return (
+ f"Unexpected error for '{collection_name}': "
+ f"GnuPG signature verification failed with the return code {self.rc} and output {self.stdout}"
+ )
+
+ def _report_expected(self, collection_name):
+ header = f"Signature verification failed for '{collection_name}' (return code {self.rc}):"
+ return header + self._format_reasons()
+
+ def _format_reasons(self):
+ if self._reason_wrapper is None:
+ self._reason_wrapper = textwrap.TextWrapper(
+ initial_indent=" * ", # 6 chars
+ subsequent_indent=" ", # 6 chars
+ )
+
+ wrapped_reasons = [
+ '\n'.join(self._reason_wrapper.wrap(reason))
+ for reason in self.reasons
+ ]
+
+ return '\n' + '\n'.join(wrapped_reasons)
+
+ def report(self, collection_name):
+ if self.reasons:
+ return self._report_expected(collection_name)
+
+ return self._report_unexpected(collection_name)
+
+
+# FUTURE: expose actual verify result details for a collection on this object, maybe reimplement as dataclass on py3.8+
+class CollectionVerifyResult:
+ def __init__(self, collection_name): # type: (str) -> None
+ self.collection_name = collection_name # type: str
+ self.success = True # type: bool
+
+
+def verify_local_collection(local_collection, remote_collection, artifacts_manager):
+ # type: (Candidate, t.Optional[Candidate], ConcreteArtifactsManager) -> CollectionVerifyResult
+ """Verify integrity of the locally installed collection.
+
+ :param local_collection: Collection being checked.
+ :param remote_collection: Upstream collection (optional, if None, only verify local artifact)
+ :param artifacts_manager: Artifacts manager.
+ :return: a collection verify result object.
+ """
+ result = CollectionVerifyResult(local_collection.fqcn)
+
+ b_collection_path = to_bytes(local_collection.src, errors='surrogate_or_strict')
+
+ display.display("Verifying '{coll!s}'.".format(coll=local_collection))
+ display.display(
+ u"Installed collection found at '{path!s}'".
+ format(path=to_text(local_collection.src)),
+ )
+
+ modified_content = [] # type: list[ModifiedContent]
+
+ verify_local_only = remote_collection is None
+
+ # partial away the local FS detail so we can just ask generically during validation
+ get_json_from_validation_source = functools.partial(_get_json_from_installed_dir, b_collection_path)
+ get_hash_from_validation_source = functools.partial(_get_file_hash, b_collection_path)
+
+ if not verify_local_only:
+ # Compare installed version versus requirement version
+ if local_collection.ver != remote_collection.ver:
+ err = (
+ "{local_fqcn!s} has the version '{local_ver!s}' but "
+ "is being compared to '{remote_ver!s}'".format(
+ local_fqcn=local_collection.fqcn,
+ local_ver=local_collection.ver,
+ remote_ver=remote_collection.ver,
+ )
+ )
+ display.display(err)
+ result.success = False
+ return result
+
+ manifest_file = os.path.join(to_text(b_collection_path, errors='surrogate_or_strict'), MANIFEST_FILENAME)
+ signatures = list(local_collection.signatures)
+ if verify_local_only and local_collection.source_info is not None:
+ signatures = [info["signature"] for info in local_collection.source_info["signatures"]] + signatures
+ elif not verify_local_only and remote_collection.signatures:
+ signatures = list(remote_collection.signatures) + signatures
+
+ keyring_configured = artifacts_manager.keyring is not None
+ if not keyring_configured and signatures:
+ display.warning(
+ "The GnuPG keyring used for collection signature "
+ "verification was not configured but signatures were "
+ "provided by the Galaxy server. "
+ "Configure a keyring for ansible-galaxy to verify "
+ "the origin of the collection. "
+ "Skipping signature verification."
+ )
+ elif keyring_configured:
+ if not verify_file_signatures(
+ local_collection.fqcn,
+ manifest_file,
+ signatures,
+ artifacts_manager.keyring,
+ artifacts_manager.required_successful_signature_count,
+ artifacts_manager.ignore_signature_errors,
+ ):
+ result.success = False
+ return result
+ display.vvvv(f"GnuPG signature verification succeeded, verifying contents of {local_collection}")
+
+ if verify_local_only:
+ # since we're not downloading this, just seed it with the value from disk
+ manifest_hash = get_hash_from_validation_source(MANIFEST_FILENAME)
+ elif keyring_configured and remote_collection.signatures:
+ manifest_hash = get_hash_from_validation_source(MANIFEST_FILENAME)
+ else:
+ # fetch remote
+ b_temp_tar_path = ( # NOTE: AnsibleError is raised on URLError
+ artifacts_manager.get_artifact_path
+ if remote_collection.is_concrete_artifact
+ else artifacts_manager.get_galaxy_artifact_path
+ )(remote_collection)
+
+ display.vvv(
+ u"Remote collection cached as '{path!s}'".format(path=to_text(b_temp_tar_path))
+ )
+
+ # partial away the tarball details so we can just ask generically during validation
+ get_json_from_validation_source = functools.partial(_get_json_from_tar_file, b_temp_tar_path)
+ get_hash_from_validation_source = functools.partial(_get_tar_file_hash, b_temp_tar_path)
+
+ # Verify the downloaded manifest hash matches the installed copy before verifying the file manifest
+ manifest_hash = get_hash_from_validation_source(MANIFEST_FILENAME)
+ _verify_file_hash(b_collection_path, MANIFEST_FILENAME, manifest_hash, modified_content)
+
+ display.display('MANIFEST.json hash: {manifest_hash}'.format(manifest_hash=manifest_hash))
+
+ manifest = get_json_from_validation_source(MANIFEST_FILENAME)
+
+ # Use the manifest to verify the file manifest checksum
+ file_manifest_data = manifest['file_manifest_file']
+ file_manifest_filename = file_manifest_data['name']
+ expected_hash = file_manifest_data['chksum_%s' % file_manifest_data['chksum_type']]
+
+ # Verify the file manifest before using it to verify individual files
+ _verify_file_hash(b_collection_path, file_manifest_filename, expected_hash, modified_content)
+ file_manifest = get_json_from_validation_source(file_manifest_filename)
+
+ collection_dirs = set()
+ collection_files = {
+ os.path.join(b_collection_path, b'MANIFEST.json'),
+ os.path.join(b_collection_path, b'FILES.json'),
+ }
+
+ # Use the file manifest to verify individual file checksums
+ for manifest_data in file_manifest['files']:
+ name = manifest_data['name']
+
+ if manifest_data['ftype'] == 'file':
+ collection_files.add(
+ os.path.join(b_collection_path, to_bytes(name, errors='surrogate_or_strict'))
+ )
+ expected_hash = manifest_data['chksum_%s' % manifest_data['chksum_type']]
+ _verify_file_hash(b_collection_path, name, expected_hash, modified_content)
+
+ if manifest_data['ftype'] == 'dir':
+ collection_dirs.add(
+ os.path.join(b_collection_path, to_bytes(name, errors='surrogate_or_strict'))
+ )
+
+ # Find any paths not in the FILES.json
+ for root, dirs, files in os.walk(b_collection_path):
+ for name in files:
+ full_path = os.path.join(root, name)
+ path = to_text(full_path[len(b_collection_path) + 1::], errors='surrogate_or_strict')
+
+ if full_path not in collection_files:
+ modified_content.append(
+ ModifiedContent(filename=path, expected='the file does not exist', installed='the file exists')
+ )
+ for name in dirs:
+ full_path = os.path.join(root, name)
+ path = to_text(full_path[len(b_collection_path) + 1::], errors='surrogate_or_strict')
+
+ if full_path not in collection_dirs:
+ modified_content.append(
+ ModifiedContent(filename=path, expected='the directory does not exist', installed='the directory exists')
+ )
+
+ if modified_content:
+ result.success = False
+ display.display(
+ 'Collection {fqcn!s} contains modified content '
+ 'in the following files:'.
+ format(fqcn=to_text(local_collection.fqcn)),
+ )
+ for content_change in modified_content:
+ display.display(' %s' % content_change.filename)
+ display.v(" Expected: %s\n Found: %s" % (content_change.expected, content_change.installed))
+ else:
+ what = "are internally consistent with its manifest" if verify_local_only else "match the remote collection"
+ display.display(
+ "Successfully verified that checksums for '{coll!s}' {what!s}.".
+ format(coll=local_collection, what=what),
+ )
+
+ return result
+
+
+def verify_file_signatures(fqcn, manifest_file, detached_signatures, keyring, required_successful_count, ignore_signature_errors):
+ # type: (str, str, list[str], str, str, list[str]) -> bool
+ successful = 0
+ error_messages = []
+
+ signature_count_requirements = re.match(SIGNATURE_COUNT_RE, required_successful_count).groupdict()
+
+ strict = signature_count_requirements['strict'] or False
+ require_all = signature_count_requirements['all']
+ require_count = signature_count_requirements['count']
+ if require_count is not None:
+ require_count = int(require_count)
+
+ for signature in detached_signatures:
+ signature = to_text(signature, errors='surrogate_or_strict')
+ try:
+ verify_file_signature(manifest_file, signature, keyring, ignore_signature_errors)
+ except CollectionSignatureError as error:
+ if error.ignore:
+ # Do not include ignored errors in either the failed or successful count
+ continue
+ error_messages.append(error.report(fqcn))
+ else:
+ successful += 1
+
+ if require_all:
+ continue
+
+ if successful == require_count:
+ break
+
+ if strict and not successful:
+ verified = False
+ display.display(f"Signature verification failed for '{fqcn}': no successful signatures")
+ elif require_all:
+ verified = not error_messages
+ if not verified:
+ display.display(f"Signature verification failed for '{fqcn}': some signatures failed")
+ else:
+ verified = not detached_signatures or require_count == successful
+ if not verified:
+ display.display(f"Signature verification failed for '{fqcn}': fewer successful signatures than required")
+
+ if not verified:
+ for msg in error_messages:
+ display.vvvv(msg)
+
+ return verified
+
+
+def verify_file_signature(manifest_file, detached_signature, keyring, ignore_signature_errors):
+ # type: (str, str, str, list[str]) -> None
+ """Run the gpg command and parse any errors. Raises CollectionSignatureError on failure."""
+ gpg_result, gpg_verification_rc = run_gpg_verify(manifest_file, detached_signature, keyring, display)
+
+ if gpg_result:
+ errors = parse_gpg_errors(gpg_result)
+ try:
+ error = next(errors)
+ except StopIteration:
+ pass
+ else:
+ reasons = []
+ ignored_reasons = 0
+
+ for error in chain([error], errors):
+ # Get error status (dict key) from the class (dict value)
+ status_code = list(GPG_ERROR_MAP.keys())[list(GPG_ERROR_MAP.values()).index(error.__class__)]
+ if status_code in ignore_signature_errors:
+ ignored_reasons += 1
+ reasons.append(error.get_gpg_error_description())
+
+ ignore = len(reasons) == ignored_reasons
+ raise CollectionSignatureError(reasons=set(reasons), stdout=gpg_result, rc=gpg_verification_rc, ignore=ignore)
+
+ if gpg_verification_rc:
+ raise CollectionSignatureError(stdout=gpg_result, rc=gpg_verification_rc)
+
+ # No errors and rc is 0, verify was successful
+ return None
+
+
+def build_collection(u_collection_path, u_output_path, force):
+ # type: (str, str, bool) -> str
+ """Creates the Ansible collection artifact in a .tar.gz file.
+
+ :param u_collection_path: The path to the collection to build. This should be the directory that contains the
+ galaxy.yml file.
+ :param u_output_path: The path to create the collection build artifact. This should be a directory.
+ :param force: Whether to overwrite an existing collection build artifact or fail.
+ :return: The path to the collection build artifact.
+ """
+ b_collection_path = to_bytes(u_collection_path, errors='surrogate_or_strict')
+ try:
+ collection_meta = _get_meta_from_src_dir(b_collection_path)
+ except LookupError as lookup_err:
+ raise_from(AnsibleError(to_native(lookup_err)), lookup_err)
+
+ collection_manifest = _build_manifest(**collection_meta)
+ file_manifest = _build_files_manifest(
+ b_collection_path,
+ collection_meta['namespace'], # type: ignore[arg-type]
+ collection_meta['name'], # type: ignore[arg-type]
+ collection_meta['build_ignore'], # type: ignore[arg-type]
+ collection_meta['manifest'], # type: ignore[arg-type]
+ )
+
+ artifact_tarball_file_name = '{ns!s}-{name!s}-{ver!s}.tar.gz'.format(
+ name=collection_meta['name'],
+ ns=collection_meta['namespace'],
+ ver=collection_meta['version'],
+ )
+ b_collection_output = os.path.join(
+ to_bytes(u_output_path),
+ to_bytes(artifact_tarball_file_name, errors='surrogate_or_strict'),
+ )
+
+ if os.path.exists(b_collection_output):
+ if os.path.isdir(b_collection_output):
+ raise AnsibleError("The output collection artifact '%s' already exists, "
+ "but is a directory - aborting" % to_native(b_collection_output))
+ elif not force:
+ raise AnsibleError("The file '%s' already exists. You can use --force to re-create "
+ "the collection artifact." % to_native(b_collection_output))
+
+ collection_output = _build_collection_tar(b_collection_path, b_collection_output, collection_manifest, file_manifest)
+ return collection_output
+
+
+def download_collections(
+ collections, # type: t.Iterable[Requirement]
+ output_path, # type: str
+ apis, # type: t.Iterable[GalaxyAPI]
+ no_deps, # type: bool
+ allow_pre_release, # type: bool
+ artifacts_manager, # type: ConcreteArtifactsManager
+): # type: (...) -> None
+ """Download Ansible collections as their tarball from a Galaxy server to the path specified and creates a requirements
+ file of the downloaded requirements to be used for an install.
+
+ :param collections: The collections to download, should be a list of tuples with (name, requirement, Galaxy Server).
+ :param output_path: The path to download the collections to.
+ :param apis: A list of GalaxyAPIs to query when search for a collection.
+ :param validate_certs: Whether to validate the certificate if downloading a tarball from a non-Galaxy host.
+ :param no_deps: Ignore any collection dependencies and only download the base requirements.
+ :param allow_pre_release: Do not ignore pre-release versions when selecting the latest.
+ """
+ with _display_progress("Process download dependency map"):
+ dep_map = _resolve_depenency_map(
+ set(collections),
+ galaxy_apis=apis,
+ preferred_candidates=None,
+ concrete_artifacts_manager=artifacts_manager,
+ no_deps=no_deps,
+ allow_pre_release=allow_pre_release,
+ upgrade=False,
+ # Avoid overhead getting signatures since they are not currently applicable to downloaded collections
+ include_signatures=False,
+ offline=False,
+ )
+
+ b_output_path = to_bytes(output_path, errors='surrogate_or_strict')
+
+ requirements = []
+ with _display_progress(
+ "Starting collection download process to '{path!s}'".
+ format(path=output_path),
+ ):
+ for fqcn, concrete_coll_pin in dep_map.copy().items(): # FIXME: move into the provider
+ if concrete_coll_pin.is_virtual:
+ display.display(
+ 'Virtual collection {coll!s} is not downloadable'.
+ format(coll=to_text(concrete_coll_pin)),
+ )
+ continue
+
+ display.display(
+ u"Downloading collection '{coll!s}' to '{path!s}'".
+ format(coll=to_text(concrete_coll_pin), path=to_text(b_output_path)),
+ )
+
+ b_src_path = (
+ artifacts_manager.get_artifact_path
+ if concrete_coll_pin.is_concrete_artifact
+ else artifacts_manager.get_galaxy_artifact_path
+ )(concrete_coll_pin)
+
+ b_dest_path = os.path.join(
+ b_output_path,
+ os.path.basename(b_src_path),
+ )
+
+ if concrete_coll_pin.is_dir:
+ b_dest_path = to_bytes(
+ build_collection(
+ to_text(b_src_path, errors='surrogate_or_strict'),
+ to_text(output_path, errors='surrogate_or_strict'),
+ force=True,
+ ),
+ errors='surrogate_or_strict',
+ )
+ else:
+ shutil.copy(to_native(b_src_path), to_native(b_dest_path))
+
+ display.display(
+ "Collection '{coll!s}' was downloaded successfully".
+ format(coll=concrete_coll_pin),
+ )
+ requirements.append({
+ # FIXME: Consider using a more specific upgraded format
+ # FIXME: having FQCN in the name field, with src field
+ # FIXME: pointing to the file path, and explicitly set
+ # FIXME: type. If version and name are set, it'd
+ # FIXME: perform validation against the actual metadata
+ # FIXME: in the artifact src points at.
+ 'name': to_native(os.path.basename(b_dest_path)),
+ 'version': concrete_coll_pin.ver,
+ })
+
+ requirements_path = os.path.join(output_path, 'requirements.yml')
+ b_requirements_path = to_bytes(
+ requirements_path, errors='surrogate_or_strict',
+ )
+ display.display(
+ u'Writing requirements.yml file of downloaded collections '
+ "to '{path!s}'".format(path=to_text(requirements_path)),
+ )
+ yaml_bytes = to_bytes(
+ yaml_dump({'collections': requirements}),
+ errors='surrogate_or_strict',
+ )
+ with open(b_requirements_path, mode='wb') as req_fd:
+ req_fd.write(yaml_bytes)
+
+
+def publish_collection(collection_path, api, wait, timeout):
+ """Publish an Ansible collection tarball into an Ansible Galaxy server.
+
+ :param collection_path: The path to the collection tarball to publish.
+ :param api: A GalaxyAPI to publish the collection to.
+ :param wait: Whether to wait until the import process is complete.
+ :param timeout: The time in seconds to wait for the import process to finish, 0 is indefinite.
+ """
+ import_uri = api.publish_collection(collection_path)
+
+ if wait:
+ # Galaxy returns a url fragment which differs between v2 and v3. The second to last entry is
+ # always the task_id, though.
+ # v2: {"task": "https://galaxy-dev.ansible.com/api/v2/collection-imports/35573/"}
+ # v3: {"task": "/api/automation-hub/v3/imports/collections/838d1308-a8f4-402c-95cb-7823f3806cd8/"}
+ task_id = None
+ for path_segment in reversed(import_uri.split('/')):
+ if path_segment:
+ task_id = path_segment
+ break
+
+ if not task_id:
+ raise AnsibleError("Publishing the collection did not return valid task info. Cannot wait for task status. Returned task info: '%s'" % import_uri)
+
+ with _display_progress(
+ "Collection has been published to the Galaxy server "
+ "{api.name!s} {api.api_server!s}".format(api=api),
+ ):
+ api.wait_import_task(task_id, timeout)
+ display.display("Collection has been successfully published and imported to the Galaxy server %s %s"
+ % (api.name, api.api_server))
+ else:
+ display.display("Collection has been pushed to the Galaxy server %s %s, not waiting until import has "
+ "completed due to --no-wait being set. Import task results can be found at %s"
+ % (api.name, api.api_server, import_uri))
+
+
+def install_collections(
+ collections, # type: t.Iterable[Requirement]
+ output_path, # type: str
+ apis, # type: t.Iterable[GalaxyAPI]
+ ignore_errors, # type: bool
+ no_deps, # type: bool
+ force, # type: bool
+ force_deps, # type: bool
+ upgrade, # type: bool
+ allow_pre_release, # type: bool
+ artifacts_manager, # type: ConcreteArtifactsManager
+ disable_gpg_verify, # type: bool
+ offline, # type: bool
+): # type: (...) -> None
+ """Install Ansible collections to the path specified.
+
+ :param collections: The collections to install.
+ :param output_path: The path to install the collections to.
+ :param apis: A list of GalaxyAPIs to query when searching for a collection.
+ :param validate_certs: Whether to validate the certificates if downloading a tarball.
+ :param ignore_errors: Whether to ignore any errors when installing the collection.
+ :param no_deps: Ignore any collection dependencies and only install the base requirements.
+ :param force: Re-install a collection if it has already been installed.
+ :param force_deps: Re-install a collection as well as its dependencies if they have already been installed.
+ """
+ existing_collections = {
+ Requirement(coll.fqcn, coll.ver, coll.src, coll.type, None)
+ for coll in find_existing_collections(output_path, artifacts_manager)
+ }
+
+ unsatisfied_requirements = set(
+ chain.from_iterable(
+ (
+ Requirement.from_dir_path(sub_coll, artifacts_manager)
+ for sub_coll in (
+ artifacts_manager.
+ get_direct_collection_dependencies(install_req).
+ keys()
+ )
+ )
+ if install_req.is_subdirs else (install_req, )
+ for install_req in collections
+ ),
+ )
+ requested_requirements_names = {req.fqcn for req in unsatisfied_requirements}
+
+ # NOTE: Don't attempt to reevaluate already installed deps
+ # NOTE: unless `--force` or `--force-with-deps` is passed
+ unsatisfied_requirements -= set() if force or force_deps else {
+ req
+ for req in unsatisfied_requirements
+ for exs in existing_collections
+ if req.fqcn == exs.fqcn and meets_requirements(exs.ver, req.ver)
+ }
+
+ if not unsatisfied_requirements and not upgrade:
+ display.display(
+ 'Nothing to do. All requested collections are already '
+ 'installed. If you want to reinstall them, '
+ 'consider using `--force`.'
+ )
+ return
+
+ # FIXME: This probably needs to be improved to
+ # FIXME: properly match differing src/type.
+ existing_non_requested_collections = {
+ coll for coll in existing_collections
+ if coll.fqcn not in requested_requirements_names
+ }
+
+ preferred_requirements = (
+ [] if force_deps
+ else existing_non_requested_collections if force
+ else existing_collections
+ )
+ preferred_collections = {
+ # NOTE: No need to include signatures if the collection is already installed
+ Candidate(coll.fqcn, coll.ver, coll.src, coll.type, None)
+ for coll in preferred_requirements
+ }
+ with _display_progress("Process install dependency map"):
+ dependency_map = _resolve_depenency_map(
+ collections,
+ galaxy_apis=apis,
+ preferred_candidates=preferred_collections,
+ concrete_artifacts_manager=artifacts_manager,
+ no_deps=no_deps,
+ allow_pre_release=allow_pre_release,
+ upgrade=upgrade,
+ include_signatures=not disable_gpg_verify,
+ offline=offline,
+ )
+
+ keyring_exists = artifacts_manager.keyring is not None
+ with _display_progress("Starting collection install process"):
+ for fqcn, concrete_coll_pin in dependency_map.items():
+ if concrete_coll_pin.is_virtual:
+ display.vvvv(
+ "'{coll!s}' is virtual, skipping.".
+ format(coll=to_text(concrete_coll_pin)),
+ )
+ continue
+
+ if concrete_coll_pin in preferred_collections:
+ display.display(
+ "'{coll!s}' is already installed, skipping.".
+ format(coll=to_text(concrete_coll_pin)),
+ )
+ continue
+
+ if not disable_gpg_verify and concrete_coll_pin.signatures and not keyring_exists:
+ # Duplicate warning msgs are not displayed
+ display.warning(
+ "The GnuPG keyring used for collection signature "
+ "verification was not configured but signatures were "
+ "provided by the Galaxy server to verify authenticity. "
+ "Configure a keyring for ansible-galaxy to use "
+ "or disable signature verification. "
+ "Skipping signature verification."
+ )
+
+ try:
+ install(concrete_coll_pin, output_path, artifacts_manager)
+ except AnsibleError as err:
+ if ignore_errors:
+ display.warning(
+ 'Failed to install collection {coll!s} but skipping '
+ 'due to --ignore-errors being set. Error: {error!s}'.
+ format(
+ coll=to_text(concrete_coll_pin),
+ error=to_text(err),
+ )
+ )
+ else:
+ raise
+
+
+# NOTE: imported in ansible.cli.galaxy
+def validate_collection_name(name): # type: (str) -> str
+ """Validates the collection name as an input from the user or a requirements file fit the requirements.
+
+ :param name: The input name with optional range specifier split by ':'.
+ :return: The input value, required for argparse validation.
+ """
+ collection, dummy, dummy = name.partition(':')
+ if AnsibleCollectionRef.is_valid_collection_name(collection):
+ return name
+
+ raise AnsibleError("Invalid collection name '%s', "
+ "name must be in the format <namespace>.<collection>. \n"
+ "Please make sure namespace and collection name contains "
+ "characters from [a-zA-Z0-9_] only." % name)
+
+
+# NOTE: imported in ansible.cli.galaxy
+def validate_collection_path(collection_path): # type: (str) -> str
+ """Ensure a given path ends with 'ansible_collections'
+
+ :param collection_path: The path that should end in 'ansible_collections'
+ :return: collection_path ending in 'ansible_collections' if it does not already.
+ """
+
+ if os.path.split(collection_path)[1] != 'ansible_collections':
+ return os.path.join(collection_path, 'ansible_collections')
+
+ return collection_path
+
+
+def verify_collections(
+ collections, # type: t.Iterable[Requirement]
+ search_paths, # type: t.Iterable[str]
+ apis, # type: t.Iterable[GalaxyAPI]
+ ignore_errors, # type: bool
+ local_verify_only, # type: bool
+ artifacts_manager, # type: ConcreteArtifactsManager
+): # type: (...) -> list[CollectionVerifyResult]
+ r"""Verify the integrity of locally installed collections.
+
+ :param collections: The collections to check.
+ :param search_paths: Locations for the local collection lookup.
+ :param apis: A list of GalaxyAPIs to query when searching for a collection.
+ :param ignore_errors: Whether to ignore any errors when verifying the collection.
+ :param local_verify_only: When True, skip downloads and only verify local manifests.
+ :param artifacts_manager: Artifacts manager.
+ :return: list of CollectionVerifyResult objects describing the results of each collection verification
+ """
+ results = [] # type: list[CollectionVerifyResult]
+
+ api_proxy = MultiGalaxyAPIProxy(apis, artifacts_manager)
+
+ with _display_progress():
+ for collection in collections:
+ try:
+ if collection.is_concrete_artifact:
+ raise AnsibleError(
+ message="'{coll_type!s}' type is not supported. "
+ 'The format namespace.name is expected.'.
+ format(coll_type=collection.type)
+ )
+
+ # NOTE: Verify local collection exists before
+ # NOTE: downloading its source artifact from
+ # NOTE: a galaxy server.
+ default_err = 'Collection %s is not installed in any of the collection paths.' % collection.fqcn
+ for search_path in search_paths:
+ b_search_path = to_bytes(
+ os.path.join(
+ search_path,
+ collection.namespace, collection.name,
+ ),
+ errors='surrogate_or_strict',
+ )
+ if not os.path.isdir(b_search_path):
+ continue
+ if not _is_installed_collection_dir(b_search_path):
+ default_err = (
+ "Collection %s does not have a MANIFEST.json. "
+ "A MANIFEST.json is expected if the collection has been built "
+ "and installed via ansible-galaxy" % collection.fqcn
+ )
+ continue
+
+ local_collection = Candidate.from_dir_path(
+ b_search_path, artifacts_manager,
+ )
+ supplemental_signatures = [
+ get_signature_from_source(source, display)
+ for source in collection.signature_sources or []
+ ]
+ local_collection = Candidate(
+ local_collection.fqcn,
+ local_collection.ver,
+ local_collection.src,
+ local_collection.type,
+ signatures=frozenset(supplemental_signatures),
+ )
+
+ break
+ else:
+ raise AnsibleError(message=default_err)
+
+ if local_verify_only:
+ remote_collection = None
+ else:
+ signatures = api_proxy.get_signatures(local_collection)
+ signatures.extend([
+ get_signature_from_source(source, display)
+ for source in collection.signature_sources or []
+ ])
+
+ remote_collection = Candidate(
+ collection.fqcn,
+ collection.ver if collection.ver != '*'
+ else local_collection.ver,
+ None, 'galaxy',
+ frozenset(signatures),
+ )
+
+ # Download collection on a galaxy server for comparison
+ try:
+ # NOTE: If there are no signatures, trigger the lookup. If found,
+ # NOTE: it'll cache download URL and token in artifact manager.
+ # NOTE: If there are no Galaxy server signatures, only user-provided signature URLs,
+ # NOTE: those alone validate the MANIFEST.json and the remote collection is not downloaded.
+ # NOTE: The remote MANIFEST.json is only used in verification if there are no signatures.
+ if not signatures and not collection.signature_sources:
+ api_proxy.get_collection_version_metadata(
+ remote_collection,
+ )
+ except AnsibleError as e: # FIXME: does this actually emit any errors?
+ # FIXME: extract the actual message and adjust this:
+ expected_error_msg = (
+ 'Failed to find collection {coll.fqcn!s}:{coll.ver!s}'.
+ format(coll=collection)
+ )
+ if e.message == expected_error_msg:
+ raise AnsibleError(
+ 'Failed to find remote collection '
+ "'{coll!s}' on any of the galaxy servers".
+ format(coll=collection)
+ )
+ raise
+
+ result = verify_local_collection(local_collection, remote_collection, artifacts_manager)
+
+ results.append(result)
+
+ except AnsibleError as err:
+ if ignore_errors:
+ display.warning(
+ "Failed to verify collection '{coll!s}' but skipping "
+ 'due to --ignore-errors being set. '
+ 'Error: {err!s}'.
+ format(coll=collection, err=to_text(err)),
+ )
+ else:
+ raise
+
+ return results
+
+
+@contextmanager
+def _tempdir():
+ b_temp_path = tempfile.mkdtemp(dir=to_bytes(C.DEFAULT_LOCAL_TMP, errors='surrogate_or_strict'))
+ try:
+ yield b_temp_path
+ finally:
+ shutil.rmtree(b_temp_path)
+
+
+@contextmanager
+def _display_progress(msg=None):
+ config_display = C.GALAXY_DISPLAY_PROGRESS
+ display_wheel = sys.stdout.isatty() if config_display is None else config_display
+
+ global display
+ if msg is not None:
+ display.display(msg)
+
+ if not display_wheel:
+ yield
+ return
+
+ def progress(display_queue, actual_display):
+ actual_display.debug("Starting display_progress display thread")
+ t = threading.current_thread()
+
+ while True:
+ for c in "|/-\\":
+ actual_display.display(c + "\b", newline=False)
+ time.sleep(0.1)
+
+ # Display a message from the main thread
+ while True:
+ try:
+ method, args, kwargs = display_queue.get(block=False, timeout=0.1)
+ except queue.Empty:
+ break
+ else:
+ func = getattr(actual_display, method)
+ func(*args, **kwargs)
+
+ if getattr(t, "finish", False):
+ actual_display.debug("Received end signal for display_progress display thread")
+ return
+
+ class DisplayThread(object):
+
+ def __init__(self, display_queue):
+ self.display_queue = display_queue
+
+ def __getattr__(self, attr):
+ def call_display(*args, **kwargs):
+ self.display_queue.put((attr, args, kwargs))
+
+ return call_display
+
+ # Temporary override the global display class with our own which add the calls to a queue for the thread to call.
+ old_display = display
+ try:
+ display_queue = queue.Queue()
+ display = DisplayThread(display_queue)
+ t = threading.Thread(target=progress, args=(display_queue, old_display))
+ t.daemon = True
+ t.start()
+
+ try:
+ yield
+ finally:
+ t.finish = True
+ t.join()
+ except Exception:
+ # The exception is re-raised so we can sure the thread is finished and not using the display anymore
+ raise
+ finally:
+ display = old_display
+
+
+def _verify_file_hash(b_path, filename, expected_hash, error_queue):
+ b_file_path = to_bytes(os.path.join(to_text(b_path), filename), errors='surrogate_or_strict')
+
+ if not os.path.isfile(b_file_path):
+ actual_hash = None
+ else:
+ with open(b_file_path, mode='rb') as file_object:
+ actual_hash = _consume_file(file_object)
+
+ if expected_hash != actual_hash:
+ error_queue.append(ModifiedContent(filename=filename, expected=expected_hash, installed=actual_hash))
+
+
+def _make_manifest():
+ return {
+ 'files': [
+ {
+ 'name': '.',
+ 'ftype': 'dir',
+ 'chksum_type': None,
+ 'chksum_sha256': None,
+ 'format': MANIFEST_FORMAT,
+ },
+ ],
+ 'format': MANIFEST_FORMAT,
+ }
+
+
+def _make_entry(name, ftype, chksum_type='sha256', chksum=None):
+ return {
+ 'name': name,
+ 'ftype': ftype,
+ 'chksum_type': chksum_type if chksum else None,
+ f'chksum_{chksum_type}': chksum,
+ 'format': MANIFEST_FORMAT
+ }
+
+
+def _build_files_manifest(b_collection_path, namespace, name, ignore_patterns, manifest_control):
+ # type: (bytes, str, str, list[str], dict[str, t.Any]) -> FilesManifestType
+ if ignore_patterns and manifest_control is not Sentinel:
+ raise AnsibleError('"build_ignore" and "manifest" are mutually exclusive')
+
+ if manifest_control is not Sentinel:
+ return _build_files_manifest_distlib(
+ b_collection_path,
+ namespace,
+ name,
+ manifest_control,
+ )
+
+ return _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patterns)
+
+
+def _build_files_manifest_distlib(b_collection_path, namespace, name, manifest_control):
+ # type: (bytes, str, str, dict[str, t.Any]) -> FilesManifestType
+
+ if not HAS_DISTLIB:
+ raise AnsibleError('Use of "manifest" requires the python "distlib" library')
+
+ if manifest_control is None:
+ manifest_control = {}
+
+ try:
+ control = ManifestControl(**manifest_control)
+ except TypeError as ex:
+ raise AnsibleError(f'Invalid "manifest" provided: {ex}')
+
+ if not is_sequence(control.directives):
+ raise AnsibleError(f'"manifest.directives" must be a list, got: {control.directives.__class__.__name__}')
+
+ if not isinstance(control.omit_default_directives, bool):
+ raise AnsibleError(
+ '"manifest.omit_default_directives" is expected to be a boolean, got: '
+ f'{control.omit_default_directives.__class__.__name__}'
+ )
+
+ if control.omit_default_directives and not control.directives:
+ raise AnsibleError(
+ '"manifest.omit_default_directives" was set to True, but no directives were defined '
+ 'in "manifest.directives". This would produce an empty collection artifact.'
+ )
+
+ directives = []
+ if control.omit_default_directives:
+ directives.extend(control.directives)
+ else:
+ directives.extend([
+ 'include meta/*.yml',
+ 'include *.txt *.md *.rst COPYING LICENSE',
+ 'recursive-include tests **',
+ 'recursive-include docs **.rst **.yml **.yaml **.json **.j2 **.txt',
+ 'recursive-include roles **.yml **.yaml **.json **.j2',
+ 'recursive-include playbooks **.yml **.yaml **.json',
+ 'recursive-include changelogs **.yml **.yaml',
+ 'recursive-include plugins */**.py',
+ ])
+
+ plugins = set(l.package.split('.')[-1] for d, l in get_all_plugin_loaders())
+ for plugin in sorted(plugins):
+ if plugin in ('modules', 'module_utils'):
+ continue
+ elif plugin in C.DOCUMENTABLE_PLUGINS:
+ directives.append(
+ f'recursive-include plugins/{plugin} **.yml **.yaml'
+ )
+
+ directives.extend([
+ 'recursive-include plugins/modules **.ps1 **.yml **.yaml',
+ 'recursive-include plugins/module_utils **.ps1 **.psm1 **.cs',
+ ])
+
+ directives.extend(control.directives)
+
+ directives.extend([
+ f'exclude galaxy.yml galaxy.yaml MANIFEST.json FILES.json {namespace}-{name}-*.tar.gz',
+ 'recursive-exclude tests/output **',
+ 'global-exclude /.* /__pycache__',
+ ])
+
+ display.vvv('Manifest Directives:')
+ display.vvv(textwrap.indent('\n'.join(directives), ' '))
+
+ u_collection_path = to_text(b_collection_path, errors='surrogate_or_strict')
+ m = Manifest(u_collection_path)
+ for directive in directives:
+ try:
+ m.process_directive(directive)
+ except DistlibException as e:
+ raise AnsibleError(f'Invalid manifest directive: {e}')
+ except Exception as e:
+ raise AnsibleError(f'Unknown error processing manifest directive: {e}')
+
+ manifest = _make_manifest()
+
+ for abs_path in m.sorted(wantdirs=True):
+ rel_path = os.path.relpath(abs_path, u_collection_path)
+ if os.path.isdir(abs_path):
+ manifest_entry = _make_entry(rel_path, 'dir')
+ else:
+ manifest_entry = _make_entry(
+ rel_path,
+ 'file',
+ chksum_type='sha256',
+ chksum=secure_hash(abs_path, hash_func=sha256)
+ )
+
+ manifest['files'].append(manifest_entry)
+
+ return manifest
+
+
+def _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patterns):
+ # type: (bytes, str, str, list[str]) -> FilesManifestType
+ # We always ignore .pyc and .retry files as well as some well known version control directories. The ignore
+ # patterns can be extended by the build_ignore key in galaxy.yml
+ b_ignore_patterns = [
+ b'MANIFEST.json',
+ b'FILES.json',
+ b'galaxy.yml',
+ b'galaxy.yaml',
+ b'.git',
+ b'*.pyc',
+ b'*.retry',
+ b'tests/output', # Ignore ansible-test result output directory.
+ to_bytes('{0}-{1}-*.tar.gz'.format(namespace, name)), # Ignores previously built artifacts in the root dir.
+ ]
+ b_ignore_patterns += [to_bytes(p) for p in ignore_patterns]
+ b_ignore_dirs = frozenset([b'CVS', b'.bzr', b'.hg', b'.git', b'.svn', b'__pycache__', b'.tox'])
+
+ manifest = _make_manifest()
+
+ def _walk(b_path, b_top_level_dir):
+ for b_item in os.listdir(b_path):
+ b_abs_path = os.path.join(b_path, b_item)
+ b_rel_base_dir = b'' if b_path == b_top_level_dir else b_path[len(b_top_level_dir) + 1:]
+ b_rel_path = os.path.join(b_rel_base_dir, b_item)
+ rel_path = to_text(b_rel_path, errors='surrogate_or_strict')
+
+ if os.path.isdir(b_abs_path):
+ if any(b_item == b_path for b_path in b_ignore_dirs) or \
+ any(fnmatch.fnmatch(b_rel_path, b_pattern) for b_pattern in b_ignore_patterns):
+ display.vvv("Skipping '%s' for collection build" % to_text(b_abs_path))
+ continue
+
+ if os.path.islink(b_abs_path):
+ b_link_target = os.path.realpath(b_abs_path)
+
+ if not _is_child_path(b_link_target, b_top_level_dir):
+ display.warning("Skipping '%s' as it is a symbolic link to a directory outside the collection"
+ % to_text(b_abs_path))
+ continue
+
+ manifest['files'].append(_make_entry(rel_path, 'dir'))
+
+ if not os.path.islink(b_abs_path):
+ _walk(b_abs_path, b_top_level_dir)
+ else:
+ if any(fnmatch.fnmatch(b_rel_path, b_pattern) for b_pattern in b_ignore_patterns):
+ display.vvv("Skipping '%s' for collection build" % to_text(b_abs_path))
+ continue
+
+ # Handling of file symlinks occur in _build_collection_tar, the manifest for a symlink is the same for
+ # a normal file.
+ manifest['files'].append(
+ _make_entry(
+ rel_path,
+ 'file',
+ chksum_type='sha256',
+ chksum=secure_hash(b_abs_path, hash_func=sha256)
+ )
+ )
+
+ _walk(b_collection_path, b_collection_path)
+
+ return manifest
+
+
+# FIXME: accept a dict produced from `galaxy.yml` instead of separate args
+def _build_manifest(namespace, name, version, authors, readme, tags, description, license_file,
+ dependencies, repository, documentation, homepage, issues, **kwargs):
+ manifest = {
+ 'collection_info': {
+ 'namespace': namespace,
+ 'name': name,
+ 'version': version,
+ 'authors': authors,
+ 'readme': readme,
+ 'tags': tags,
+ 'description': description,
+ 'license': kwargs['license'],
+ 'license_file': license_file or None, # Handle galaxy.yml having an empty string (None)
+ 'dependencies': dependencies,
+ 'repository': repository,
+ 'documentation': documentation,
+ 'homepage': homepage,
+ 'issues': issues,
+ },
+ 'file_manifest_file': {
+ 'name': 'FILES.json',
+ 'ftype': 'file',
+ 'chksum_type': 'sha256',
+ 'chksum_sha256': None, # Filled out in _build_collection_tar
+ 'format': MANIFEST_FORMAT
+ },
+ 'format': MANIFEST_FORMAT,
+ }
+
+ return manifest
+
+
+def _build_collection_tar(
+ b_collection_path, # type: bytes
+ b_tar_path, # type: bytes
+ collection_manifest, # type: CollectionManifestType
+ file_manifest, # type: FilesManifestType
+): # type: (...) -> str
+ """Build a tar.gz collection artifact from the manifest data."""
+ files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict')
+ collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256)
+ collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict')
+
+ with _tempdir() as b_temp_path:
+ b_tar_filepath = os.path.join(b_temp_path, os.path.basename(b_tar_path))
+
+ with tarfile.open(b_tar_filepath, mode='w:gz') as tar_file:
+ # Add the MANIFEST.json and FILES.json file to the archive
+ for name, b in [(MANIFEST_FILENAME, collection_manifest_json), ('FILES.json', files_manifest_json)]:
+ b_io = BytesIO(b)
+ tar_info = tarfile.TarInfo(name)
+ tar_info.size = len(b)
+ tar_info.mtime = int(time.time())
+ tar_info.mode = 0o0644
+ tar_file.addfile(tarinfo=tar_info, fileobj=b_io)
+
+ for file_info in file_manifest['files']: # type: ignore[union-attr]
+ if file_info['name'] == '.':
+ continue
+
+ # arcname expects a native string, cannot be bytes
+ filename = to_native(file_info['name'], errors='surrogate_or_strict')
+ b_src_path = os.path.join(b_collection_path, to_bytes(filename, errors='surrogate_or_strict'))
+
+ def reset_stat(tarinfo):
+ if tarinfo.type != tarfile.SYMTYPE:
+ existing_is_exec = tarinfo.mode & stat.S_IXUSR
+ tarinfo.mode = 0o0755 if existing_is_exec or tarinfo.isdir() else 0o0644
+ tarinfo.uid = tarinfo.gid = 0
+ tarinfo.uname = tarinfo.gname = ''
+
+ return tarinfo
+
+ if os.path.islink(b_src_path):
+ b_link_target = os.path.realpath(b_src_path)
+ if _is_child_path(b_link_target, b_collection_path):
+ b_rel_path = os.path.relpath(b_link_target, start=os.path.dirname(b_src_path))
+
+ tar_info = tarfile.TarInfo(filename)
+ tar_info.type = tarfile.SYMTYPE
+ tar_info.linkname = to_native(b_rel_path, errors='surrogate_or_strict')
+ tar_info = reset_stat(tar_info)
+ tar_file.addfile(tarinfo=tar_info)
+
+ continue
+
+ # Dealing with a normal file, just add it by name.
+ tar_file.add(
+ to_native(os.path.realpath(b_src_path)),
+ arcname=filename,
+ recursive=False,
+ filter=reset_stat,
+ )
+
+ shutil.copy(to_native(b_tar_filepath), to_native(b_tar_path))
+ collection_name = "%s.%s" % (collection_manifest['collection_info']['namespace'],
+ collection_manifest['collection_info']['name'])
+ tar_path = to_text(b_tar_path)
+ display.display(u'Created collection for %s at %s' % (collection_name, tar_path))
+ return tar_path
+
+
+def _build_collection_dir(b_collection_path, b_collection_output, collection_manifest, file_manifest):
+ """Build a collection directory from the manifest data.
+
+ This should follow the same pattern as _build_collection_tar.
+ """
+ os.makedirs(b_collection_output, mode=0o0755)
+
+ files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict')
+ collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256)
+ collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict')
+
+ # Write contents to the files
+ for name, b in [(MANIFEST_FILENAME, collection_manifest_json), ('FILES.json', files_manifest_json)]:
+ b_path = os.path.join(b_collection_output, to_bytes(name, errors='surrogate_or_strict'))
+ with open(b_path, 'wb') as file_obj, BytesIO(b) as b_io:
+ shutil.copyfileobj(b_io, file_obj)
+
+ os.chmod(b_path, 0o0644)
+
+ base_directories = []
+ for file_info in sorted(file_manifest['files'], key=lambda x: x['name']):
+ if file_info['name'] == '.':
+ continue
+
+ src_file = os.path.join(b_collection_path, to_bytes(file_info['name'], errors='surrogate_or_strict'))
+ dest_file = os.path.join(b_collection_output, to_bytes(file_info['name'], errors='surrogate_or_strict'))
+
+ existing_is_exec = os.stat(src_file).st_mode & stat.S_IXUSR
+ mode = 0o0755 if existing_is_exec else 0o0644
+
+ if os.path.isdir(src_file):
+ mode = 0o0755
+ base_directories.append(src_file)
+ os.mkdir(dest_file, mode)
+ else:
+ shutil.copyfile(src_file, dest_file)
+
+ os.chmod(dest_file, mode)
+ collection_output = to_text(b_collection_output)
+ return collection_output
+
+
+def find_existing_collections(path, artifacts_manager):
+ """Locate all collections under a given path.
+
+ :param path: Collection dirs layout search path.
+ :param artifacts_manager: Artifacts manager.
+ """
+ b_path = to_bytes(path, errors='surrogate_or_strict')
+
+ # FIXME: consider using `glob.glob()` to simplify looping
+ for b_namespace in os.listdir(b_path):
+ b_namespace_path = os.path.join(b_path, b_namespace)
+ if os.path.isfile(b_namespace_path):
+ continue
+
+ # FIXME: consider feeding b_namespace_path to Candidate.from_dir_path to get subdirs automatically
+ for b_collection in os.listdir(b_namespace_path):
+ b_collection_path = os.path.join(b_namespace_path, b_collection)
+ if not os.path.isdir(b_collection_path):
+ continue
+
+ try:
+ req = Candidate.from_dir_path_as_unknown(b_collection_path, artifacts_manager)
+ except ValueError as val_err:
+ raise_from(AnsibleError(val_err), val_err)
+
+ display.vvv(
+ u"Found installed collection {coll!s} at '{path!s}'".
+ format(coll=to_text(req), path=to_text(req.src))
+ )
+ yield req
+
+
+def install(collection, path, artifacts_manager): # FIXME: mv to dataclasses?
+ # type: (Candidate, str, ConcreteArtifactsManager) -> None
+ """Install a collection under a given path.
+
+ :param collection: Collection to be installed.
+ :param path: Collection dirs layout path.
+ :param artifacts_manager: Artifacts manager.
+ """
+ b_artifact_path = (
+ artifacts_manager.get_artifact_path if collection.is_concrete_artifact
+ else artifacts_manager.get_galaxy_artifact_path
+ )(collection)
+
+ collection_path = os.path.join(path, collection.namespace, collection.name)
+ b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
+ display.display(
+ u"Installing '{coll!s}' to '{path!s}'".
+ format(coll=to_text(collection), path=collection_path),
+ )
+
+ if os.path.exists(b_collection_path):
+ shutil.rmtree(b_collection_path)
+
+ if collection.is_dir:
+ install_src(collection, b_artifact_path, b_collection_path, artifacts_manager)
+ else:
+ install_artifact(
+ b_artifact_path,
+ b_collection_path,
+ artifacts_manager._b_working_directory,
+ collection.signatures,
+ artifacts_manager.keyring,
+ artifacts_manager.required_successful_signature_count,
+ artifacts_manager.ignore_signature_errors,
+ )
+ if (collection.is_online_index_pointer and isinstance(collection.src, GalaxyAPI)):
+ write_source_metadata(
+ collection,
+ b_collection_path,
+ artifacts_manager
+ )
+
+ display.display(
+ '{coll!s} was installed successfully'.
+ format(coll=to_text(collection)),
+ )
+
+
+def write_source_metadata(collection, b_collection_path, artifacts_manager):
+ # type: (Candidate, bytes, ConcreteArtifactsManager) -> None
+ source_data = artifacts_manager.get_galaxy_artifact_source_info(collection)
+
+ b_yaml_source_data = to_bytes(yaml_dump(source_data), errors='surrogate_or_strict')
+ b_info_dest = collection.construct_galaxy_info_path(b_collection_path)
+ b_info_dir = os.path.split(b_info_dest)[0]
+
+ if os.path.exists(b_info_dir):
+ shutil.rmtree(b_info_dir)
+
+ try:
+ os.mkdir(b_info_dir, mode=0o0755)
+ with open(b_info_dest, mode='w+b') as fd:
+ fd.write(b_yaml_source_data)
+ os.chmod(b_info_dest, 0o0644)
+ except Exception:
+ # Ensure we don't leave the dir behind in case of a failure.
+ if os.path.isdir(b_info_dir):
+ shutil.rmtree(b_info_dir)
+ raise
+
+
+def verify_artifact_manifest(manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors):
+ # type: (str, list[str], str, str, list[str]) -> None
+ failed_verify = False
+ coll_path_parts = to_text(manifest_file, errors='surrogate_or_strict').split(os.path.sep)
+ collection_name = '%s.%s' % (coll_path_parts[-3], coll_path_parts[-2]) # get 'ns' and 'coll' from /path/to/ns/coll/MANIFEST.json
+ if not verify_file_signatures(collection_name, manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors):
+ raise AnsibleError(f"Not installing {collection_name} because GnuPG signature verification failed.")
+ display.vvvv(f"GnuPG signature verification succeeded for {collection_name}")
+
+
+def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatures, keyring, required_signature_count, ignore_signature_errors):
+ """Install a collection from tarball under a given path.
+
+ :param b_coll_targz_path: Collection tarball to be installed.
+ :param b_collection_path: Collection dirs layout path.
+ :param b_temp_path: Temporary dir path.
+ :param signatures: frozenset of signatures to verify the MANIFEST.json
+ :param keyring: The keyring used during GPG verification
+ :param required_signature_count: The number of signatures that must successfully verify the collection
+ :param ignore_signature_errors: GPG errors to ignore during signature verification
+ """
+ try:
+ with tarfile.open(b_coll_targz_path, mode='r') as collection_tar:
+ # Verify the signature on the MANIFEST.json before extracting anything else
+ _extract_tar_file(collection_tar, MANIFEST_FILENAME, b_collection_path, b_temp_path)
+
+ if keyring is not None:
+ manifest_file = os.path.join(to_text(b_collection_path, errors='surrogate_or_strict'), MANIFEST_FILENAME)
+ verify_artifact_manifest(manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors)
+
+ files_member_obj = collection_tar.getmember('FILES.json')
+ with _tarfile_extract(collection_tar, files_member_obj) as (dummy, files_obj):
+ files = json.loads(to_text(files_obj.read(), errors='surrogate_or_strict'))
+
+ _extract_tar_file(collection_tar, 'FILES.json', b_collection_path, b_temp_path)
+
+ for file_info in files['files']:
+ file_name = file_info['name']
+ if file_name == '.':
+ continue
+
+ if file_info['ftype'] == 'file':
+ _extract_tar_file(collection_tar, file_name, b_collection_path, b_temp_path,
+ expected_hash=file_info['chksum_sha256'])
+
+ else:
+ _extract_tar_dir(collection_tar, file_name, b_collection_path)
+
+ except Exception:
+ # Ensure we don't leave the dir behind in case of a failure.
+ shutil.rmtree(b_collection_path)
+
+ b_namespace_path = os.path.dirname(b_collection_path)
+ if not os.listdir(b_namespace_path):
+ os.rmdir(b_namespace_path)
+
+ raise
+
+
+def install_src(collection, b_collection_path, b_collection_output_path, artifacts_manager):
+ r"""Install the collection from source control into given dir.
+
+ Generates the Ansible collection artifact data from a galaxy.yml and
+ installs the artifact to a directory.
+ This should follow the same pattern as build_collection, but instead
+ of creating an artifact, install it.
+
+ :param collection: Collection to be installed.
+ :param b_collection_path: Collection dirs layout path.
+ :param b_collection_output_path: The installation directory for the \
+ collection artifact.
+ :param artifacts_manager: Artifacts manager.
+
+ :raises AnsibleError: If no collection metadata found.
+ """
+ collection_meta = artifacts_manager.get_direct_collection_meta(collection)
+
+ if 'build_ignore' not in collection_meta: # installed collection, not src
+ # FIXME: optimize this? use a different process? copy instead of build?
+ collection_meta['build_ignore'] = []
+ collection_meta['manifest'] = Sentinel
+ collection_manifest = _build_manifest(**collection_meta)
+ file_manifest = _build_files_manifest(
+ b_collection_path,
+ collection_meta['namespace'], collection_meta['name'],
+ collection_meta['build_ignore'],
+ collection_meta['manifest'],
+ )
+
+ collection_output_path = _build_collection_dir(
+ b_collection_path, b_collection_output_path,
+ collection_manifest, file_manifest,
+ )
+
+ display.display(
+ 'Created collection for {coll!s} at {path!s}'.
+ format(coll=collection, path=collection_output_path)
+ )
+
+
+def _extract_tar_dir(tar, dirname, b_dest):
+ """ Extracts a directory from a collection tar. """
+ member_names = [to_native(dirname, errors='surrogate_or_strict')]
+
+ # Create list of members with and without trailing separator
+ if not member_names[-1].endswith(os.path.sep):
+ member_names.append(member_names[-1] + os.path.sep)
+
+ # Try all of the member names and stop on the first one that are able to successfully get
+ for member in member_names:
+ try:
+ tar_member = tar.getmember(member)
+ except KeyError:
+ continue
+ break
+ else:
+ # If we still can't find the member, raise a nice error.
+ raise AnsibleError("Unable to extract '%s' from collection" % to_native(member, errors='surrogate_or_strict'))
+
+ b_dir_path = os.path.join(b_dest, to_bytes(dirname, errors='surrogate_or_strict'))
+
+ b_parent_path = os.path.dirname(b_dir_path)
+ try:
+ os.makedirs(b_parent_path, mode=0o0755)
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ if tar_member.type == tarfile.SYMTYPE:
+ b_link_path = to_bytes(tar_member.linkname, errors='surrogate_or_strict')
+ if not _is_child_path(b_link_path, b_dest, link_name=b_dir_path):
+ raise AnsibleError("Cannot extract symlink '%s' in collection: path points to location outside of "
+ "collection '%s'" % (to_native(dirname), b_link_path))
+
+ os.symlink(b_link_path, b_dir_path)
+
+ else:
+ if not os.path.isdir(b_dir_path):
+ os.mkdir(b_dir_path, 0o0755)
+
+
+def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
+ """ Extracts a file from a collection tar. """
+ with _get_tar_file_member(tar, filename) as (tar_member, tar_obj):
+ if tar_member.type == tarfile.SYMTYPE:
+ actual_hash = _consume_file(tar_obj)
+
+ else:
+ with tempfile.NamedTemporaryFile(dir=b_temp_path, delete=False) as tmpfile_obj:
+ actual_hash = _consume_file(tar_obj, tmpfile_obj)
+
+ if expected_hash and actual_hash != expected_hash:
+ raise AnsibleError("Checksum mismatch for '%s' inside collection at '%s'"
+ % (to_native(filename, errors='surrogate_or_strict'), to_native(tar.name)))
+
+ b_dest_filepath = os.path.abspath(os.path.join(b_dest, to_bytes(filename, errors='surrogate_or_strict')))
+ b_parent_dir = os.path.dirname(b_dest_filepath)
+ if not _is_child_path(b_parent_dir, b_dest):
+ raise AnsibleError("Cannot extract tar entry '%s' as it will be placed outside the collection directory"
+ % to_native(filename, errors='surrogate_or_strict'))
+
+ if not os.path.exists(b_parent_dir):
+ # Seems like Galaxy does not validate if all file entries have a corresponding dir ftype entry. This check
+ # makes sure we create the parent directory even if it wasn't set in the metadata.
+ os.makedirs(b_parent_dir, mode=0o0755)
+
+ if tar_member.type == tarfile.SYMTYPE:
+ b_link_path = to_bytes(tar_member.linkname, errors='surrogate_or_strict')
+ if not _is_child_path(b_link_path, b_dest, link_name=b_dest_filepath):
+ raise AnsibleError("Cannot extract symlink '%s' in collection: path points to location outside of "
+ "collection '%s'" % (to_native(filename), b_link_path))
+
+ os.symlink(b_link_path, b_dest_filepath)
+
+ else:
+ shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath)
+
+ # Default to rw-r--r-- and only add execute if the tar file has execute.
+ tar_member = tar.getmember(to_native(filename, errors='surrogate_or_strict'))
+ new_mode = 0o644
+ if stat.S_IMODE(tar_member.mode) & stat.S_IXUSR:
+ new_mode |= 0o0111
+
+ os.chmod(b_dest_filepath, new_mode)
+
+
+def _get_tar_file_member(tar, filename):
+ n_filename = to_native(filename, errors='surrogate_or_strict')
+ try:
+ member = tar.getmember(n_filename)
+ except KeyError:
+ raise AnsibleError("Collection tar at '%s' does not contain the expected file '%s'." % (
+ to_native(tar.name),
+ n_filename))
+
+ return _tarfile_extract(tar, member)
+
+
+def _get_json_from_tar_file(b_path, filename):
+ file_contents = ''
+
+ with tarfile.open(b_path, mode='r') as collection_tar:
+ with _get_tar_file_member(collection_tar, filename) as (dummy, tar_obj):
+ bufsize = 65536
+ data = tar_obj.read(bufsize)
+ while data:
+ file_contents += to_text(data)
+ data = tar_obj.read(bufsize)
+
+ return json.loads(file_contents)
+
+
+def _get_tar_file_hash(b_path, filename):
+ with tarfile.open(b_path, mode='r') as collection_tar:
+ with _get_tar_file_member(collection_tar, filename) as (dummy, tar_obj):
+ return _consume_file(tar_obj)
+
+
+def _get_file_hash(b_path, filename): # type: (bytes, str) -> str
+ filepath = os.path.join(b_path, to_bytes(filename, errors='surrogate_or_strict'))
+ with open(filepath, 'rb') as fp:
+ return _consume_file(fp)
+
+
+def _is_child_path(path, parent_path, link_name=None):
+ """ Checks that path is a path within the parent_path specified. """
+ b_path = to_bytes(path, errors='surrogate_or_strict')
+
+ if link_name and not os.path.isabs(b_path):
+ # If link_name is specified, path is the source of the link and we need to resolve the absolute path.
+ b_link_dir = os.path.dirname(to_bytes(link_name, errors='surrogate_or_strict'))
+ b_path = os.path.abspath(os.path.join(b_link_dir, b_path))
+
+ b_parent_path = to_bytes(parent_path, errors='surrogate_or_strict')
+ return b_path == b_parent_path or b_path.startswith(b_parent_path + to_bytes(os.path.sep))
+
+
+def _resolve_depenency_map(
+ requested_requirements, # type: t.Iterable[Requirement]
+ galaxy_apis, # type: t.Iterable[GalaxyAPI]
+ concrete_artifacts_manager, # type: ConcreteArtifactsManager
+ preferred_candidates, # type: t.Iterable[Candidate] | None
+ no_deps, # type: bool
+ allow_pre_release, # type: bool
+ upgrade, # type: bool
+ include_signatures, # type: bool
+ offline, # type: bool
+): # type: (...) -> dict[str, Candidate]
+ """Return the resolved dependency map."""
+ if not HAS_RESOLVELIB:
+ raise AnsibleError("Failed to import resolvelib, check that a supported version is installed")
+ if not HAS_PACKAGING:
+ raise AnsibleError("Failed to import packaging, check that a supported version is installed")
+
+ req = None
+
+ try:
+ dist = distribution('ansible-core')
+ except Exception:
+ pass
+ else:
+ req = next((rr for r in (dist.requires or []) if (rr := PkgReq(r)).name == 'resolvelib'), None)
+ finally:
+ if req is None:
+ # TODO: replace the hardcoded versions with a warning if the dist info is missing
+ # display.warning("Unable to find 'ansible-core' distribution requirements to verify the resolvelib version is supported.")
+ if not RESOLVELIB_LOWERBOUND <= RESOLVELIB_VERSION < RESOLVELIB_UPPERBOUND:
+ raise AnsibleError(
+ f"ansible-galaxy requires resolvelib<{RESOLVELIB_UPPERBOUND.vstring},>={RESOLVELIB_LOWERBOUND.vstring}"
+ )
+ elif not req.specifier.contains(RESOLVELIB_VERSION.vstring):
+ raise AnsibleError(f"ansible-galaxy requires {req.name}{req.specifier}")
+
+ collection_dep_resolver = build_collection_dependency_resolver(
+ galaxy_apis=galaxy_apis,
+ concrete_artifacts_manager=concrete_artifacts_manager,
+ user_requirements=requested_requirements,
+ preferred_candidates=preferred_candidates,
+ with_deps=not no_deps,
+ with_pre_releases=allow_pre_release,
+ upgrade=upgrade,
+ include_signatures=include_signatures,
+ offline=offline,
+ )
+ try:
+ return collection_dep_resolver.resolve(
+ requested_requirements,
+ max_rounds=2000000, # NOTE: same constant pip uses
+ ).mapping
+ except CollectionDependencyResolutionImpossible as dep_exc:
+ conflict_causes = (
+ '* {req.fqcn!s}:{req.ver!s} ({dep_origin!s})'.format(
+ req=req_inf.requirement,
+ dep_origin='direct request'
+ if req_inf.parent is None
+ else 'dependency of {parent!s}'.
+ format(parent=req_inf.parent),
+ )
+ for req_inf in dep_exc.causes
+ )
+ error_msg_lines = list(chain(
+ (
+ 'Failed to resolve the requested '
+ 'dependencies map. Could not satisfy the following '
+ 'requirements:',
+ ),
+ conflict_causes,
+ ))
+ raise raise_from( # NOTE: Leading "raise" is a hack for mypy bug #9717
+ AnsibleError('\n'.join(error_msg_lines)),
+ dep_exc,
+ )
+ except CollectionDependencyInconsistentCandidate as dep_exc:
+ parents = [
+ "%s.%s:%s" % (p.namespace, p.name, p.ver)
+ for p in dep_exc.criterion.iter_parent()
+ if p is not None
+ ]
+
+ error_msg_lines = [
+ (
+ 'Failed to resolve the requested dependencies map. '
+ 'Got the candidate {req.fqcn!s}:{req.ver!s} ({dep_origin!s}) '
+ 'which didn\'t satisfy all of the following requirements:'.
+ format(
+ req=dep_exc.candidate,
+ dep_origin='direct request'
+ if not parents else 'dependency of {parent!s}'.
+ format(parent=', '.join(parents))
+ )
+ )
+ ]
+
+ for req in dep_exc.criterion.iter_requirement():
+ error_msg_lines.append(
+ '* {req.fqcn!s}:{req.ver!s}'.format(req=req)
+ )
+
+ raise raise_from( # NOTE: Leading "raise" is a hack for mypy bug #9717
+ AnsibleError('\n'.join(error_msg_lines)),
+ dep_exc,
+ )
+ except ValueError as exc:
+ raise AnsibleError(to_native(exc)) from exc
diff --git a/lib/ansible/galaxy/collection/concrete_artifact_manager.py b/lib/ansible/galaxy/collection/concrete_artifact_manager.py
new file mode 100644
index 0000000..7c920b8
--- /dev/null
+++ b/lib/ansible/galaxy/collection/concrete_artifact_manager.py
@@ -0,0 +1,755 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Concrete collection candidate management helper module."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+import os
+import tarfile
+import subprocess
+import typing as t
+
+from contextlib import contextmanager
+from hashlib import sha256
+from urllib.error import URLError
+from urllib.parse import urldefrag
+from shutil import rmtree
+from tempfile import mkdtemp
+
+if t.TYPE_CHECKING:
+ from ansible.galaxy.dependency_resolution.dataclasses import (
+ Candidate, Requirement,
+ )
+ from ansible.galaxy.token import GalaxyToken
+
+from ansible.errors import AnsibleError
+from ansible.galaxy import get_collections_galaxy_meta_info
+from ansible.galaxy.dependency_resolution.dataclasses import _GALAXY_YAML
+from ansible.galaxy.user_agent import user_agent
+from ansible.module_utils._text import to_bytes, to_native, to_text
+from ansible.module_utils.common.process import get_bin_path
+from ansible.module_utils.common._collections_compat import MutableMapping
+from ansible.module_utils.common.yaml import yaml_load
+from ansible.module_utils.six import raise_from
+from ansible.module_utils.urls import open_url
+from ansible.utils.display import Display
+from ansible.utils.sentinel import Sentinel
+
+import yaml
+
+
+display = Display()
+
+MANIFEST_FILENAME = 'MANIFEST.json'
+
+
+class ConcreteArtifactsManager:
+ """Manager for on-disk collection artifacts.
+
+ It is responsible for:
+ * downloading remote collections from Galaxy-compatible servers and
+ direct links to tarballs or SCM repositories
+ * keeping track of local ones
+ * keeping track of Galaxy API tokens for downloads from Galaxy'ish
+ as well as the artifact hashes
+ * keeping track of Galaxy API signatures for downloads from Galaxy'ish
+ * caching all of above
+ * retrieving the metadata out of the downloaded artifacts
+ """
+ def __init__(self, b_working_directory, validate_certs=True, keyring=None, timeout=60, required_signature_count=None, ignore_signature_errors=None):
+ # type: (bytes, bool, str, int, str, list[str]) -> None
+ """Initialize ConcreteArtifactsManager caches and costraints."""
+ self._validate_certs = validate_certs # type: bool
+ self._artifact_cache = {} # type: dict[bytes, bytes]
+ self._galaxy_artifact_cache = {} # type: dict[Candidate | Requirement, bytes]
+ self._artifact_meta_cache = {} # type: dict[bytes, dict[str, str | list[str] | dict[str, str] | None | t.Type[Sentinel]]]
+ self._galaxy_collection_cache = {} # type: dict[Candidate | Requirement, tuple[str, str, GalaxyToken]]
+ self._galaxy_collection_origin_cache = {} # type: dict[Candidate, tuple[str, list[dict[str, str]]]]
+ self._b_working_directory = b_working_directory # type: bytes
+ self._supplemental_signature_cache = {} # type: dict[str, str]
+ self._keyring = keyring # type: str
+ self.timeout = timeout # type: int
+ self._required_signature_count = required_signature_count # type: str
+ self._ignore_signature_errors = ignore_signature_errors # type: list[str]
+ self._require_build_metadata = True # type: bool
+
+ @property
+ def keyring(self):
+ return self._keyring
+
+ @property
+ def required_successful_signature_count(self):
+ return self._required_signature_count
+
+ @property
+ def ignore_signature_errors(self):
+ if self._ignore_signature_errors is None:
+ return []
+ return self._ignore_signature_errors
+
+ @property
+ def require_build_metadata(self):
+ # type: () -> bool
+ return self._require_build_metadata
+
+ @require_build_metadata.setter
+ def require_build_metadata(self, value):
+ # type: (bool) -> None
+ self._require_build_metadata = value
+
+ def get_galaxy_artifact_source_info(self, collection):
+ # type: (Candidate) -> dict[str, t.Union[str, list[dict[str, str]]]]
+ server = collection.src.api_server
+
+ try:
+ download_url = self._galaxy_collection_cache[collection][0]
+ signatures_url, signatures = self._galaxy_collection_origin_cache[collection]
+ except KeyError as key_err:
+ raise RuntimeError(
+ 'The is no known source for {coll!s}'.
+ format(coll=collection),
+ ) from key_err
+
+ return {
+ "format_version": "1.0.0",
+ "namespace": collection.namespace,
+ "name": collection.name,
+ "version": collection.ver,
+ "server": server,
+ "version_url": signatures_url,
+ "download_url": download_url,
+ "signatures": signatures,
+ }
+
+ def get_galaxy_artifact_path(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> bytes
+ """Given a Galaxy-stored collection, return a cached path.
+
+ If it's not yet on disk, this method downloads the artifact first.
+ """
+ try:
+ return self._galaxy_artifact_cache[collection]
+ except KeyError:
+ pass
+
+ try:
+ url, sha256_hash, token = self._galaxy_collection_cache[collection]
+ except KeyError as key_err:
+ raise_from(
+ RuntimeError(
+ 'The is no known source for {coll!s}'.
+ format(coll=collection),
+ ),
+ key_err,
+ )
+
+ display.vvvv(
+ "Fetching a collection tarball for '{collection!s}' from "
+ 'Ansible Galaxy'.format(collection=collection),
+ )
+
+ try:
+ b_artifact_path = _download_file(
+ url,
+ self._b_working_directory,
+ expected_hash=sha256_hash,
+ validate_certs=self._validate_certs,
+ token=token,
+ ) # type: bytes
+ except URLError as err:
+ raise_from(
+ AnsibleError(
+ 'Failed to download collection tar '
+ "from '{coll_src!s}': {download_err!s}".
+ format(
+ coll_src=to_native(collection.src),
+ download_err=to_native(err),
+ ),
+ ),
+ err,
+ )
+ else:
+ display.vvv(
+ "Collection '{coll!s}' obtained from "
+ 'server {server!s} {url!s}'.format(
+ coll=collection, server=collection.src or 'Galaxy',
+ url=collection.src.api_server if collection.src is not None
+ else '',
+ )
+ )
+
+ self._galaxy_artifact_cache[collection] = b_artifact_path
+ return b_artifact_path
+
+ def get_artifact_path(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> bytes
+ """Given a concrete collection pointer, return a cached path.
+
+ If it's not yet on disk, this method downloads the artifact first.
+ """
+ try:
+ return self._artifact_cache[collection.src]
+ except KeyError:
+ pass
+
+ # NOTE: SCM needs to be special-cased as it may contain either
+ # NOTE: one collection in its root, or a number of top-level
+ # NOTE: collection directories instead.
+ # NOTE: The idea is to store the SCM collection as unpacked
+ # NOTE: directory structure under the temporary location and use
+ # NOTE: a "virtual" collection that has pinned requirements on
+ # NOTE: the directories under that SCM checkout that correspond
+ # NOTE: to collections.
+ # NOTE: This brings us to the idea that we need two separate
+ # NOTE: virtual Requirement/Candidate types --
+ # NOTE: (single) dir + (multidir) subdirs
+ if collection.is_url:
+ display.vvvv(
+ "Collection requirement '{collection!s}' is a URL "
+ 'to a tar artifact'.format(collection=collection.fqcn),
+ )
+ try:
+ b_artifact_path = _download_file(
+ collection.src,
+ self._b_working_directory,
+ expected_hash=None, # NOTE: URLs don't support checksums
+ validate_certs=self._validate_certs,
+ timeout=self.timeout
+ )
+ except Exception as err:
+ raise_from(
+ AnsibleError(
+ 'Failed to download collection tar '
+ "from '{coll_src!s}': {download_err!s}".
+ format(
+ coll_src=to_native(collection.src),
+ download_err=to_native(err),
+ ),
+ ),
+ err,
+ )
+ elif collection.is_scm:
+ b_artifact_path = _extract_collection_from_git(
+ collection.src,
+ collection.ver,
+ self._b_working_directory,
+ )
+ elif collection.is_file or collection.is_dir or collection.is_subdirs:
+ b_artifact_path = to_bytes(collection.src)
+ else:
+ # NOTE: This may happen `if collection.is_online_index_pointer`
+ raise RuntimeError(
+ 'The artifact is of an unexpected type {art_type!s}'.
+ format(art_type=collection.type)
+ )
+
+ self._artifact_cache[collection.src] = b_artifact_path
+ return b_artifact_path
+
+ def _get_direct_collection_namespace(self, collection):
+ # type: (Candidate) -> t.Optional[str]
+ return self.get_direct_collection_meta(collection)['namespace'] # type: ignore[return-value]
+
+ def _get_direct_collection_name(self, collection):
+ # type: (Candidate) -> t.Optional[str]
+ return self.get_direct_collection_meta(collection)['name'] # type: ignore[return-value]
+
+ def get_direct_collection_fqcn(self, collection):
+ # type: (Candidate) -> t.Optional[str]
+ """Extract FQCN from the given on-disk collection artifact.
+
+ If the collection is virtual, ``None`` is returned instead
+ of a string.
+ """
+ if collection.is_virtual:
+ # NOTE: should it be something like "<virtual>"?
+ return None
+
+ return '.'.join(( # type: ignore[type-var]
+ self._get_direct_collection_namespace(collection), # type: ignore[arg-type]
+ self._get_direct_collection_name(collection),
+ ))
+
+ def get_direct_collection_version(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> str
+ """Extract version from the given on-disk collection artifact."""
+ return self.get_direct_collection_meta(collection)['version'] # type: ignore[return-value]
+
+ def get_direct_collection_dependencies(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> dict[str, str]
+ """Extract deps from the given on-disk collection artifact."""
+ collection_dependencies = self.get_direct_collection_meta(collection)['dependencies']
+ if collection_dependencies is None:
+ collection_dependencies = {}
+ return collection_dependencies # type: ignore[return-value]
+
+ def get_direct_collection_meta(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> dict[str, t.Union[str, dict[str, str], list[str], None, t.Type[Sentinel]]]
+ """Extract meta from the given on-disk collection artifact."""
+ try: # FIXME: use unique collection identifier as a cache key?
+ return self._artifact_meta_cache[collection.src]
+ except KeyError:
+ b_artifact_path = self.get_artifact_path(collection)
+
+ if collection.is_url or collection.is_file:
+ collection_meta = _get_meta_from_tar(b_artifact_path)
+ elif collection.is_dir: # should we just build a coll instead?
+ # FIXME: what if there's subdirs?
+ try:
+ collection_meta = _get_meta_from_dir(b_artifact_path, self.require_build_metadata)
+ except LookupError as lookup_err:
+ raise_from(
+ AnsibleError(
+ 'Failed to find the collection dir deps: {err!s}'.
+ format(err=to_native(lookup_err)),
+ ),
+ lookup_err,
+ )
+ elif collection.is_scm:
+ collection_meta = {
+ 'name': None,
+ 'namespace': None,
+ 'dependencies': {to_native(b_artifact_path): '*'},
+ 'version': '*',
+ }
+ elif collection.is_subdirs:
+ collection_meta = {
+ 'name': None,
+ 'namespace': None,
+ # NOTE: Dropping b_artifact_path since it's based on src anyway
+ 'dependencies': dict.fromkeys(
+ map(to_native, collection.namespace_collection_paths),
+ '*',
+ ),
+ 'version': '*',
+ }
+ else:
+ raise RuntimeError
+
+ self._artifact_meta_cache[collection.src] = collection_meta
+ return collection_meta
+
+ def save_collection_source(self, collection, url, sha256_hash, token, signatures_url, signatures):
+ # type: (Candidate, str, str, GalaxyToken, str, list[dict[str, str]]) -> None
+ """Store collection URL, SHA256 hash and Galaxy API token.
+
+ This is a hook that is supposed to be called before attempting to
+ download Galaxy-based collections with ``get_galaxy_artifact_path()``.
+ """
+ self._galaxy_collection_cache[collection] = url, sha256_hash, token
+ self._galaxy_collection_origin_cache[collection] = signatures_url, signatures
+
+ @classmethod
+ @contextmanager
+ def under_tmpdir(
+ cls,
+ temp_dir_base, # type: str
+ validate_certs=True, # type: bool
+ keyring=None, # type: str
+ required_signature_count=None, # type: str
+ ignore_signature_errors=None, # type: list[str]
+ require_build_metadata=True, # type: bool
+ ): # type: (...) -> t.Iterator[ConcreteArtifactsManager]
+ """Custom ConcreteArtifactsManager constructor with temp dir.
+
+ This method returns a context manager that allocates and cleans
+ up a temporary directory for caching the collection artifacts
+ during the dependency resolution process.
+ """
+ # NOTE: Can't use `with tempfile.TemporaryDirectory:`
+ # NOTE: because it's not in Python 2 stdlib.
+ temp_path = mkdtemp(
+ dir=to_bytes(temp_dir_base, errors='surrogate_or_strict'),
+ )
+ b_temp_path = to_bytes(temp_path, errors='surrogate_or_strict')
+ try:
+ yield cls(
+ b_temp_path,
+ validate_certs,
+ keyring=keyring,
+ required_signature_count=required_signature_count,
+ ignore_signature_errors=ignore_signature_errors
+ )
+ finally:
+ rmtree(b_temp_path)
+
+
+def parse_scm(collection, version):
+ """Extract name, version, path and subdir out of the SCM pointer."""
+ if ',' in collection:
+ collection, version = collection.split(',', 1)
+ elif version == '*' or not version:
+ version = 'HEAD'
+
+ if collection.startswith('git+'):
+ path = collection[4:]
+ else:
+ path = collection
+
+ path, fragment = urldefrag(path)
+ fragment = fragment.strip(os.path.sep)
+
+ if path.endswith(os.path.sep + '.git'):
+ name = path.split(os.path.sep)[-2]
+ elif '://' not in path and '@' not in path:
+ name = path
+ else:
+ name = path.split('/')[-1]
+ if name.endswith('.git'):
+ name = name[:-4]
+
+ return name, version, path, fragment
+
+
+def _extract_collection_from_git(repo_url, coll_ver, b_path):
+ name, version, git_url, fragment = parse_scm(repo_url, coll_ver)
+ b_checkout_path = mkdtemp(
+ dir=b_path,
+ prefix=to_bytes(name, errors='surrogate_or_strict'),
+ ) # type: bytes
+
+ try:
+ git_executable = get_bin_path('git')
+ except ValueError as err:
+ raise AnsibleError(
+ "Could not find git executable to extract the collection from the Git repository `{repo_url!s}`.".
+ format(repo_url=to_native(git_url))
+ ) from err
+
+ # Perform a shallow clone if simply cloning HEAD
+ if version == 'HEAD':
+ git_clone_cmd = git_executable, 'clone', '--depth=1', git_url, to_text(b_checkout_path)
+ else:
+ git_clone_cmd = git_executable, 'clone', git_url, to_text(b_checkout_path)
+ # FIXME: '--branch', version
+
+ try:
+ subprocess.check_call(git_clone_cmd)
+ except subprocess.CalledProcessError as proc_err:
+ raise_from(
+ AnsibleError( # should probably be LookupError
+ 'Failed to clone a Git repository from `{repo_url!s}`.'.
+ format(repo_url=to_native(git_url)),
+ ),
+ proc_err,
+ )
+
+ git_switch_cmd = git_executable, 'checkout', to_text(version)
+ try:
+ subprocess.check_call(git_switch_cmd, cwd=b_checkout_path)
+ except subprocess.CalledProcessError as proc_err:
+ raise_from(
+ AnsibleError( # should probably be LookupError
+ 'Failed to switch a cloned Git repo `{repo_url!s}` '
+ 'to the requested revision `{commitish!s}`.'.
+ format(
+ commitish=to_native(version),
+ repo_url=to_native(git_url),
+ ),
+ ),
+ proc_err,
+ )
+
+ return (
+ os.path.join(b_checkout_path, to_bytes(fragment))
+ if fragment else b_checkout_path
+ )
+
+
+# FIXME: use random subdirs while preserving the file names
+def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeout=60):
+ # type: (str, bytes, t.Optional[str], bool, GalaxyToken, int) -> bytes
+ # ^ NOTE: used in download and verify_collections ^
+ b_tarball_name = to_bytes(
+ url.rsplit('/', 1)[1], errors='surrogate_or_strict',
+ )
+ b_file_name = b_tarball_name[:-len('.tar.gz')]
+
+ b_tarball_dir = mkdtemp(
+ dir=b_path,
+ prefix=b'-'.join((b_file_name, b'')),
+ ) # type: bytes
+
+ b_file_path = os.path.join(b_tarball_dir, b_tarball_name)
+
+ display.display("Downloading %s to %s" % (url, to_text(b_tarball_dir)))
+ # NOTE: Galaxy redirects downloads to S3 which rejects the request
+ # NOTE: if an Authorization header is attached so don't redirect it
+ resp = open_url(
+ to_native(url, errors='surrogate_or_strict'),
+ validate_certs=validate_certs,
+ headers=None if token is None else token.headers(),
+ unredirected_headers=['Authorization'], http_agent=user_agent(),
+ timeout=timeout
+ )
+
+ with open(b_file_path, 'wb') as download_file: # type: t.BinaryIO
+ actual_hash = _consume_file(resp, write_to=download_file)
+
+ if expected_hash:
+ display.vvvv(
+ 'Validating downloaded file hash {actual_hash!s} with '
+ 'expected hash {expected_hash!s}'.
+ format(actual_hash=actual_hash, expected_hash=expected_hash)
+ )
+ if expected_hash != actual_hash:
+ raise AnsibleError('Mismatch artifact hash with downloaded file')
+
+ return b_file_path
+
+
+def _consume_file(read_from, write_to=None):
+ # type: (t.BinaryIO, t.BinaryIO) -> str
+ bufsize = 65536
+ sha256_digest = sha256()
+ data = read_from.read(bufsize)
+ while data:
+ if write_to is not None:
+ write_to.write(data)
+ write_to.flush()
+ sha256_digest.update(data)
+ data = read_from.read(bufsize)
+
+ return sha256_digest.hexdigest()
+
+
+def _normalize_galaxy_yml_manifest(
+ galaxy_yml, # type: dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ b_galaxy_yml_path, # type: bytes
+ require_build_metadata=True, # type: bool
+):
+ # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ galaxy_yml_schema = (
+ get_collections_galaxy_meta_info()
+ ) # type: list[dict[str, t.Any]] # FIXME: <--
+ # FIXME: 👆maybe precise type: list[dict[str, t.Union[bool, str, list[str]]]]
+
+ mandatory_keys = set()
+ string_keys = set() # type: set[str]
+ list_keys = set() # type: set[str]
+ dict_keys = set() # type: set[str]
+ sentinel_keys = set() # type: set[str]
+
+ for info in galaxy_yml_schema:
+ if info.get('required', False):
+ mandatory_keys.add(info['key'])
+
+ key_list_type = {
+ 'str': string_keys,
+ 'list': list_keys,
+ 'dict': dict_keys,
+ 'sentinel': sentinel_keys,
+ }[info.get('type', 'str')]
+ key_list_type.add(info['key'])
+
+ all_keys = frozenset(mandatory_keys | string_keys | list_keys | dict_keys | sentinel_keys)
+
+ set_keys = set(galaxy_yml.keys())
+ missing_keys = mandatory_keys.difference(set_keys)
+ if missing_keys:
+ msg = (
+ "The collection galaxy.yml at '%s' is missing the following mandatory keys: %s"
+ % (to_native(b_galaxy_yml_path), ", ".join(sorted(missing_keys)))
+ )
+ if require_build_metadata:
+ raise AnsibleError(msg)
+ display.warning(msg)
+ raise ValueError(msg)
+
+ extra_keys = set_keys.difference(all_keys)
+ if len(extra_keys) > 0:
+ display.warning("Found unknown keys in collection galaxy.yml at '%s': %s"
+ % (to_text(b_galaxy_yml_path), ", ".join(extra_keys)))
+
+ # Add the defaults if they have not been set
+ for optional_string in string_keys:
+ if optional_string not in galaxy_yml:
+ galaxy_yml[optional_string] = None
+
+ for optional_list in list_keys:
+ list_val = galaxy_yml.get(optional_list, None)
+
+ if list_val is None:
+ galaxy_yml[optional_list] = []
+ elif not isinstance(list_val, list):
+ galaxy_yml[optional_list] = [list_val] # type: ignore[list-item]
+
+ for optional_dict in dict_keys:
+ if optional_dict not in galaxy_yml:
+ galaxy_yml[optional_dict] = {}
+
+ for optional_sentinel in sentinel_keys:
+ if optional_sentinel not in galaxy_yml:
+ galaxy_yml[optional_sentinel] = Sentinel
+
+ # NOTE: `version: null` is only allowed for `galaxy.yml`
+ # NOTE: and not `MANIFEST.json`. The use-case for it is collections
+ # NOTE: that generate the version from Git before building a
+ # NOTE: distributable tarball artifact.
+ if not galaxy_yml.get('version'):
+ galaxy_yml['version'] = '*'
+
+ return galaxy_yml
+
+
+def _get_meta_from_dir(
+ b_path, # type: bytes
+ require_build_metadata=True, # type: bool
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ try:
+ return _get_meta_from_installed_dir(b_path)
+ except LookupError:
+ return _get_meta_from_src_dir(b_path, require_build_metadata)
+
+
+def _get_meta_from_src_dir(
+ b_path, # type: bytes
+ require_build_metadata=True, # type: bool
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ galaxy_yml = os.path.join(b_path, _GALAXY_YAML)
+ if not os.path.isfile(galaxy_yml):
+ raise LookupError(
+ "The collection galaxy.yml path '{path!s}' does not exist.".
+ format(path=to_native(galaxy_yml))
+ )
+
+ with open(galaxy_yml, 'rb') as manifest_file_obj:
+ try:
+ manifest = yaml_load(manifest_file_obj)
+ except yaml.error.YAMLError as yaml_err:
+ raise_from(
+ AnsibleError(
+ "Failed to parse the galaxy.yml at '{path!s}' with "
+ 'the following error:\n{err_txt!s}'.
+ format(
+ path=to_native(galaxy_yml),
+ err_txt=to_native(yaml_err),
+ ),
+ ),
+ yaml_err,
+ )
+
+ if not isinstance(manifest, dict):
+ if require_build_metadata:
+ raise AnsibleError(f"The collection galaxy.yml at '{to_native(galaxy_yml)}' is incorrectly formatted.")
+ # Valid build metadata is not required by ansible-galaxy list. Raise ValueError to fall back to implicit metadata.
+ display.warning(f"The collection galaxy.yml at '{to_native(galaxy_yml)}' is incorrectly formatted.")
+ raise ValueError(f"The collection galaxy.yml at '{to_native(galaxy_yml)}' is incorrectly formatted.")
+
+ return _normalize_galaxy_yml_manifest(manifest, galaxy_yml, require_build_metadata)
+
+
+def _get_json_from_installed_dir(
+ b_path, # type: bytes
+ filename, # type: str
+): # type: (...) -> dict
+
+ b_json_filepath = os.path.join(b_path, to_bytes(filename, errors='surrogate_or_strict'))
+
+ try:
+ with open(b_json_filepath, 'rb') as manifest_fd:
+ b_json_text = manifest_fd.read()
+ except (IOError, OSError):
+ raise LookupError(
+ "The collection {manifest!s} path '{path!s}' does not exist.".
+ format(
+ manifest=filename,
+ path=to_native(b_json_filepath),
+ )
+ )
+
+ manifest_txt = to_text(b_json_text, errors='surrogate_or_strict')
+
+ try:
+ manifest = json.loads(manifest_txt)
+ except ValueError:
+ raise AnsibleError(
+ 'Collection tar file member {member!s} does not '
+ 'contain a valid json string.'.
+ format(member=filename),
+ )
+
+ return manifest
+
+
+def _get_meta_from_installed_dir(
+ b_path, # type: bytes
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ manifest = _get_json_from_installed_dir(b_path, MANIFEST_FILENAME)
+ collection_info = manifest['collection_info']
+
+ version = collection_info.get('version')
+ if not version:
+ raise AnsibleError(
+ u'Collection metadata file `{manifest_filename!s}` at `{meta_file!s}` is expected '
+ u'to have a valid SemVer version value but got {version!s}'.
+ format(
+ manifest_filename=MANIFEST_FILENAME,
+ meta_file=to_text(b_path),
+ version=to_text(repr(version)),
+ ),
+ )
+
+ return collection_info
+
+
+def _get_meta_from_tar(
+ b_path, # type: bytes
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ if not tarfile.is_tarfile(b_path):
+ raise AnsibleError(
+ "Collection artifact at '{path!s}' is not a valid tar file.".
+ format(path=to_native(b_path)),
+ )
+
+ with tarfile.open(b_path, mode='r') as collection_tar: # type: tarfile.TarFile
+ try:
+ member = collection_tar.getmember(MANIFEST_FILENAME)
+ except KeyError:
+ raise AnsibleError(
+ "Collection at '{path!s}' does not contain the "
+ 'required file {manifest_file!s}.'.
+ format(
+ path=to_native(b_path),
+ manifest_file=MANIFEST_FILENAME,
+ ),
+ )
+
+ with _tarfile_extract(collection_tar, member) as (_member, member_obj):
+ if member_obj is None:
+ raise AnsibleError(
+ 'Collection tar file does not contain '
+ 'member {member!s}'.format(member=MANIFEST_FILENAME),
+ )
+
+ text_content = to_text(
+ member_obj.read(),
+ errors='surrogate_or_strict',
+ )
+
+ try:
+ manifest = json.loads(text_content)
+ except ValueError:
+ raise AnsibleError(
+ 'Collection tar file member {member!s} does not '
+ 'contain a valid json string.'.
+ format(member=MANIFEST_FILENAME),
+ )
+ return manifest['collection_info']
+
+
+@contextmanager
+def _tarfile_extract(
+ tar, # type: tarfile.TarFile
+ member, # type: tarfile.TarInfo
+):
+ # type: (...) -> t.Iterator[tuple[tarfile.TarInfo, t.Optional[t.IO[bytes]]]]
+ tar_obj = tar.extractfile(member)
+ try:
+ yield member, tar_obj
+ finally:
+ if tar_obj is not None:
+ tar_obj.close()
diff --git a/lib/ansible/galaxy/collection/galaxy_api_proxy.py b/lib/ansible/galaxy/collection/galaxy_api_proxy.py
new file mode 100644
index 0000000..51e0c9f
--- /dev/null
+++ b/lib/ansible/galaxy/collection/galaxy_api_proxy.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""A facade for interfacing with multiple Galaxy instances."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import typing as t
+
+if t.TYPE_CHECKING:
+ from ansible.galaxy.api import CollectionVersionMetadata
+ from ansible.galaxy.collection.concrete_artifact_manager import (
+ ConcreteArtifactsManager,
+ )
+ from ansible.galaxy.dependency_resolution.dataclasses import (
+ Candidate, Requirement,
+ )
+
+from ansible.galaxy.api import GalaxyAPI, GalaxyError
+from ansible.module_utils._text import to_text
+from ansible.utils.display import Display
+
+
+display = Display()
+
+
+class MultiGalaxyAPIProxy:
+ """A proxy that abstracts talking to multiple Galaxy instances."""
+
+ def __init__(self, apis, concrete_artifacts_manager, offline=False):
+ # type: (t.Iterable[GalaxyAPI], ConcreteArtifactsManager, bool) -> None
+ """Initialize the target APIs list."""
+ self._apis = apis
+ self._concrete_art_mgr = concrete_artifacts_manager
+ self._offline = offline # Prevent all GalaxyAPI calls
+
+ @property
+ def is_offline_mode_requested(self):
+ return self._offline
+
+ def _assert_that_offline_mode_is_not_requested(self): # type: () -> None
+ if self.is_offline_mode_requested:
+ raise NotImplementedError("The calling code is not supposed to be invoked in 'offline' mode.")
+
+ def _get_collection_versions(self, requirement):
+ # type: (Requirement) -> t.Iterator[tuple[GalaxyAPI, str]]
+ """Helper for get_collection_versions.
+
+ Yield api, version pairs for all APIs,
+ and reraise the last error if no valid API was found.
+ """
+ if self._offline:
+ return []
+
+ found_api = False
+ last_error = None # type: Exception | None
+
+ api_lookup_order = (
+ (requirement.src, )
+ if isinstance(requirement.src, GalaxyAPI)
+ else self._apis
+ )
+
+ for api in api_lookup_order:
+ try:
+ versions = api.get_collection_versions(requirement.namespace, requirement.name)
+ except GalaxyError as api_err:
+ last_error = api_err
+ except Exception as unknown_err:
+ display.warning(
+ "Skipping Galaxy server {server!s}. "
+ "Got an unexpected error when getting "
+ "available versions of collection {fqcn!s}: {err!s}".
+ format(
+ server=api.api_server,
+ fqcn=requirement.fqcn,
+ err=to_text(unknown_err),
+ )
+ )
+ last_error = unknown_err
+ else:
+ found_api = True
+ for version in versions:
+ yield api, version
+
+ if not found_api and last_error is not None:
+ raise last_error
+
+ def get_collection_versions(self, requirement):
+ # type: (Requirement) -> t.Iterable[tuple[str, GalaxyAPI]]
+ """Get a set of unique versions for FQCN on Galaxy servers."""
+ if requirement.is_concrete_artifact:
+ return {
+ (
+ self._concrete_art_mgr.
+ get_direct_collection_version(requirement),
+ requirement.src,
+ ),
+ }
+
+ api_lookup_order = (
+ (requirement.src, )
+ if isinstance(requirement.src, GalaxyAPI)
+ else self._apis
+ )
+ return set(
+ (version, api)
+ for api, version in self._get_collection_versions(
+ requirement,
+ )
+ )
+
+ def get_collection_version_metadata(self, collection_candidate):
+ # type: (Candidate) -> CollectionVersionMetadata
+ """Retrieve collection metadata of a given candidate."""
+ self._assert_that_offline_mode_is_not_requested()
+
+ api_lookup_order = (
+ (collection_candidate.src, )
+ if isinstance(collection_candidate.src, GalaxyAPI)
+ else self._apis
+ )
+
+ last_err: t.Optional[Exception]
+
+ for api in api_lookup_order:
+ try:
+ version_metadata = api.get_collection_version_metadata(
+ collection_candidate.namespace,
+ collection_candidate.name,
+ collection_candidate.ver,
+ )
+ except GalaxyError as api_err:
+ last_err = api_err
+ except Exception as unknown_err:
+ # `verify` doesn't use `get_collection_versions` since the version is already known.
+ # Do the same as `install` and `download` by trying all APIs before failing.
+ # Warn for debugging purposes, since the Galaxy server may be unexpectedly down.
+ last_err = unknown_err
+ display.warning(
+ "Skipping Galaxy server {server!s}. "
+ "Got an unexpected error when getting "
+ "available versions of collection {fqcn!s}: {err!s}".
+ format(
+ server=api.api_server,
+ fqcn=collection_candidate.fqcn,
+ err=to_text(unknown_err),
+ )
+ )
+ else:
+ self._concrete_art_mgr.save_collection_source(
+ collection_candidate,
+ version_metadata.download_url,
+ version_metadata.artifact_sha256,
+ api.token,
+ version_metadata.signatures_url,
+ version_metadata.signatures,
+ )
+ return version_metadata
+
+ raise last_err
+
+ def get_collection_dependencies(self, collection_candidate):
+ # type: (Candidate) -> dict[str, str]
+ # FIXME: return Requirement instances instead?
+ """Retrieve collection dependencies of a given candidate."""
+ if collection_candidate.is_concrete_artifact:
+ return (
+ self.
+ _concrete_art_mgr.
+ get_direct_collection_dependencies
+ )(collection_candidate)
+
+ return (
+ self.
+ get_collection_version_metadata(collection_candidate).
+ dependencies
+ )
+
+ def get_signatures(self, collection_candidate):
+ # type: (Candidate) -> list[str]
+ self._assert_that_offline_mode_is_not_requested()
+ namespace = collection_candidate.namespace
+ name = collection_candidate.name
+ version = collection_candidate.ver
+ last_err = None # type: Exception | None
+
+ api_lookup_order = (
+ (collection_candidate.src, )
+ if isinstance(collection_candidate.src, GalaxyAPI)
+ else self._apis
+ )
+
+ for api in api_lookup_order:
+ try:
+ return api.get_collection_signatures(namespace, name, version)
+ except GalaxyError as api_err:
+ last_err = api_err
+ except Exception as unknown_err:
+ # Warn for debugging purposes, since the Galaxy server may be unexpectedly down.
+ last_err = unknown_err
+ display.warning(
+ "Skipping Galaxy server {server!s}. "
+ "Got an unexpected error when getting "
+ "available versions of collection {fqcn!s}: {err!s}".
+ format(
+ server=api.api_server,
+ fqcn=collection_candidate.fqcn,
+ err=to_text(unknown_err),
+ )
+ )
+ if last_err:
+ raise last_err
+
+ return []
diff --git a/lib/ansible/galaxy/collection/gpg.py b/lib/ansible/galaxy/collection/gpg.py
new file mode 100644
index 0000000..8641f0d
--- /dev/null
+++ b/lib/ansible/galaxy/collection/gpg.py
@@ -0,0 +1,282 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2022, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Signature verification helpers."""
+
+from ansible.errors import AnsibleError
+from ansible.galaxy.user_agent import user_agent
+from ansible.module_utils.urls import open_url
+
+import contextlib
+import os
+import subprocess
+import sys
+import typing as t
+
+from dataclasses import dataclass, fields as dc_fields
+from functools import partial
+from urllib.error import HTTPError, URLError
+
+if t.TYPE_CHECKING:
+ from ansible.utils.display import Display
+
+IS_PY310_PLUS = sys.version_info[:2] >= (3, 10)
+
+frozen_dataclass = partial(dataclass, frozen=True, **({'slots': True} if IS_PY310_PLUS else {}))
+
+
+def get_signature_from_source(source, display=None): # type: (str, t.Optional[Display]) -> str
+ if display is not None:
+ display.vvvv(f"Using signature at {source}")
+ try:
+ with open_url(
+ source,
+ http_agent=user_agent(),
+ validate_certs=True,
+ follow_redirects='safe'
+ ) as resp:
+ signature = resp.read()
+ except (HTTPError, URLError) as e:
+ raise AnsibleError(
+ f"Failed to get signature for collection verification from '{source}': {e}"
+ ) from e
+
+ return signature
+
+
+def run_gpg_verify(
+ manifest_file, # type: str
+ signature, # type: str
+ keyring, # type: str
+ display, # type: Display
+): # type: (...) -> tuple[str, int]
+ status_fd_read, status_fd_write = os.pipe()
+
+ # running the gpg command will create the keyring if it does not exist
+ remove_keybox = not os.path.exists(keyring)
+
+ cmd = [
+ 'gpg',
+ f'--status-fd={status_fd_write}',
+ '--verify',
+ '--batch',
+ '--no-tty',
+ '--no-default-keyring',
+ f'--keyring={keyring}',
+ '-',
+ manifest_file,
+ ]
+ cmd_str = ' '.join(cmd)
+ display.vvvv(f"Running command '{cmd}'")
+
+ try:
+ p = subprocess.Popen(
+ cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ pass_fds=(status_fd_write,),
+ encoding='utf8',
+ )
+ except (FileNotFoundError, subprocess.SubprocessError) as err:
+ raise AnsibleError(
+ f"Failed during GnuPG verification with command '{cmd_str}': {err}"
+ ) from err
+ else:
+ stdout, stderr = p.communicate(input=signature)
+ finally:
+ os.close(status_fd_write)
+
+ if remove_keybox:
+ with contextlib.suppress(OSError):
+ os.remove(keyring)
+
+ with os.fdopen(status_fd_read) as f:
+ stdout = f.read()
+ display.vvvv(
+ f"stdout: \n{stdout}\nstderr: \n{stderr}\n(exit code {p.returncode})"
+ )
+ return stdout, p.returncode
+
+
+def parse_gpg_errors(status_out): # type: (str) -> t.Iterator[GpgBaseError]
+ for line in status_out.splitlines():
+ if not line:
+ continue
+ try:
+ _dummy, status, remainder = line.split(maxsplit=2)
+ except ValueError:
+ _dummy, status = line.split(maxsplit=1)
+ remainder = None
+
+ try:
+ cls = GPG_ERROR_MAP[status]
+ except KeyError:
+ continue
+
+ fields = [status]
+ if remainder:
+ fields.extend(
+ remainder.split(
+ None,
+ len(dc_fields(cls)) - 2
+ )
+ )
+
+ yield cls(*fields)
+
+
+@frozen_dataclass
+class GpgBaseError(Exception):
+ status: str
+
+ @classmethod
+ def get_gpg_error_description(cls) -> str:
+ """Return the current class description."""
+ return ' '.join(cls.__doc__.split())
+
+ def __post_init__(self):
+ for field in dc_fields(self):
+ super(GpgBaseError, self).__setattr__(field.name, field.type(getattr(self, field.name)))
+
+
+@frozen_dataclass
+class GpgExpSig(GpgBaseError):
+ """The signature with the keyid is good, but the signature is expired."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgExpKeySig(GpgBaseError):
+ """The signature with the keyid is good, but the signature was made by an expired key."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgRevKeySig(GpgBaseError):
+ """The signature with the keyid is good, but the signature was made by a revoked key."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgBadSig(GpgBaseError):
+ """The signature with the keyid has not been verified okay."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgErrSig(GpgBaseError):
+ """"It was not possible to check the signature. This may be caused by
+ a missing public key or an unsupported algorithm. A RC of 4
+ indicates unknown algorithm, a 9 indicates a missing public
+ key.
+ """
+ keyid: str
+ pkalgo: int
+ hashalgo: int
+ sig_class: str
+ time: int
+ rc: int
+ fpr: str
+
+
+@frozen_dataclass
+class GpgNoPubkey(GpgBaseError):
+ """The public key is not available."""
+ keyid: str
+
+
+@frozen_dataclass
+class GpgMissingPassPhrase(GpgBaseError):
+ """No passphrase was supplied."""
+
+
+@frozen_dataclass
+class GpgBadPassphrase(GpgBaseError):
+ """The supplied passphrase was wrong or not given."""
+ keyid: str
+
+
+@frozen_dataclass
+class GpgNoData(GpgBaseError):
+ """No data has been found. Codes for WHAT are:
+ - 1 :: No armored data.
+ - 2 :: Expected a packet but did not find one.
+ - 3 :: Invalid packet found, this may indicate a non OpenPGP
+ message.
+ - 4 :: Signature expected but not found.
+ """
+ what: str
+
+
+@frozen_dataclass
+class GpgUnexpected(GpgBaseError):
+ """No data has been found. Codes for WHAT are:
+ - 1 :: No armored data.
+ - 2 :: Expected a packet but did not find one.
+ - 3 :: Invalid packet found, this may indicate a non OpenPGP
+ message.
+ - 4 :: Signature expected but not found.
+ """
+ what: str
+
+
+@frozen_dataclass
+class GpgError(GpgBaseError):
+ """This is a generic error status message, it might be followed by error location specific data."""
+ location: str
+ code: int
+ more: str = ""
+
+
+@frozen_dataclass
+class GpgFailure(GpgBaseError):
+ """This is the counterpart to SUCCESS and used to indicate a program failure."""
+ location: str
+ code: int
+
+
+@frozen_dataclass
+class GpgBadArmor(GpgBaseError):
+ """The ASCII armor is corrupted."""
+
+
+@frozen_dataclass
+class GpgKeyExpired(GpgBaseError):
+ """The key has expired."""
+ timestamp: int
+
+
+@frozen_dataclass
+class GpgKeyRevoked(GpgBaseError):
+ """The used key has been revoked by its owner."""
+
+
+@frozen_dataclass
+class GpgNoSecKey(GpgBaseError):
+ """The secret key is not available."""
+ keyid: str
+
+
+GPG_ERROR_MAP = {
+ 'EXPSIG': GpgExpSig,
+ 'EXPKEYSIG': GpgExpKeySig,
+ 'REVKEYSIG': GpgRevKeySig,
+ 'BADSIG': GpgBadSig,
+ 'ERRSIG': GpgErrSig,
+ 'NO_PUBKEY': GpgNoPubkey,
+ 'MISSING_PASSPHRASE': GpgMissingPassPhrase,
+ 'BAD_PASSPHRASE': GpgBadPassphrase,
+ 'NODATA': GpgNoData,
+ 'UNEXPECTED': GpgUnexpected,
+ 'ERROR': GpgError,
+ 'FAILURE': GpgFailure,
+ 'BADARMOR': GpgBadArmor,
+ 'KEYEXPIRED': GpgKeyExpired,
+ 'KEYREVOKED': GpgKeyRevoked,
+ 'NO_SECKEY': GpgNoSecKey,
+}