1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
# -*- coding: utf-8 -*-
# Copyright: (c) 2020-2021, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""A facade for interfacing with multiple Galaxy instances."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import typing as t
if t.TYPE_CHECKING:
from ansible.galaxy.api import CollectionVersionMetadata
from ansible.galaxy.collection.concrete_artifact_manager import (
ConcreteArtifactsManager,
)
from ansible.galaxy.dependency_resolution.dataclasses import (
Candidate, Requirement,
)
from ansible.galaxy.api import GalaxyAPI, GalaxyError
from ansible.module_utils._text import to_text
from ansible.utils.display import Display
display = Display()
class MultiGalaxyAPIProxy:
"""A proxy that abstracts talking to multiple Galaxy instances."""
def __init__(self, apis, concrete_artifacts_manager, offline=False):
# type: (t.Iterable[GalaxyAPI], ConcreteArtifactsManager, bool) -> None
"""Initialize the target APIs list."""
self._apis = apis
self._concrete_art_mgr = concrete_artifacts_manager
self._offline = offline # Prevent all GalaxyAPI calls
@property
def is_offline_mode_requested(self):
return self._offline
def _assert_that_offline_mode_is_not_requested(self): # type: () -> None
if self.is_offline_mode_requested:
raise NotImplementedError("The calling code is not supposed to be invoked in 'offline' mode.")
def _get_collection_versions(self, requirement):
# type: (Requirement) -> t.Iterator[tuple[GalaxyAPI, str]]
"""Helper for get_collection_versions.
Yield api, version pairs for all APIs,
and reraise the last error if no valid API was found.
"""
if self._offline:
return []
found_api = False
last_error = None # type: Exception | None
api_lookup_order = (
(requirement.src, )
if isinstance(requirement.src, GalaxyAPI)
else self._apis
)
for api in api_lookup_order:
try:
versions = api.get_collection_versions(requirement.namespace, requirement.name)
except GalaxyError as api_err:
last_error = api_err
except Exception as unknown_err:
display.warning(
"Skipping Galaxy server {server!s}. "
"Got an unexpected error when getting "
"available versions of collection {fqcn!s}: {err!s}".
format(
server=api.api_server,
fqcn=requirement.fqcn,
err=to_text(unknown_err),
)
)
last_error = unknown_err
else:
found_api = True
for version in versions:
yield api, version
if not found_api and last_error is not None:
raise last_error
def get_collection_versions(self, requirement):
# type: (Requirement) -> t.Iterable[tuple[str, GalaxyAPI]]
"""Get a set of unique versions for FQCN on Galaxy servers."""
if requirement.is_concrete_artifact:
return {
(
self._concrete_art_mgr.
get_direct_collection_version(requirement),
requirement.src,
),
}
api_lookup_order = (
(requirement.src, )
if isinstance(requirement.src, GalaxyAPI)
else self._apis
)
return set(
(version, api)
for api, version in self._get_collection_versions(
requirement,
)
)
def get_collection_version_metadata(self, collection_candidate):
# type: (Candidate) -> CollectionVersionMetadata
"""Retrieve collection metadata of a given candidate."""
self._assert_that_offline_mode_is_not_requested()
api_lookup_order = (
(collection_candidate.src, )
if isinstance(collection_candidate.src, GalaxyAPI)
else self._apis
)
last_err: t.Optional[Exception]
for api in api_lookup_order:
try:
version_metadata = api.get_collection_version_metadata(
collection_candidate.namespace,
collection_candidate.name,
collection_candidate.ver,
)
except GalaxyError as api_err:
last_err = api_err
except Exception as unknown_err:
# `verify` doesn't use `get_collection_versions` since the version is already known.
# Do the same as `install` and `download` by trying all APIs before failing.
# Warn for debugging purposes, since the Galaxy server may be unexpectedly down.
last_err = unknown_err
display.warning(
"Skipping Galaxy server {server!s}. "
"Got an unexpected error when getting "
"available versions of collection {fqcn!s}: {err!s}".
format(
server=api.api_server,
fqcn=collection_candidate.fqcn,
err=to_text(unknown_err),
)
)
else:
self._concrete_art_mgr.save_collection_source(
collection_candidate,
version_metadata.download_url,
version_metadata.artifact_sha256,
api.token,
version_metadata.signatures_url,
version_metadata.signatures,
)
return version_metadata
raise last_err
def get_collection_dependencies(self, collection_candidate):
# type: (Candidate) -> dict[str, str]
# FIXME: return Requirement instances instead?
"""Retrieve collection dependencies of a given candidate."""
if collection_candidate.is_concrete_artifact:
return (
self.
_concrete_art_mgr.
get_direct_collection_dependencies
)(collection_candidate)
return (
self.
get_collection_version_metadata(collection_candidate).
dependencies
)
def get_signatures(self, collection_candidate):
# type: (Candidate) -> list[str]
self._assert_that_offline_mode_is_not_requested()
namespace = collection_candidate.namespace
name = collection_candidate.name
version = collection_candidate.ver
last_err = None # type: Exception | None
api_lookup_order = (
(collection_candidate.src, )
if isinstance(collection_candidate.src, GalaxyAPI)
else self._apis
)
for api in api_lookup_order:
try:
return api.get_collection_signatures(namespace, name, version)
except GalaxyError as api_err:
last_err = api_err
except Exception as unknown_err:
# Warn for debugging purposes, since the Galaxy server may be unexpectedly down.
last_err = unknown_err
display.warning(
"Skipping Galaxy server {server!s}. "
"Got an unexpected error when getting "
"available versions of collection {fqcn!s}: {err!s}".
format(
server=api.api_server,
fqcn=collection_candidate.fqcn,
err=to_text(unknown_err),
)
)
if last_err:
raise last_err
return []
|