diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/galaxy/api.py | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream/2.14.3.tar.xz ansible-core-upstream/2.14.3.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/galaxy/api.py')
-rw-r--r-- | lib/ansible/galaxy/api.py | 913 |
1 file changed, 913 insertions, 0 deletions
# (C) 2013, James Cammarata <jcammarata@ansible.com>
# Copyright: (c) 2019, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import collections
import datetime
import functools
import hashlib
import json
import os
import stat
import tarfile
import time
import threading

from urllib.error import HTTPError
from urllib.parse import quote as urlquote, urlencode, urlparse, parse_qs, urljoin

from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.api import retry_with_delays_and_condition
from ansible.module_utils.api import generate_jittered_backoff
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.urls import open_url, prepare_multipart
from ansible.utils.display import Display
from ansible.utils.hashing import secure_hash_s
from ansible.utils.path import makedirs_safe

display = Display()
# Serializes all reads/writes of the on-disk API response cache across threads.
_CACHE_LOCK = threading.Lock()
COLLECTION_PAGE_SIZE = 100
RETRY_HTTP_ERROR_CODES = [  # TODO: Allow user-configuration
    429,  # Too Many Requests
    520,  # Galaxy rate limit error code (Cloudflare unknown error)
]


def cache_lock(func):
    """Decorator that runs ``func`` while holding the module-wide cache lock."""
    def wrapped(*args, **kwargs):
        with _CACHE_LOCK:
            return func(*args, **kwargs)

    return wrapped


def is_rate_limit_exception(exception):
    """Return True when ``exception`` is a GalaxyError with a retryable HTTP code."""
    # Note: cloud.redhat.com masks rate limit errors with 403 (Forbidden) error codes.
    # Since 403 could reflect the actual problem (such as an expired token), we should
    # not retry by default.
    return isinstance(exception, GalaxyError) and exception.http_code in RETRY_HTTP_ERROR_CODES


def g_connect(versions):
    """
    Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the
    endpoint.

    :param versions: A list of API versions that the function supports.
    """
    def decorator(method):
        def wrapped(self, *args, **kwargs):
            if not self._available_api_versions:
                display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)

                # Determine the type of Galaxy server we are talking to. First try it unauthenticated then with Bearer
                # auth for Automation Hub.
                n_url = self.api_server
                error_context_msg = 'Error when finding available api versions from %s (%s)' % (self.name, n_url)

                if self.api_server == 'https://galaxy.ansible.com' or self.api_server == 'https://galaxy.ansible.com/':
                    n_url = 'https://galaxy.ansible.com/api/'

                try:
                    data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg, cache=True)
                except (AnsibleError, GalaxyError, ValueError, KeyError) as err:
                    # Either the URL doesnt exist, or other error. Or the URL exists, but isn't a galaxy API
                    # root (not JSON, no 'available_versions') so try appending '/api/'
                    if n_url.endswith('/api') or n_url.endswith('/api/'):
                        raise

                    # Let exceptions here bubble up but raise the original if this returns a 404 (/api/ wasn't found).
                    n_url = _urljoin(n_url, '/api/')
                    try:
                        data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg, cache=True)
                    except GalaxyError as new_err:
                        if new_err.http_code == 404:
                            raise err
                        raise

                if 'available_versions' not in data:
                    raise AnsibleError("Tried to find galaxy API root at %s but no 'available_versions' are available "
                                       "on %s" % (n_url, self.api_server))

                # Update api_server to point to the "real" API root, which in this case could have been the configured
                # url + '/api/' appended.
                self.api_server = n_url

                # Default to only supporting v1, if only v1 is returned we also assume that v2 is available even though
                # it isn't returned in the available_versions dict.
                available_versions = data.get('available_versions', {u'v1': u'v1/'})
                if list(available_versions.keys()) == [u'v1']:
                    available_versions[u'v2'] = u'v2/'

                self._available_api_versions = available_versions
                display.vvvv("Found API version '%s' with Galaxy server %s (%s)"
                             % (', '.join(available_versions.keys()), self.name, self.api_server))

            # Verify that the API versions the function works with are available on the server specified.
            available_versions = set(self._available_api_versions.keys())
            common_versions = set(versions).intersection(available_versions)
            if not common_versions:
                raise AnsibleError("Galaxy action %s requires API versions '%s' but only '%s' are available on %s %s"
                                   % (method.__name__, ", ".join(versions), ", ".join(available_versions),
                                      self.name, self.api_server))

            return method(self, *args, **kwargs)
        return wrapped
    return decorator


