diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/galaxy/collection/galaxy_api_proxy.py | |
parent | Initial commit. (diff) | |
download | ansible-core-8a754e0858d922e955e71b253c139e071ecec432.tar.xz ansible-core-8a754e0858d922e955e71b253c139e071ecec432.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | lib/ansible/galaxy/collection/galaxy_api_proxy.py | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/collection/galaxy_api_proxy.py b/lib/ansible/galaxy/collection/galaxy_api_proxy.py new file mode 100644 index 0000000..51e0c9f --- /dev/null +++ b/lib/ansible/galaxy/collection/galaxy_api_proxy.py @@ -0,0 +1,216 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2020-2021, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +"""A facade for interfacing with multiple Galaxy instances.""" + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import typing as t + +if t.TYPE_CHECKING: + from ansible.galaxy.api import CollectionVersionMetadata + from ansible.galaxy.collection.concrete_artifact_manager import ( + ConcreteArtifactsManager, + ) + from ansible.galaxy.dependency_resolution.dataclasses import ( + Candidate, Requirement, + ) + +from ansible.galaxy.api import GalaxyAPI, GalaxyError +from ansible.module_utils._text import to_text +from ansible.utils.display import Display + + +display = Display() + + +class MultiGalaxyAPIProxy: + """A proxy that abstracts talking to multiple Galaxy instances.""" + + def __init__(self, apis, concrete_artifacts_manager, offline=False): + # type: (t.Iterable[GalaxyAPI], ConcreteArtifactsManager, bool) -> None + """Initialize the target APIs list.""" + self._apis = apis + self._concrete_art_mgr = concrete_artifacts_manager + self._offline = offline # Prevent all GalaxyAPI calls + + @property + def is_offline_mode_requested(self): + return self._offline + + def _assert_that_offline_mode_is_not_requested(self): # type: () -> None + if self.is_offline_mode_requested: + raise NotImplementedError("The calling code is not supposed to be invoked in 'offline' mode.") + + def _get_collection_versions(self, requirement): + # type: (Requirement) -> t.Iterator[tuple[GalaxyAPI, str]] + """Helper for get_collection_versions. + + Yield api, version pairs for all APIs, + and reraise the last error if no valid API was found. + """ + if self._offline: + return [] + + found_api = False + last_error = None # type: Exception | None + + api_lookup_order = ( + (requirement.src, ) + if isinstance(requirement.src, GalaxyAPI) + else self._apis + ) + + for api in api_lookup_order: + try: + versions = api.get_collection_versions(requirement.namespace, requirement.name) + except GalaxyError as api_err: + last_error = api_err + except Exception as unknown_err: + display.warning( + "Skipping Galaxy server {server!s}. " + "Got an unexpected error when getting " + "available versions of collection {fqcn!s}: {err!s}". + format( + server=api.api_server, + fqcn=requirement.fqcn, + err=to_text(unknown_err), + ) + ) + last_error = unknown_err + else: + found_api = True + for version in versions: + yield api, version + + if not found_api and last_error is not None: + raise last_error + + def get_collection_versions(self, requirement): + # type: (Requirement) -> t.Iterable[tuple[str, GalaxyAPI]] + """Get a set of unique versions for FQCN on Galaxy servers.""" + if requirement.is_concrete_artifact: + return { + ( + self._concrete_art_mgr. + get_direct_collection_version(requirement), + requirement.src, + ), + } + + api_lookup_order = ( + (requirement.src, ) + if isinstance(requirement.src, GalaxyAPI) + else self._apis + ) + return set( + (version, api) + for api, version in self._get_collection_versions( + requirement, + ) + ) + + def get_collection_version_metadata(self, collection_candidate): + # type: (Candidate) -> CollectionVersionMetadata + """Retrieve collection metadata of a given candidate.""" + self._assert_that_offline_mode_is_not_requested() + + api_lookup_order = ( + (collection_candidate.src, ) + if isinstance(collection_candidate.src, GalaxyAPI) + else self._apis + ) + + last_err: t.Optional[Exception] + + for api in api_lookup_order: + try: + version_metadata = api.get_collection_version_metadata( + collection_candidate.namespace, + collection_candidate.name, + collection_candidate.ver, + ) + except GalaxyError as api_err: + last_err = api_err + except Exception as unknown_err: + # `verify` doesn't use `get_collection_versions` since the version is already known. + # Do the same as `install` and `download` by trying all APIs before failing. + # Warn for debugging purposes, since the Galaxy server may be unexpectedly down. + last_err = unknown_err + display.warning( + "Skipping Galaxy server {server!s}. " + "Got an unexpected error when getting " + "available versions of collection {fqcn!s}: {err!s}". + format( + server=api.api_server, + fqcn=collection_candidate.fqcn, + err=to_text(unknown_err), + ) + ) + else: + self._concrete_art_mgr.save_collection_source( + collection_candidate, + version_metadata.download_url, + version_metadata.artifact_sha256, + api.token, + version_metadata.signatures_url, + version_metadata.signatures, + ) + return version_metadata + + raise last_err + + def get_collection_dependencies(self, collection_candidate): + # type: (Candidate) -> dict[str, str] + # FIXME: return Requirement instances instead? + """Retrieve collection dependencies of a given candidate.""" + if collection_candidate.is_concrete_artifact: + return ( + self. + _concrete_art_mgr. + get_direct_collection_dependencies + )(collection_candidate) + + return ( + self. + get_collection_version_metadata(collection_candidate). + dependencies + ) + + def get_signatures(self, collection_candidate): + # type: (Candidate) -> list[str] + self._assert_that_offline_mode_is_not_requested() + namespace = collection_candidate.namespace + name = collection_candidate.name + version = collection_candidate.ver + last_err = None # type: Exception | None + + api_lookup_order = ( + (collection_candidate.src, ) + if isinstance(collection_candidate.src, GalaxyAPI) + else self._apis + ) + + for api in api_lookup_order: + try: + return api.get_collection_signatures(namespace, name, version) + except GalaxyError as api_err: + last_err = api_err + except Exception as unknown_err: + # Warn for debugging purposes, since the Galaxy server may be unexpectedly down. + last_err = unknown_err + display.warning( + "Skipping Galaxy server {server!s}. " + "Got an unexpected error when getting " + "available versions of collection {fqcn!s}: {err!s}". + format( + server=api.api_server, + fqcn=collection_candidate.fqcn, + err=to_text(unknown_err), + ) + ) + if last_err: + raise last_err + + return [] |