summaryrefslogtreecommitdiffstats
path: root/lib/ansible/galaxy/collection/galaxy_api_proxy.py
blob: 51e0c9f5510422539df9741f1ac41f091092040d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# -*- coding: utf-8 -*-
# Copyright: (c) 2020-2021, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""A facade for interfacing with multiple Galaxy instances."""

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import typing as t

if t.TYPE_CHECKING:
    from ansible.galaxy.api import CollectionVersionMetadata
    from ansible.galaxy.collection.concrete_artifact_manager import (
        ConcreteArtifactsManager,
    )
    from ansible.galaxy.dependency_resolution.dataclasses import (
        Candidate, Requirement,
    )

from ansible.galaxy.api import GalaxyAPI, GalaxyError
from ansible.module_utils._text import to_text
from ansible.utils.display import Display


display = Display()


class MultiGalaxyAPIProxy:
    """A proxy that abstracts talking to multiple Galaxy instances."""

    def __init__(self, apis, concrete_artifacts_manager, offline=False):
        # type: (t.Iterable[GalaxyAPI], ConcreteArtifactsManager, bool) -> None
        """Initialize the target APIs list."""
        self._apis = apis
        self._concrete_art_mgr = concrete_artifacts_manager
        self._offline = offline  # Prevent all GalaxyAPI calls

    @property
    def is_offline_mode_requested(self):
        return self._offline

    def _assert_that_offline_mode_is_not_requested(self):  # type: () -> None
        if self.is_offline_mode_requested:
            raise NotImplementedError("The calling code is not supposed to be invoked in 'offline' mode.")

    def _get_collection_versions(self, requirement):
        # type: (Requirement) -> t.Iterator[tuple[GalaxyAPI, str]]
        """Helper for get_collection_versions.

        Yield api, version pairs for all APIs,
        and reraise the last error if no valid API was found.
        """
        if self._offline:
            return []

        found_api = False
        last_error = None  # type: Exception | None

        api_lookup_order = (
            (requirement.src, )
            if isinstance(requirement.src, GalaxyAPI)
            else self._apis
        )

        for api in api_lookup_order:
            try:
                versions = api.get_collection_versions(requirement.namespace, requirement.name)
            except GalaxyError as api_err:
                last_error = api_err
            except Exception as unknown_err:
                display.warning(
                    "Skipping Galaxy server {server!s}. "
                    "Got an unexpected error when getting "
                    "available versions of collection {fqcn!s}: {err!s}".
                    format(
                        server=api.api_server,
                        fqcn=requirement.fqcn,
                        err=to_text(unknown_err),
                    )
                )
                last_error = unknown_err
            else:
                found_api = True
                for version in versions:
                    yield api, version

        if not found_api and last_error is not None:
            raise last_error

    def get_collection_versions(self, requirement):
        # type: (Requirement) -> t.Iterable[tuple[str, GalaxyAPI]]
        """Get a set of unique versions for FQCN on Galaxy servers."""
        if requirement.is_concrete_artifact:
            return {
                (
                    self._concrete_art_mgr.
                    get_direct_collection_version(requirement),
                    requirement.src,
                ),
            }

        api_lookup_order = (
            (requirement.src, )
            if isinstance(requirement.src, GalaxyAPI)
            else self._apis
        )
        return set(
            (version, api)
            for api, version in self._get_collection_versions(
                requirement,
            )
        )

    def get_collection_version_metadata(self, collection_candidate):
        # type: (Candidate) -> CollectionVersionMetadata
        """Retrieve collection metadata of a given candidate."""
        self._assert_that_offline_mode_is_not_requested()

        api_lookup_order = (
            (collection_candidate.src, )
            if isinstance(collection_candidate.src, GalaxyAPI)
            else self._apis
        )

        last_err: t.Optional[Exception]

        for api in api_lookup_order:
            try:
                version_metadata = api.get_collection_version_metadata(
                    collection_candidate.namespace,
                    collection_candidate.name,
                    collection_candidate.ver,
                )
            except GalaxyError as api_err:
                last_err = api_err
            except Exception as unknown_err:
                # `verify` doesn't use `get_collection_versions` since the version is already known.
                # Do the same as `install` and `download` by trying all APIs before failing.
                # Warn for debugging purposes, since the Galaxy server may be unexpectedly down.
                last_err = unknown_err
                display.warning(
                    "Skipping Galaxy server {server!s}. "
                    "Got an unexpected error when getting "
                    "available versions of collection {fqcn!s}: {err!s}".
                    format(
                        server=api.api_server,
                        fqcn=collection_candidate.fqcn,
                        err=to_text(unknown_err),
                    )
                )
            else:
                self._concrete_art_mgr.save_collection_source(
                    collection_candidate,
                    version_metadata.download_url,
                    version_metadata.artifact_sha256,
                    api.token,
                    version_metadata.signatures_url,
                    version_metadata.signatures,
                )
                return version_metadata

        raise last_err

    def get_collection_dependencies(self, collection_candidate):
        # type: (Candidate) -> dict[str, str]
        # FIXME: return Requirement instances instead?
        """Retrieve collection dependencies of a given candidate."""
        if collection_candidate.is_concrete_artifact:
            return (
                self.
                _concrete_art_mgr.
                get_direct_collection_dependencies
            )(collection_candidate)

        return (
            self.
            get_collection_version_metadata(collection_candidate).
            dependencies
        )

    def get_signatures(self, collection_candidate):
        # type: (Candidate) -> list[str]
        self._assert_that_offline_mode_is_not_requested()
        namespace = collection_candidate.namespace
        name = collection_candidate.name
        version = collection_candidate.ver
        last_err = None  # type: Exception | None

        api_lookup_order = (
            (collection_candidate.src, )
            if isinstance(collection_candidate.src, GalaxyAPI)
            else self._apis
        )

        for api in api_lookup_order:
            try:
                return api.get_collection_signatures(namespace, name, version)
            except GalaxyError as api_err:
                last_err = api_err
            except Exception as unknown_err:
                # Warn for debugging purposes, since the Galaxy server may be unexpectedly down.
                last_err = unknown_err
                display.warning(
                    "Skipping Galaxy server {server!s}. "
                    "Got an unexpected error when getting "
                    "available versions of collection {fqcn!s}: {err!s}".
                    format(
                        server=api.api_server,
                        fqcn=collection_candidate.fqcn,
                        err=to_text(unknown_err),
                    )
                )
        if last_err:
            raise last_err

        return []