def get_cache_id(url):
    """ Gets the cache ID for the URL specified. """
    url_info = urlparse(url)

    port = None
    try:
        port = url_info.port
    except ValueError:
        pass  # While the URL is probably invalid, let the caller figure that out when using it

    # Cannot use netloc because it could contain credentials if the server specified had them in there.
    return '%s:%s' % (url_info.hostname, port or '')
""" + cache_version = 1 + + if not os.path.isfile(b_cache_path): + display.vvvv("Creating Galaxy API response cache file at '%s'" % to_text(b_cache_path)) + with open(b_cache_path, 'w'): + os.chmod(b_cache_path, 0o600) + + cache_mode = os.stat(b_cache_path).st_mode + if cache_mode & stat.S_IWOTH: + display.warning("Galaxy cache has world writable access (%s), ignoring it as a cache source." + % to_text(b_cache_path)) + return + + with open(b_cache_path, mode='rb') as fd: + json_val = to_text(fd.read(), errors='surrogate_or_strict') + + try: + cache = json.loads(json_val) + except ValueError: + cache = None + + if not isinstance(cache, dict) or cache.get('version', None) != cache_version: + display.vvvv("Galaxy cache file at '%s' has an invalid version, clearing" % to_text(b_cache_path)) + cache = {'version': cache_version} + + # Set the cache after we've cleared the existing entries + with open(b_cache_path, mode='wb') as fd: + fd.write(to_bytes(json.dumps(cache), errors='surrogate_or_strict')) + + return cache + + +def _urljoin(*args): + return '/'.join(to_native(a, errors='surrogate_or_strict').strip('/') for a in args + ('',) if a) + + +class GalaxyError(AnsibleError): + """ Error for bad Galaxy server responses. """ + + def __init__(self, http_error, message): + super(GalaxyError, self).__init__(message) + self.http_code = http_error.code + self.url = http_error.geturl() + + try: + http_msg = to_text(http_error.read()) + err_info = json.loads(http_msg) + except (AttributeError, ValueError): + err_info = {} + + url_split = self.url.split('/') + if 'v2' in url_split: + galaxy_msg = err_info.get('message', http_error.reason) + code = err_info.get('code', 'Unknown') + full_error_msg = u"%s (HTTP Code: %d, Message: %s Code: %s)" % (message, self.http_code, galaxy_msg, code) + elif 'v3' in url_split: + errors = err_info.get('errors', []) + if not errors: + errors = [{}] # Defaults are set below, we just need to make sure 1 error is present. 
+ + message_lines = [] + for error in errors: + error_msg = error.get('detail') or error.get('title') or http_error.reason + error_code = error.get('code') or 'Unknown' + message_line = u"(HTTP Code: %d, Message: %s Code: %s)" % (self.http_code, error_msg, error_code) + message_lines.append(message_line) + + full_error_msg = "%s %s" % (message, ', '.join(message_lines)) + else: + # v1 and unknown API endpoints + galaxy_msg = err_info.get('default', http_error.reason) + full_error_msg = u"%s (HTTP Code: %d, Message: %s)" % (message, self.http_code, galaxy_msg) + + self.message = to_native(full_error_msg) + + +# Keep the raw string results for the date. It's too complex to parse as a datetime object and the various APIs return +# them in different formats. +CollectionMetadata = collections.namedtuple('CollectionMetadata', ['namespace', 'name', 'created_str', 'modified_str']) + + +class CollectionVersionMetadata: + + def __init__(self, namespace, name, version, download_url, artifact_sha256, dependencies, signatures_url, signatures): + """ + Contains common information about a collection on a Galaxy server to smooth through API differences for + Collection and define a standard meta info for a collection. + + :param namespace: The namespace name. + :param name: The collection name. + :param version: The version that the metadata refers to. + :param download_url: The URL to download the collection. + :param artifact_sha256: The SHA256 of the collection artifact for later verification. + :param dependencies: A dict of dependencies of the collection. + :param signatures_url: The URL to the specific version of the collection. + :param signatures: The list of signatures found at the signatures_url. 
+ """ + self.namespace = namespace + self.name = name + self.version = version + self.download_url = download_url + self.artifact_sha256 = artifact_sha256 + self.dependencies = dependencies + self.signatures_url = signatures_url + self.signatures = signatures + + +@functools.total_ordering +class GalaxyAPI: + """ This class is meant to be used as a API client for an Ansible Galaxy server """ + + def __init__( + self, galaxy, name, url, + username=None, password=None, token=None, validate_certs=True, + available_api_versions=None, + clear_response_cache=False, no_cache=True, + priority=float('inf'), + timeout=60, + ): + self.galaxy = galaxy + self.name = name + self.username = username + self.password = password + self.token = token + self.api_server = url + self.validate_certs = validate_certs + self.timeout = timeout + self._available_api_versions = available_api_versions or {} + self._priority = priority + self._server_timeout = timeout + + b_cache_dir = to_bytes(C.GALAXY_CACHE_DIR, errors='surrogate_or_strict') + makedirs_safe(b_cache_dir, mode=0o700) + self._b_cache_path = os.path.join(b_cache_dir, b'api.json') + + if clear_response_cache: + with _CACHE_LOCK: + if os.path.exists(self._b_cache_path): + display.vvvv("Clearing cache file (%s)" % to_text(self._b_cache_path)) + os.remove(self._b_cache_path) + + self._cache = None + if not no_cache: + self._cache = _load_cache(self._b_cache_path) + + display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs)) + + def __str__(self): + # type: (GalaxyAPI) -> str + """Render GalaxyAPI as a native string representation.""" + return to_native(self.name) + + def __unicode__(self): + # type: (GalaxyAPI) -> str + """Render GalaxyAPI as a unicode/text string representation.""" + return to_text(self.name) + + def __repr__(self): + # type: (GalaxyAPI) -> str + """Render GalaxyAPI as an inspectable string representation.""" + return ( + '<{instance!s} "{name!s}" @ {url!s} with priority 
{priority!s}>'. + format( + instance=self, name=self.name, + priority=self._priority, url=self.api_server, + ) + ) + + def __lt__(self, other_galaxy_api): + # type: (GalaxyAPI, GalaxyAPI) -> bool + """Return whether the instance priority is higher than other.""" + if not isinstance(other_galaxy_api, self.__class__): + return NotImplemented + + return ( + self._priority > other_galaxy_api._priority or + self.name < self.name + ) + + @property # type: ignore[misc] # https://github.com/python/mypy/issues/1362 + @g_connect(['v1', 'v2', 'v3']) + def available_api_versions(self): + # Calling g_connect will populate self._available_api_versions + return self._available_api_versions + + @retry_with_delays_and_condition( + backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40), + should_retry_error=is_rate_limit_exception + ) + def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None, + cache=False, cache_key=None): + url_info = urlparse(url) + cache_id = get_cache_id(url) + if not cache_key: + cache_key = url_info.path + query = parse_qs(url_info.query) + if cache and self._cache: + server_cache = self._cache.setdefault(cache_id, {}) + iso_datetime_format = '%Y-%m-%dT%H:%M:%SZ' + + valid = False + if cache_key in server_cache: + expires = datetime.datetime.strptime(server_cache[cache_key]['expires'], iso_datetime_format) + valid = datetime.datetime.utcnow() < expires + + is_paginated_url = 'page' in query or 'offset' in query + if valid and not is_paginated_url: + # Got a hit on the cache and we aren't getting a paginated response + path_cache = server_cache[cache_key] + if path_cache.get('paginated'): + if '/v3/' in cache_key: + res = {'links': {'next': None}} + else: + res = {'next': None} + + # Technically some v3 paginated APIs return in 'data' but the caller checks the keys for this so + # always returning the cache under results is fine. 
    @retry_with_delays_and_condition(
        backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40),
        should_retry_error=is_rate_limit_exception
    )
    def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None,
                     cache=False, cache_key=None):
        # Perform one HTTP call to the Galaxy server, optionally serving/filling the
        # on-disk response cache. Retries automatically on rate-limit HTTP codes.
        url_info = urlparse(url)
        cache_id = get_cache_id(url)
        if not cache_key:
            cache_key = url_info.path
        query = parse_qs(url_info.query)
        if cache and self._cache:
            server_cache = self._cache.setdefault(cache_id, {})
            iso_datetime_format = '%Y-%m-%dT%H:%M:%SZ'

            valid = False
            if cache_key in server_cache:
                expires = datetime.datetime.strptime(server_cache[cache_key]['expires'], iso_datetime_format)
                valid = datetime.datetime.utcnow() < expires

            # Individual page requests are never served from cache; the whole
            # paginated result set is cached under the first page's cache_key.
            is_paginated_url = 'page' in query or 'offset' in query
            if valid and not is_paginated_url:
                # Got a hit on the cache and we aren't getting a paginated response
                path_cache = server_cache[cache_key]
                if path_cache.get('paginated'):
                    # Synthesize a single-page response with no 'next' link so the
                    # caller's pagination loop terminates immediately.
                    if '/v3/' in cache_key:
                        res = {'links': {'next': None}}
                    else:
                        res = {'next': None}

                    # Technically some v3 paginated APIs return in 'data' but the caller checks the keys for this so
                    # always returning the cache under results is fine.
                    res['results'] = []
                    for result in path_cache['results']:
                        res['results'].append(result)

                else:
                    res = path_cache['results']

                return res

            elif not is_paginated_url:
                # The cache entry had expired or does not exist, start a new blank entry to be filled later.
                expires = datetime.datetime.utcnow()
                expires += datetime.timedelta(days=1)
                server_cache[cache_key] = {
                    'expires': expires.strftime(iso_datetime_format),
                    'paginated': False,
                }

        headers = headers or {}
        self._add_auth_token(headers, url, required=auth_required)

        try:
            display.vvvv("Calling Galaxy at %s" % url)
            resp = open_url(to_native(url), data=args, validate_certs=self.validate_certs, headers=headers,
                            method=method, timeout=self._server_timeout, http_agent=user_agent(), follow_redirects='safe')
        except HTTPError as e:
            raise GalaxyError(e, error_context_msg)
        except Exception as e:
            raise AnsibleError("Unknown error when attempting to call Galaxy at '%s': %s" % (url, to_native(e)))

        resp_data = to_text(resp.read(), errors='surrogate_or_strict')
        try:
            data = json.loads(resp_data)
        except ValueError:
            raise AnsibleError("Failed to parse Galaxy response from '%s' as JSON:\n%s"
                               % (resp.url, to_native(resp_data)))

        if cache and self._cache:
            path_cache = self._cache[cache_id][cache_key]

            # v3 can return data or results for paginated results. Scan the result so we can determine what to cache.
            paginated_key = None
            for key in ['data', 'results']:
                if key in data:
                    paginated_key = key
                    break

            if paginated_key:
                # Accumulate pages into one cached result list.
                path_cache['paginated'] = True
                results = path_cache.setdefault('results', [])
                for result in data[paginated_key]:
                    results.append(result)

            else:
                path_cache['results'] = data

        return data

    def _add_auth_token(self, headers, url, token_type=None, required=False):
        # Populate the Authorization header from the configured token, if any.
        # Don't add the auth token if one is already present
        if 'Authorization' in headers:
            return

        if not self.token and required:
            raise AnsibleError("No access token or username set. A token can be set with --api-key "
                               "or at {0}.".format(to_native(C.GALAXY_TOKEN_PATH)))

        if self.token:
            headers.update(self.token.headers())

    @cache_lock
    def _set_cache(self):
        # Persist the in-memory cache dict to disk (holds the cache lock).
        with open(self._b_cache_path, mode='wb') as fd:
            fd.write(to_bytes(json.dumps(self._cache), errors='surrogate_or_strict'))

    @g_connect(['v1'])
    def authenticate(self, github_token):
        """
        Retrieve an authentication token
        """
        url = _urljoin(self.api_server, self.available_api_versions['v1'], "tokens") + '/'
        args = urlencode({"github_token": github_token})

        try:
            resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST", http_agent=user_agent(), timeout=self._server_timeout)
        except HTTPError as e:
            raise GalaxyError(e, 'Attempting to authenticate to galaxy')
        except Exception as e:
            raise AnsibleError('Unable to authenticate to galaxy: %s' % to_native(e), orig_exc=e)

        data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
        return data
    @g_connect(['v1'])
    def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
        """
        Post an import request
        """
        url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports") + '/'
        args = {
            "github_user": github_user,
            "github_repo": github_repo,
            "github_reference": reference if reference else ""
        }
        if role_name:
            args['alternate_role_name'] = role_name
        elif github_repo.startswith('ansible-role'):
            # Strip the 'ansible-role-' prefix to derive the role name.
            args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
        data = self._call_galaxy(url, args=urlencode(args), method="POST")
        if data.get('results', None):
            return data['results']
        return data

    @g_connect(['v1'])
    def get_import_task(self, task_id=None, github_user=None, github_repo=None):
        """
        Check the status of an import task.
        """
        url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports")
        if task_id is not None:
            url = "%s?id=%d" % (url, task_id)
        elif github_user is not None and github_repo is not None:
            url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo)
        else:
            raise AnsibleError("Expected task_id or github_user and github_repo")

        data = self._call_galaxy(url)
        return data['results']

    @g_connect(['v1'])
    def lookup_role_by_name(self, role_name, notify=True):
        """
        Find a role by name.
        """
        role_name = to_text(urlquote(to_bytes(role_name)))

        try:
            # Role names are 'username.rolename'; split on the last dot.
            parts = role_name.split(".")
            user_name = ".".join(parts[0:-1])
            role_name = parts[-1]
            if notify:
                display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
        except Exception:
            raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)

        url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles",
                       "?owner__username=%s&name=%s" % (user_name, role_name))
        data = self._call_galaxy(url)
        if len(data["results"]) != 0:
            return data["results"][0]
        return None

    @g_connect(['v1'])
    def fetch_role_related(self, related, role_id):
        """
        Fetch the list of related items for the given role.
        The url comes from the 'related' field of the role.
        """

        results = []
        try:
            url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles", role_id, related,
                           "?page_size=50")
            data = self._call_galaxy(url)
            results = data['results']
            done = (data.get('next_link', None) is None)

            # https://github.com/ansible/ansible/issues/64355
            # api_server contains part of the API path but next_link includes the /api part so strip it out.
            url_info = urlparse(self.api_server)
            base_url = "%s://%s/" % (url_info.scheme, url_info.netloc)

            while not done:
                url = _urljoin(base_url, data['next_link'])
                data = self._call_galaxy(url)
                results += data['results']
                done = (data.get('next_link', None) is None)
        except Exception as e:
            # Related data is best-effort; log and return what was collected so far.
            display.warning("Unable to retrieve role (id=%s) data (%s), but this is not fatal so we continue: %s"
                            % (role_id, related, to_text(e)))
        return results

    @g_connect(['v1'])
    def get_list(self, what):
        """
        Fetch the list of items specified.
        """
        try:
            # NOTE(review): "?page_size" has no value here — shipped upstream as-is; verify intent before changing.
            url = _urljoin(self.api_server, self.available_api_versions['v1'], what, "?page_size")
            data = self._call_galaxy(url)
            if "results" in data:
                results = data['results']
            else:
                results = data
            done = True
            if "next" in data:
                done = (data.get('next_link', None) is None)
            while not done:
                url = _urljoin(self.api_server, data['next_link'])
                data = self._call_galaxy(url)
                results += data['results']
                done = (data.get('next_link', None) is None)
            return results
        except Exception as error:
            raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))
    @g_connect(['v1'])
    def search_roles(self, search, **kwargs):
        # Build the role-search URL from the optional filters and return the raw response.

        search_url = _urljoin(self.api_server, self.available_api_versions['v1'], "search", "roles", "?")

        if search:
            search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))

        tags = kwargs.get('tags', None)
        platforms = kwargs.get('platforms', None)
        page_size = kwargs.get('page_size', None)
        author = kwargs.get('author', None)

        if tags and isinstance(tags, string_types):
            tags = tags.split(',')
            search_url += '&tags_autocomplete=' + '+'.join(tags)

        if platforms and isinstance(platforms, string_types):
            platforms = platforms.split(',')
            search_url += '&platforms_autocomplete=' + '+'.join(platforms)

        if page_size:
            search_url += '&page_size=%s' % page_size

        if author:
            search_url += '&username_autocomplete=%s' % author

        data = self._call_galaxy(search_url)
        return data

    @g_connect(['v1'])
    def add_secret(self, source, github_user, github_repo, secret):
        # Register a notification secret for a repo with the Galaxy server.
        url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets") + '/'
        args = urlencode({
            "source": source,
            "github_user": github_user,
            "github_repo": github_repo,
            "secret": secret
        })
        data = self._call_galaxy(url, args=args, method="POST")
        return data

    @g_connect(['v1'])
    def list_secrets(self):
        # List notification secrets; requires authentication.
        url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets")
        data = self._call_galaxy(url, auth_required=True)
        return data

    @g_connect(['v1'])
    def remove_secret(self, secret_id):
        # Delete one notification secret by id; requires authentication.
        url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets", secret_id) + '/'
        data = self._call_galaxy(url, auth_required=True, method='DELETE')
        return data

    @g_connect(['v1'])
    def delete_role(self, github_user, github_repo):
        # Remove a role imported from the given GitHub repo; requires authentication.
        url = _urljoin(self.api_server, self.available_api_versions['v1'], "removerole",
                       "?github_user=%s&github_repo=%s" % (github_user, github_repo))
        data = self._call_galaxy(url, auth_required=True, method='DELETE')
        return data

    # Collection APIs #

    @g_connect(['v2', 'v3'])
    def publish_collection(self, collection_path):
        """
        Publishes a collection to a Galaxy server and returns the import task URI.

        :param collection_path: The path to the collection tarball to publish.
        :return: The import task URI that contains the import results.
        """
        display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, self.name, self.api_server))

        b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
        if not os.path.exists(b_collection_path):
            raise AnsibleError("The collection path specified '%s' does not exist." % to_native(collection_path))
        elif not tarfile.is_tarfile(b_collection_path):
            raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection "
                               "build' to create a proper release artifact." % to_native(collection_path))

        with open(b_collection_path, 'rb') as collection_tar:
            sha256 = secure_hash_s(collection_tar.read(), hash_func=hashlib.sha256)

        content_type, b_form_data = prepare_multipart(
            {
                'sha256': sha256,
                'file': {
                    'filename': b_collection_path,
                    'mime_type': 'application/octet-stream',
                },
            }
        )

        headers = {
            'Content-type': content_type,
            'Content-length': len(b_form_data),
        }

        # v3 (Automation Hub) and v2 (Galaxy) use different upload endpoints.
        if 'v3' in self.available_api_versions:
            n_url = _urljoin(self.api_server, self.available_api_versions['v3'], 'artifacts', 'collections') + '/'
        else:
            n_url = _urljoin(self.api_server, self.available_api_versions['v2'], 'collections') + '/'

        resp = self._call_galaxy(n_url, args=b_form_data, headers=headers, method='POST', auth_required=True,
                                 error_context_msg='Error when publishing collection to %s (%s)'
                                                   % (self.name, self.api_server))

        return resp['task']
+ """ + state = 'waiting' + data = None + + # Construct the appropriate URL per version + if 'v3' in self.available_api_versions: + full_url = _urljoin(self.api_server, self.available_api_versions['v3'], + 'imports/collections', task_id, '/') + else: + full_url = _urljoin(self.api_server, self.available_api_versions['v2'], + 'collection-imports', task_id, '/') + + display.display("Waiting until Galaxy import task %s has completed" % full_url) + start = time.time() + wait = 2 + + while timeout == 0 or (time.time() - start) < timeout: + try: + data = self._call_galaxy(full_url, method='GET', auth_required=True, + error_context_msg='Error when getting import task results at %s' % full_url) + except GalaxyError as e: + if e.http_code != 404: + raise + # The import job may not have started, and as such, the task url may not yet exist + display.vvv('Galaxy import process has not started, wait %s seconds before trying again' % wait) + time.sleep(wait) + continue + + state = data.get('state', 'waiting') + + if data.get('finished_at', None): + break + + display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again' + % (state, wait)) + time.sleep(wait) + + # poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds. 
+ wait = min(30, wait * 1.5) + if state == 'waiting': + raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'" + % to_native(full_url)) + + for message in data.get('messages', []): + level = message['level'] + if level.lower() == 'error': + display.error("Galaxy import error message: %s" % message['message']) + elif level.lower() == 'warning': + display.warning("Galaxy import warning message: %s" % message['message']) + else: + display.vvv("Galaxy import message: %s - %s" % (level, message['message'])) + + if state == 'failed': + code = to_native(data['error'].get('code', 'UNKNOWN')) + description = to_native( + data['error'].get('description', "Unknown error, see %s for more details" % full_url)) + raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code)) + + @g_connect(['v2', 'v3']) + def get_collection_metadata(self, namespace, name): + """ + Gets the collection information from the Galaxy server about a specific Collection. + + :param namespace: The collection namespace. + :param name: The collection name. + return: CollectionMetadata about the collection. 
+ """ + if 'v3' in self.available_api_versions: + api_path = self.available_api_versions['v3'] + field_map = [ + ('created_str', 'created_at'), + ('modified_str', 'updated_at'), + ] + else: + api_path = self.available_api_versions['v2'] + field_map = [ + ('created_str', 'created'), + ('modified_str', 'modified'), + ] + + info_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, '/') + error_context_msg = 'Error when getting the collection info for %s.%s from %s (%s)' \ + % (namespace, name, self.name, self.api_server) + data = self._call_galaxy(info_url, error_context_msg=error_context_msg) + + metadata = {} + for name, api_field in field_map: + metadata[name] = data.get(api_field, None) + + return CollectionMetadata(namespace, name, **metadata) + + @g_connect(['v2', 'v3']) + def get_collection_version_metadata(self, namespace, name, version): + """ + Gets the collection information from the Galaxy server about a specific Collection version. + + :param namespace: The collection namespace. + :param name: The collection name. + :param version: Version of the collection to get the information for. + :return: CollectionVersionMetadata about the collection at the version requested. 
+ """ + api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2')) + url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version, '/'] + + n_collection_url = _urljoin(*url_paths) + error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \ + % (namespace, name, version, self.name, self.api_server) + data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg, cache=True) + self._set_cache() + + signatures = data.get('signatures') or [] + + return CollectionVersionMetadata(data['namespace']['name'], data['collection']['name'], data['version'], + data['download_url'], data['artifact']['sha256'], + data['metadata']['dependencies'], data['href'], signatures) + + @g_connect(['v2', 'v3']) + def get_collection_versions(self, namespace, name): + """ + Gets a list of available versions for a collection on a Galaxy server. + + :param namespace: The collection namespace. + :param name: The collection name. + :return: A list of versions that are available. + """ + relative_link = False + if 'v3' in self.available_api_versions: + api_path = self.available_api_versions['v3'] + pagination_path = ['links', 'next'] + relative_link = True # AH pagination results are relative an not an absolute URI. + else: + api_path = self.available_api_versions['v2'] + pagination_path = ['next'] + + page_size_name = 'limit' if 'v3' in self.available_api_versions else 'page_size' + versions_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/?%s=%d' % (page_size_name, COLLECTION_PAGE_SIZE)) + versions_url_info = urlparse(versions_url) + cache_key = versions_url_info.path + + # We should only rely on the cache if the collection has not changed. This may slow things down but it ensures + # we are not waiting a day before finding any new collections that have been published. 
+ if self._cache: + server_cache = self._cache.setdefault(get_cache_id(versions_url), {}) + modified_cache = server_cache.setdefault('modified', {}) + + try: + modified_date = self.get_collection_metadata(namespace, name).modified_str + except GalaxyError as err: + if err.http_code != 404: + raise + # No collection found, return an empty list to keep things consistent with the various APIs + return [] + + cached_modified_date = modified_cache.get('%s.%s' % (namespace, name), None) + if cached_modified_date != modified_date: + modified_cache['%s.%s' % (namespace, name)] = modified_date + if versions_url_info.path in server_cache: + del server_cache[cache_key] + + self._set_cache() + + error_context_msg = 'Error when getting available collection versions for %s.%s from %s (%s)' \ + % (namespace, name, self.name, self.api_server) + + try: + data = self._call_galaxy(versions_url, error_context_msg=error_context_msg, cache=True, cache_key=cache_key) + except GalaxyError as err: + if err.http_code != 404: + raise + # v3 doesn't raise a 404 so we need to mimick the empty response from APIs that do. + return [] + + if 'data' in data: + # v3 automation-hub is the only known API that uses `data` + # since v3 pulp_ansible does not, we cannot rely on version + # to indicate which key to use + results_key = 'data' + else: + results_key = 'results' + + versions = [] + while True: + versions += [v['version'] for v in data[results_key]] + + next_link = data + for path in pagination_path: + next_link = next_link.get(path, {}) + + if not next_link: + break + elif relative_link: + # TODO: This assumes the pagination result is relative to the root server. Will need to be verified + # with someone who knows the AH API. 
+ + # Remove the query string from the versions_url to use the next_link's query + versions_url = urljoin(versions_url, urlparse(versions_url).path) + next_link = versions_url.replace(versions_url_info.path, next_link) + + data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'), + error_context_msg=error_context_msg, cache=True, cache_key=cache_key) + self._set_cache() + + return versions + + @g_connect(['v2', 'v3']) + def get_collection_signatures(self, namespace, name, version): + """ + Gets the collection signatures from the Galaxy server about a specific Collection version. + + :param namespace: The collection namespace. + :param name: The collection name. + :param version: Version of the collection to get the information for. + :return: A list of signature strings. + """ + api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2')) + url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version, '/'] + + n_collection_url = _urljoin(*url_paths) + error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \ + % (namespace, name, version, self.name, self.api_server) + data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg, cache=True) + self._set_cache() + + try: + signatures = data["signatures"] + except KeyError: + # Noisy since this is used by the dep resolver, so require more verbosity than Galaxy calls + display.vvvvvv(f"Server {self.api_server} has not signed {namespace}.{name}:{version}") + return [] + else: + return [signature_info["signature"] for signature_info in signatures] |