summaryrefslogtreecommitdiffstats
path: root/lib/ansible/galaxy/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/galaxy/api.py')
-rw-r--r--lib/ansible/galaxy/api.py913
1 files changed, 913 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/api.py b/lib/ansible/galaxy/api.py
new file mode 100644
index 0000000..8dea804
--- /dev/null
+++ b/lib/ansible/galaxy/api.py
@@ -0,0 +1,913 @@
+# (C) 2013, James Cammarata <jcammarata@ansible.com>
+# Copyright: (c) 2019, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import collections
+import datetime
+import functools
+import hashlib
+import json
+import os
+import stat
+import tarfile
+import time
+import threading
+
+from urllib.error import HTTPError
+from urllib.parse import quote as urlquote, urlencode, urlparse, parse_qs, urljoin
+
+from ansible import constants as C
+from ansible.errors import AnsibleError
+from ansible.galaxy.user_agent import user_agent
+from ansible.module_utils.api import retry_with_delays_and_condition
+from ansible.module_utils.api import generate_jittered_backoff
+from ansible.module_utils.six import string_types
+from ansible.module_utils._text import to_bytes, to_native, to_text
+from ansible.module_utils.urls import open_url, prepare_multipart
+from ansible.utils.display import Display
+from ansible.utils.hashing import secure_hash_s
+from ansible.utils.path import makedirs_safe
+
+display = Display()
+_CACHE_LOCK = threading.Lock()
+COLLECTION_PAGE_SIZE = 100
+RETRY_HTTP_ERROR_CODES = [ # TODO: Allow user-configuration
+ 429, # Too Many Requests
+ 520, # Galaxy rate limit error code (Cloudflare unknown error)
+]
+
+
+def cache_lock(func):
+ def wrapped(*args, **kwargs):
+ with _CACHE_LOCK:
+ return func(*args, **kwargs)
+
+ return wrapped
+
+
+def is_rate_limit_exception(exception):
+ # Note: cloud.redhat.com masks rate limit errors with 403 (Forbidden) error codes.
+ # Since 403 could reflect the actual problem (such as an expired token), we should
+ # not retry by default.
+ return isinstance(exception, GalaxyError) and exception.http_code in RETRY_HTTP_ERROR_CODES
+
+
+def g_connect(versions):
+ """
+ Wrapper to lazily initialize connection info to Galaxy and verify the API versions required are available on the
+ endpoint.
+
+ :param versions: A list of API versions that the function supports.
+ """
+ def decorator(method):
+ def wrapped(self, *args, **kwargs):
+ if not self._available_api_versions:
+ display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)
+
+ # Determine the type of Galaxy server we are talking to. First try it unauthenticated then with Bearer
+ # auth for Automation Hub.
+ n_url = self.api_server
+ error_context_msg = 'Error when finding available api versions from %s (%s)' % (self.name, n_url)
+
+ if self.api_server == 'https://galaxy.ansible.com' or self.api_server == 'https://galaxy.ansible.com/':
+ n_url = 'https://galaxy.ansible.com/api/'
+
+ try:
+ data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg, cache=True)
+ except (AnsibleError, GalaxyError, ValueError, KeyError) as err:
+ # Either the URL doesnt exist, or other error. Or the URL exists, but isn't a galaxy API
+ # root (not JSON, no 'available_versions') so try appending '/api/'
+ if n_url.endswith('/api') or n_url.endswith('/api/'):
+ raise
+
+ # Let exceptions here bubble up but raise the original if this returns a 404 (/api/ wasn't found).
+ n_url = _urljoin(n_url, '/api/')
+ try:
+ data = self._call_galaxy(n_url, method='GET', error_context_msg=error_context_msg, cache=True)
+ except GalaxyError as new_err:
+ if new_err.http_code == 404:
+ raise err
+ raise
+
+ if 'available_versions' not in data:
+ raise AnsibleError("Tried to find galaxy API root at %s but no 'available_versions' are available "
+ "on %s" % (n_url, self.api_server))
+
+ # Update api_server to point to the "real" API root, which in this case could have been the configured
+ # url + '/api/' appended.
+ self.api_server = n_url
+
+ # Default to only supporting v1, if only v1 is returned we also assume that v2 is available even though
+ # it isn't returned in the available_versions dict.
+ available_versions = data.get('available_versions', {u'v1': u'v1/'})
+ if list(available_versions.keys()) == [u'v1']:
+ available_versions[u'v2'] = u'v2/'
+
+ self._available_api_versions = available_versions
+ display.vvvv("Found API version '%s' with Galaxy server %s (%s)"
+ % (', '.join(available_versions.keys()), self.name, self.api_server))
+
+ # Verify that the API versions the function works with are available on the server specified.
+ available_versions = set(self._available_api_versions.keys())
+ common_versions = set(versions).intersection(available_versions)
+ if not common_versions:
+ raise AnsibleError("Galaxy action %s requires API versions '%s' but only '%s' are available on %s %s"
+ % (method.__name__, ", ".join(versions), ", ".join(available_versions),
+ self.name, self.api_server))
+
+ return method(self, *args, **kwargs)
+ return wrapped
+ return decorator
+
+
+def get_cache_id(url):
+ """ Gets the cache ID for the URL specified. """
+ url_info = urlparse(url)
+
+ port = None
+ try:
+ port = url_info.port
+ except ValueError:
+ pass # While the URL is probably invalid, let the caller figure that out when using it
+
+ # Cannot use netloc because it could contain credentials if the server specified had them in there.
+ return '%s:%s' % (url_info.hostname, port or '')
+
+
+@cache_lock
+def _load_cache(b_cache_path):
+ """ Loads the cache file requested if possible. The file must not be world writable. """
+ cache_version = 1
+
+ if not os.path.isfile(b_cache_path):
+ display.vvvv("Creating Galaxy API response cache file at '%s'" % to_text(b_cache_path))
+ with open(b_cache_path, 'w'):
+ os.chmod(b_cache_path, 0o600)
+
+ cache_mode = os.stat(b_cache_path).st_mode
+ if cache_mode & stat.S_IWOTH:
+ display.warning("Galaxy cache has world writable access (%s), ignoring it as a cache source."
+ % to_text(b_cache_path))
+ return
+
+ with open(b_cache_path, mode='rb') as fd:
+ json_val = to_text(fd.read(), errors='surrogate_or_strict')
+
+ try:
+ cache = json.loads(json_val)
+ except ValueError:
+ cache = None
+
+ if not isinstance(cache, dict) or cache.get('version', None) != cache_version:
+ display.vvvv("Galaxy cache file at '%s' has an invalid version, clearing" % to_text(b_cache_path))
+ cache = {'version': cache_version}
+
+ # Set the cache after we've cleared the existing entries
+ with open(b_cache_path, mode='wb') as fd:
+ fd.write(to_bytes(json.dumps(cache), errors='surrogate_or_strict'))
+
+ return cache
+
+
+def _urljoin(*args):
+ return '/'.join(to_native(a, errors='surrogate_or_strict').strip('/') for a in args + ('',) if a)
+
+
+class GalaxyError(AnsibleError):
+ """ Error for bad Galaxy server responses. """
+
+ def __init__(self, http_error, message):
+ super(GalaxyError, self).__init__(message)
+ self.http_code = http_error.code
+ self.url = http_error.geturl()
+
+ try:
+ http_msg = to_text(http_error.read())
+ err_info = json.loads(http_msg)
+ except (AttributeError, ValueError):
+ err_info = {}
+
+ url_split = self.url.split('/')
+ if 'v2' in url_split:
+ galaxy_msg = err_info.get('message', http_error.reason)
+ code = err_info.get('code', 'Unknown')
+ full_error_msg = u"%s (HTTP Code: %d, Message: %s Code: %s)" % (message, self.http_code, galaxy_msg, code)
+ elif 'v3' in url_split:
+ errors = err_info.get('errors', [])
+ if not errors:
+ errors = [{}] # Defaults are set below, we just need to make sure 1 error is present.
+
+ message_lines = []
+ for error in errors:
+ error_msg = error.get('detail') or error.get('title') or http_error.reason
+ error_code = error.get('code') or 'Unknown'
+ message_line = u"(HTTP Code: %d, Message: %s Code: %s)" % (self.http_code, error_msg, error_code)
+ message_lines.append(message_line)
+
+ full_error_msg = "%s %s" % (message, ', '.join(message_lines))
+ else:
+ # v1 and unknown API endpoints
+ galaxy_msg = err_info.get('default', http_error.reason)
+ full_error_msg = u"%s (HTTP Code: %d, Message: %s)" % (message, self.http_code, galaxy_msg)
+
+ self.message = to_native(full_error_msg)
+
+
+# Keep the raw string results for the date. It's too complex to parse as a datetime object and the various APIs return
+# them in different formats.
+CollectionMetadata = collections.namedtuple('CollectionMetadata', ['namespace', 'name', 'created_str', 'modified_str'])
+
+
+class CollectionVersionMetadata:
+
+ def __init__(self, namespace, name, version, download_url, artifact_sha256, dependencies, signatures_url, signatures):
+ """
+ Contains common information about a collection on a Galaxy server to smooth through API differences for
+ Collection and define a standard meta info for a collection.
+
+ :param namespace: The namespace name.
+ :param name: The collection name.
+ :param version: The version that the metadata refers to.
+ :param download_url: The URL to download the collection.
+ :param artifact_sha256: The SHA256 of the collection artifact for later verification.
+ :param dependencies: A dict of dependencies of the collection.
+ :param signatures_url: The URL to the specific version of the collection.
+ :param signatures: The list of signatures found at the signatures_url.
+ """
+ self.namespace = namespace
+ self.name = name
+ self.version = version
+ self.download_url = download_url
+ self.artifact_sha256 = artifact_sha256
+ self.dependencies = dependencies
+ self.signatures_url = signatures_url
+ self.signatures = signatures
+
+
+@functools.total_ordering
+class GalaxyAPI:
+ """ This class is meant to be used as a API client for an Ansible Galaxy server """
+
+ def __init__(
+ self, galaxy, name, url,
+ username=None, password=None, token=None, validate_certs=True,
+ available_api_versions=None,
+ clear_response_cache=False, no_cache=True,
+ priority=float('inf'),
+ timeout=60,
+ ):
+ self.galaxy = galaxy
+ self.name = name
+ self.username = username
+ self.password = password
+ self.token = token
+ self.api_server = url
+ self.validate_certs = validate_certs
+ self.timeout = timeout
+ self._available_api_versions = available_api_versions or {}
+ self._priority = priority
+ self._server_timeout = timeout
+
+ b_cache_dir = to_bytes(C.GALAXY_CACHE_DIR, errors='surrogate_or_strict')
+ makedirs_safe(b_cache_dir, mode=0o700)
+ self._b_cache_path = os.path.join(b_cache_dir, b'api.json')
+
+ if clear_response_cache:
+ with _CACHE_LOCK:
+ if os.path.exists(self._b_cache_path):
+ display.vvvv("Clearing cache file (%s)" % to_text(self._b_cache_path))
+ os.remove(self._b_cache_path)
+
+ self._cache = None
+ if not no_cache:
+ self._cache = _load_cache(self._b_cache_path)
+
+ display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs))
+
+ def __str__(self):
+ # type: (GalaxyAPI) -> str
+ """Render GalaxyAPI as a native string representation."""
+ return to_native(self.name)
+
+ def __unicode__(self):
+ # type: (GalaxyAPI) -> str
+ """Render GalaxyAPI as a unicode/text string representation."""
+ return to_text(self.name)
+
+ def __repr__(self):
+ # type: (GalaxyAPI) -> str
+ """Render GalaxyAPI as an inspectable string representation."""
+ return (
+ '<{instance!s} "{name!s}" @ {url!s} with priority {priority!s}>'.
+ format(
+ instance=self, name=self.name,
+ priority=self._priority, url=self.api_server,
+ )
+ )
+
+ def __lt__(self, other_galaxy_api):
+ # type: (GalaxyAPI, GalaxyAPI) -> bool
+ """Return whether the instance priority is higher than other."""
+ if not isinstance(other_galaxy_api, self.__class__):
+ return NotImplemented
+
+ return (
+ self._priority > other_galaxy_api._priority or
+ self.name < self.name
+ )
+
+ @property # type: ignore[misc] # https://github.com/python/mypy/issues/1362
+ @g_connect(['v1', 'v2', 'v3'])
+ def available_api_versions(self):
+ # Calling g_connect will populate self._available_api_versions
+ return self._available_api_versions
+
+ @retry_with_delays_and_condition(
+ backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40),
+ should_retry_error=is_rate_limit_exception
+ )
+ def _call_galaxy(self, url, args=None, headers=None, method=None, auth_required=False, error_context_msg=None,
+ cache=False, cache_key=None):
+ url_info = urlparse(url)
+ cache_id = get_cache_id(url)
+ if not cache_key:
+ cache_key = url_info.path
+ query = parse_qs(url_info.query)
+ if cache and self._cache:
+ server_cache = self._cache.setdefault(cache_id, {})
+ iso_datetime_format = '%Y-%m-%dT%H:%M:%SZ'
+
+ valid = False
+ if cache_key in server_cache:
+ expires = datetime.datetime.strptime(server_cache[cache_key]['expires'], iso_datetime_format)
+ valid = datetime.datetime.utcnow() < expires
+
+ is_paginated_url = 'page' in query or 'offset' in query
+ if valid and not is_paginated_url:
+ # Got a hit on the cache and we aren't getting a paginated response
+ path_cache = server_cache[cache_key]
+ if path_cache.get('paginated'):
+ if '/v3/' in cache_key:
+ res = {'links': {'next': None}}
+ else:
+ res = {'next': None}
+
+ # Technically some v3 paginated APIs return in 'data' but the caller checks the keys for this so
+ # always returning the cache under results is fine.
+ res['results'] = []
+ for result in path_cache['results']:
+ res['results'].append(result)
+
+ else:
+ res = path_cache['results']
+
+ return res
+
+ elif not is_paginated_url:
+ # The cache entry had expired or does not exist, start a new blank entry to be filled later.
+ expires = datetime.datetime.utcnow()
+ expires += datetime.timedelta(days=1)
+ server_cache[cache_key] = {
+ 'expires': expires.strftime(iso_datetime_format),
+ 'paginated': False,
+ }
+
+ headers = headers or {}
+ self._add_auth_token(headers, url, required=auth_required)
+
+ try:
+ display.vvvv("Calling Galaxy at %s" % url)
+ resp = open_url(to_native(url), data=args, validate_certs=self.validate_certs, headers=headers,
+ method=method, timeout=self._server_timeout, http_agent=user_agent(), follow_redirects='safe')
+ except HTTPError as e:
+ raise GalaxyError(e, error_context_msg)
+ except Exception as e:
+ raise AnsibleError("Unknown error when attempting to call Galaxy at '%s': %s" % (url, to_native(e)))
+
+ resp_data = to_text(resp.read(), errors='surrogate_or_strict')
+ try:
+ data = json.loads(resp_data)
+ except ValueError:
+ raise AnsibleError("Failed to parse Galaxy response from '%s' as JSON:\n%s"
+ % (resp.url, to_native(resp_data)))
+
+ if cache and self._cache:
+ path_cache = self._cache[cache_id][cache_key]
+
+ # v3 can return data or results for paginated results. Scan the result so we can determine what to cache.
+ paginated_key = None
+ for key in ['data', 'results']:
+ if key in data:
+ paginated_key = key
+ break
+
+ if paginated_key:
+ path_cache['paginated'] = True
+ results = path_cache.setdefault('results', [])
+ for result in data[paginated_key]:
+ results.append(result)
+
+ else:
+ path_cache['results'] = data
+
+ return data
+
+ def _add_auth_token(self, headers, url, token_type=None, required=False):
+ # Don't add the auth token if one is already present
+ if 'Authorization' in headers:
+ return
+
+ if not self.token and required:
+ raise AnsibleError("No access token or username set. A token can be set with --api-key "
+ "or at {0}.".format(to_native(C.GALAXY_TOKEN_PATH)))
+
+ if self.token:
+ headers.update(self.token.headers())
+
+ @cache_lock
+ def _set_cache(self):
+ with open(self._b_cache_path, mode='wb') as fd:
+ fd.write(to_bytes(json.dumps(self._cache), errors='surrogate_or_strict'))
+
+ @g_connect(['v1'])
+ def authenticate(self, github_token):
+ """
+ Retrieve an authentication token
+ """
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "tokens") + '/'
+ args = urlencode({"github_token": github_token})
+
+ try:
+ resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST", http_agent=user_agent(), timeout=self._server_timeout)
+ except HTTPError as e:
+ raise GalaxyError(e, 'Attempting to authenticate to galaxy')
+ except Exception as e:
+ raise AnsibleError('Unable to authenticate to galaxy: %s' % to_native(e), orig_exc=e)
+
+ data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
+ return data
+
+ @g_connect(['v1'])
+ def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
+ """
+ Post an import request
+ """
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports") + '/'
+ args = {
+ "github_user": github_user,
+ "github_repo": github_repo,
+ "github_reference": reference if reference else ""
+ }
+ if role_name:
+ args['alternate_role_name'] = role_name
+ elif github_repo.startswith('ansible-role'):
+ args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
+ data = self._call_galaxy(url, args=urlencode(args), method="POST")
+ if data.get('results', None):
+ return data['results']
+ return data
+
+ @g_connect(['v1'])
+ def get_import_task(self, task_id=None, github_user=None, github_repo=None):
+ """
+ Check the status of an import task.
+ """
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "imports")
+ if task_id is not None:
+ url = "%s?id=%d" % (url, task_id)
+ elif github_user is not None and github_repo is not None:
+ url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo)
+ else:
+ raise AnsibleError("Expected task_id or github_user and github_repo")
+
+ data = self._call_galaxy(url)
+ return data['results']
+
+ @g_connect(['v1'])
+ def lookup_role_by_name(self, role_name, notify=True):
+ """
+ Find a role by name.
+ """
+ role_name = to_text(urlquote(to_bytes(role_name)))
+
+ try:
+ parts = role_name.split(".")
+ user_name = ".".join(parts[0:-1])
+ role_name = parts[-1]
+ if notify:
+ display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
+ except Exception:
+ raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
+
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles",
+ "?owner__username=%s&name=%s" % (user_name, role_name))
+ data = self._call_galaxy(url)
+ if len(data["results"]) != 0:
+ return data["results"][0]
+ return None
+
+ @g_connect(['v1'])
+ def fetch_role_related(self, related, role_id):
+ """
+ Fetch the list of related items for the given role.
+ The url comes from the 'related' field of the role.
+ """
+
+ results = []
+ try:
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "roles", role_id, related,
+ "?page_size=50")
+ data = self._call_galaxy(url)
+ results = data['results']
+ done = (data.get('next_link', None) is None)
+
+ # https://github.com/ansible/ansible/issues/64355
+ # api_server contains part of the API path but next_link includes the /api part so strip it out.
+ url_info = urlparse(self.api_server)
+ base_url = "%s://%s/" % (url_info.scheme, url_info.netloc)
+
+ while not done:
+ url = _urljoin(base_url, data['next_link'])
+ data = self._call_galaxy(url)
+ results += data['results']
+ done = (data.get('next_link', None) is None)
+ except Exception as e:
+ display.warning("Unable to retrieve role (id=%s) data (%s), but this is not fatal so we continue: %s"
+ % (role_id, related, to_text(e)))
+ return results
+
+ @g_connect(['v1'])
+ def get_list(self, what):
+ """
+ Fetch the list of items specified.
+ """
+ try:
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], what, "?page_size")
+ data = self._call_galaxy(url)
+ if "results" in data:
+ results = data['results']
+ else:
+ results = data
+ done = True
+ if "next" in data:
+ done = (data.get('next_link', None) is None)
+ while not done:
+ url = _urljoin(self.api_server, data['next_link'])
+ data = self._call_galaxy(url)
+ results += data['results']
+ done = (data.get('next_link', None) is None)
+ return results
+ except Exception as error:
+ raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))
+
+ @g_connect(['v1'])
+ def search_roles(self, search, **kwargs):
+
+ search_url = _urljoin(self.api_server, self.available_api_versions['v1'], "search", "roles", "?")
+
+ if search:
+ search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))
+
+ tags = kwargs.get('tags', None)
+ platforms = kwargs.get('platforms', None)
+ page_size = kwargs.get('page_size', None)
+ author = kwargs.get('author', None)
+
+ if tags and isinstance(tags, string_types):
+ tags = tags.split(',')
+ search_url += '&tags_autocomplete=' + '+'.join(tags)
+
+ if platforms and isinstance(platforms, string_types):
+ platforms = platforms.split(',')
+ search_url += '&platforms_autocomplete=' + '+'.join(platforms)
+
+ if page_size:
+ search_url += '&page_size=%s' % page_size
+
+ if author:
+ search_url += '&username_autocomplete=%s' % author
+
+ data = self._call_galaxy(search_url)
+ return data
+
+ @g_connect(['v1'])
+ def add_secret(self, source, github_user, github_repo, secret):
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets") + '/'
+ args = urlencode({
+ "source": source,
+ "github_user": github_user,
+ "github_repo": github_repo,
+ "secret": secret
+ })
+ data = self._call_galaxy(url, args=args, method="POST")
+ return data
+
+ @g_connect(['v1'])
+ def list_secrets(self):
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets")
+ data = self._call_galaxy(url, auth_required=True)
+ return data
+
+ @g_connect(['v1'])
+ def remove_secret(self, secret_id):
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "notification_secrets", secret_id) + '/'
+ data = self._call_galaxy(url, auth_required=True, method='DELETE')
+ return data
+
+ @g_connect(['v1'])
+ def delete_role(self, github_user, github_repo):
+ url = _urljoin(self.api_server, self.available_api_versions['v1'], "removerole",
+ "?github_user=%s&github_repo=%s" % (github_user, github_repo))
+ data = self._call_galaxy(url, auth_required=True, method='DELETE')
+ return data
+
+ # Collection APIs #
+
+ @g_connect(['v2', 'v3'])
+ def publish_collection(self, collection_path):
+ """
+ Publishes a collection to a Galaxy server and returns the import task URI.
+
+ :param collection_path: The path to the collection tarball to publish.
+ :return: The import task URI that contains the import results.
+ """
+ display.display("Publishing collection artifact '%s' to %s %s" % (collection_path, self.name, self.api_server))
+
+ b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
+ if not os.path.exists(b_collection_path):
+ raise AnsibleError("The collection path specified '%s' does not exist." % to_native(collection_path))
+ elif not tarfile.is_tarfile(b_collection_path):
+ raise AnsibleError("The collection path specified '%s' is not a tarball, use 'ansible-galaxy collection "
+ "build' to create a proper release artifact." % to_native(collection_path))
+
+ with open(b_collection_path, 'rb') as collection_tar:
+ sha256 = secure_hash_s(collection_tar.read(), hash_func=hashlib.sha256)
+
+ content_type, b_form_data = prepare_multipart(
+ {
+ 'sha256': sha256,
+ 'file': {
+ 'filename': b_collection_path,
+ 'mime_type': 'application/octet-stream',
+ },
+ }
+ )
+
+ headers = {
+ 'Content-type': content_type,
+ 'Content-length': len(b_form_data),
+ }
+
+ if 'v3' in self.available_api_versions:
+ n_url = _urljoin(self.api_server, self.available_api_versions['v3'], 'artifacts', 'collections') + '/'
+ else:
+ n_url = _urljoin(self.api_server, self.available_api_versions['v2'], 'collections') + '/'
+
+ resp = self._call_galaxy(n_url, args=b_form_data, headers=headers, method='POST', auth_required=True,
+ error_context_msg='Error when publishing collection to %s (%s)'
+ % (self.name, self.api_server))
+
+ return resp['task']
+
+ @g_connect(['v2', 'v3'])
+ def wait_import_task(self, task_id, timeout=0):
+ """
+ Waits until the import process on the Galaxy server has completed or the timeout is reached.
+
+ :param task_id: The id of the import task to wait for. This can be parsed out of the return
+ value for GalaxyAPI.publish_collection.
+ :param timeout: The timeout in seconds, 0 is no timeout.
+ """
+ state = 'waiting'
+ data = None
+
+ # Construct the appropriate URL per version
+ if 'v3' in self.available_api_versions:
+ full_url = _urljoin(self.api_server, self.available_api_versions['v3'],
+ 'imports/collections', task_id, '/')
+ else:
+ full_url = _urljoin(self.api_server, self.available_api_versions['v2'],
+ 'collection-imports', task_id, '/')
+
+ display.display("Waiting until Galaxy import task %s has completed" % full_url)
+ start = time.time()
+ wait = 2
+
+ while timeout == 0 or (time.time() - start) < timeout:
+ try:
+ data = self._call_galaxy(full_url, method='GET', auth_required=True,
+ error_context_msg='Error when getting import task results at %s' % full_url)
+ except GalaxyError as e:
+ if e.http_code != 404:
+ raise
+ # The import job may not have started, and as such, the task url may not yet exist
+ display.vvv('Galaxy import process has not started, wait %s seconds before trying again' % wait)
+ time.sleep(wait)
+ continue
+
+ state = data.get('state', 'waiting')
+
+ if data.get('finished_at', None):
+ break
+
+ display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again'
+ % (state, wait))
+ time.sleep(wait)
+
+ # poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds.
+ wait = min(30, wait * 1.5)
+ if state == 'waiting':
+ raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'"
+ % to_native(full_url))
+
+ for message in data.get('messages', []):
+ level = message['level']
+ if level.lower() == 'error':
+ display.error("Galaxy import error message: %s" % message['message'])
+ elif level.lower() == 'warning':
+ display.warning("Galaxy import warning message: %s" % message['message'])
+ else:
+ display.vvv("Galaxy import message: %s - %s" % (level, message['message']))
+
+ if state == 'failed':
+ code = to_native(data['error'].get('code', 'UNKNOWN'))
+ description = to_native(
+ data['error'].get('description', "Unknown error, see %s for more details" % full_url))
+ raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code))
+
+ @g_connect(['v2', 'v3'])
+ def get_collection_metadata(self, namespace, name):
+ """
+ Gets the collection information from the Galaxy server about a specific Collection.
+
+ :param namespace: The collection namespace.
+ :param name: The collection name.
+ return: CollectionMetadata about the collection.
+ """
+ if 'v3' in self.available_api_versions:
+ api_path = self.available_api_versions['v3']
+ field_map = [
+ ('created_str', 'created_at'),
+ ('modified_str', 'updated_at'),
+ ]
+ else:
+ api_path = self.available_api_versions['v2']
+ field_map = [
+ ('created_str', 'created'),
+ ('modified_str', 'modified'),
+ ]
+
+ info_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, '/')
+ error_context_msg = 'Error when getting the collection info for %s.%s from %s (%s)' \
+ % (namespace, name, self.name, self.api_server)
+ data = self._call_galaxy(info_url, error_context_msg=error_context_msg)
+
+ metadata = {}
+ for name, api_field in field_map:
+ metadata[name] = data.get(api_field, None)
+
+ return CollectionMetadata(namespace, name, **metadata)
+
+ @g_connect(['v2', 'v3'])
+ def get_collection_version_metadata(self, namespace, name, version):
+ """
+ Gets the collection information from the Galaxy server about a specific Collection version.
+
+ :param namespace: The collection namespace.
+ :param name: The collection name.
+ :param version: Version of the collection to get the information for.
+ :return: CollectionVersionMetadata about the collection at the version requested.
+ """
+ api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2'))
+ url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version, '/']
+
+ n_collection_url = _urljoin(*url_paths)
+ error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \
+ % (namespace, name, version, self.name, self.api_server)
+ data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg, cache=True)
+ self._set_cache()
+
+ signatures = data.get('signatures') or []
+
+ return CollectionVersionMetadata(data['namespace']['name'], data['collection']['name'], data['version'],
+ data['download_url'], data['artifact']['sha256'],
+ data['metadata']['dependencies'], data['href'], signatures)
+
+ @g_connect(['v2', 'v3'])
+ def get_collection_versions(self, namespace, name):
+ """
+ Gets a list of available versions for a collection on a Galaxy server.
+
+ :param namespace: The collection namespace.
+ :param name: The collection name.
+ :return: A list of versions that are available.
+ """
+ relative_link = False
+ if 'v3' in self.available_api_versions:
+ api_path = self.available_api_versions['v3']
+ pagination_path = ['links', 'next']
+ relative_link = True # AH pagination results are relative an not an absolute URI.
+ else:
+ api_path = self.available_api_versions['v2']
+ pagination_path = ['next']
+
+ page_size_name = 'limit' if 'v3' in self.available_api_versions else 'page_size'
+ versions_url = _urljoin(self.api_server, api_path, 'collections', namespace, name, 'versions', '/?%s=%d' % (page_size_name, COLLECTION_PAGE_SIZE))
+ versions_url_info = urlparse(versions_url)
+ cache_key = versions_url_info.path
+
+ # We should only rely on the cache if the collection has not changed. This may slow things down but it ensures
+ # we are not waiting a day before finding any new collections that have been published.
+ if self._cache:
+ server_cache = self._cache.setdefault(get_cache_id(versions_url), {})
+ modified_cache = server_cache.setdefault('modified', {})
+
+ try:
+ modified_date = self.get_collection_metadata(namespace, name).modified_str
+ except GalaxyError as err:
+ if err.http_code != 404:
+ raise
+ # No collection found, return an empty list to keep things consistent with the various APIs
+ return []
+
+ cached_modified_date = modified_cache.get('%s.%s' % (namespace, name), None)
+ if cached_modified_date != modified_date:
+ modified_cache['%s.%s' % (namespace, name)] = modified_date
+ if versions_url_info.path in server_cache:
+ del server_cache[cache_key]
+
+ self._set_cache()
+
+ error_context_msg = 'Error when getting available collection versions for %s.%s from %s (%s)' \
+ % (namespace, name, self.name, self.api_server)
+
+ try:
+ data = self._call_galaxy(versions_url, error_context_msg=error_context_msg, cache=True, cache_key=cache_key)
+ except GalaxyError as err:
+ if err.http_code != 404:
+ raise
+ # v3 doesn't raise a 404 so we need to mimick the empty response from APIs that do.
+ return []
+
+ if 'data' in data:
+ # v3 automation-hub is the only known API that uses `data`
+ # since v3 pulp_ansible does not, we cannot rely on version
+ # to indicate which key to use
+ results_key = 'data'
+ else:
+ results_key = 'results'
+
+ versions = []
+ while True:
+ versions += [v['version'] for v in data[results_key]]
+
+ next_link = data
+ for path in pagination_path:
+ next_link = next_link.get(path, {})
+
+ if not next_link:
+ break
+ elif relative_link:
+ # TODO: This assumes the pagination result is relative to the root server. Will need to be verified
+ # with someone who knows the AH API.
+
+ # Remove the query string from the versions_url to use the next_link's query
+ versions_url = urljoin(versions_url, urlparse(versions_url).path)
+ next_link = versions_url.replace(versions_url_info.path, next_link)
+
+ data = self._call_galaxy(to_native(next_link, errors='surrogate_or_strict'),
+ error_context_msg=error_context_msg, cache=True, cache_key=cache_key)
+ self._set_cache()
+
+ return versions
+
+ @g_connect(['v2', 'v3'])
+ def get_collection_signatures(self, namespace, name, version):
+ """
+ Gets the collection signatures from the Galaxy server about a specific Collection version.
+
+ :param namespace: The collection namespace.
+ :param name: The collection name.
+ :param version: Version of the collection to get the information for.
+ :return: A list of signature strings.
+ """
+ api_path = self.available_api_versions.get('v3', self.available_api_versions.get('v2'))
+ url_paths = [self.api_server, api_path, 'collections', namespace, name, 'versions', version, '/']
+
+ n_collection_url = _urljoin(*url_paths)
+ error_context_msg = 'Error when getting collection version metadata for %s.%s:%s from %s (%s)' \
+ % (namespace, name, version, self.name, self.api_server)
+ data = self._call_galaxy(n_collection_url, error_context_msg=error_context_msg, cache=True)
+ self._set_cache()
+
+ try:
+ signatures = data["signatures"]
+ except KeyError:
+ # Noisy since this is used by the dep resolver, so require more verbosity than Galaxy calls
+ display.vvvvvv(f"Server {self.api_server} has not signed {namespace}.{name}:{version}")
+ return []
+ else:
+ return [signature_info["signature"] for signature_info in signatures]