diff options
Diffstat (limited to '')
-rwxr-xr-x | bin/ansible-galaxy | 1865 |
1 files changed, 1865 insertions, 0 deletions
diff --git a/bin/ansible-galaxy b/bin/ansible-galaxy new file mode 100755 index 0000000..3cb7fe2 --- /dev/null +++ b/bin/ansible-galaxy @@ -0,0 +1,1865 @@ +#!/usr/bin/env python +# Copyright: (c) 2013, James Cammarata <jcammarata@ansible.com> +# Copyright: (c) 2018-2021, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +# PYTHON_ARGCOMPLETE_OK + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +# ansible.cli needs to be imported first, to ensure the source bin/* scripts run that code first +from ansible.cli import CLI + +import json +import os.path +import re +import shutil +import sys +import textwrap +import time +import typing as t + +from dataclasses import dataclass +from yaml.error import YAMLError + +import ansible.constants as C +from ansible import context +from ansible.cli.arguments import option_helpers as opt_help +from ansible.errors import AnsibleError, AnsibleOptionsError +from ansible.galaxy import Galaxy, get_collections_galaxy_meta_info +from ansible.galaxy.api import GalaxyAPI +from ansible.galaxy.collection import ( + build_collection, + download_collections, + find_existing_collections, + install_collections, + publish_collection, + validate_collection_name, + validate_collection_path, + verify_collections, + SIGNATURE_COUNT_RE, +) +from ansible.galaxy.collection.concrete_artifact_manager import ( + ConcreteArtifactsManager, +) +from ansible.galaxy.collection.gpg import GPG_ERROR_MAP +from ansible.galaxy.dependency_resolution.dataclasses import Requirement + +from ansible.galaxy.role import GalaxyRole +from ansible.galaxy.token import BasicAuthToken, GalaxyToken, KeycloakToken, NoTokenSentinel +from ansible.module_utils.ansible_release import __version__ as ansible_version +from ansible.module_utils.common.collections import is_iterable +from ansible.module_utils.common.yaml import yaml_dump, yaml_load +from ansible.module_utils._text import to_bytes, to_native, to_text +from ansible.module_utils import six +from ansible.parsing.dataloader import DataLoader +from ansible.parsing.yaml.loader import AnsibleLoader +from ansible.playbook.role.requirement import RoleRequirement +from ansible.template import Templar +from ansible.utils.collection_loader import AnsibleCollectionConfig +from ansible.utils.display import Display +from ansible.utils.plugin_docs import get_versioned_doclink + +display = Display() +urlparse = six.moves.urllib.parse.urlparse + +# config definition by position: name, required, type +SERVER_DEF = [ + ('url', True, 'str'), + ('username', False, 'str'), + ('password', False, 'str'), + ('token', False, 'str'), + ('auth_url', False, 'str'), + ('v3', False, 'bool'), + ('validate_certs', False, 'bool'), + ('client_id', False, 'str'), + ('timeout', False, 'int'), +] + +# config definition fields +SERVER_ADDITIONAL = { + 'v3': {'default': 'False'}, + 'validate_certs': {'cli': [{'name': 'validate_certs'}]}, + 'timeout': {'default': '60', 'cli': [{'name': 'timeout'}]}, + 'token': {'default': None}, +} + + +def with_collection_artifacts_manager(wrapped_method): + """Inject an artifacts manager if not passed explicitly. + + This decorator constructs a ConcreteArtifactsManager and maintains + the related temporary directory auto-cleanup around the target + method invocation. + """ + def method_wrapper(*args, **kwargs): + if 'artifacts_manager' in kwargs: + return wrapped_method(*args, **kwargs) + + # FIXME: use validate_certs context from Galaxy servers when downloading collections + artifacts_manager_kwargs = {'validate_certs': context.CLIARGS['resolved_validate_certs']} + + keyring = context.CLIARGS.get('keyring', None) + if keyring is not None: + artifacts_manager_kwargs.update({ + 'keyring': GalaxyCLI._resolve_path(keyring), + 'required_signature_count': context.CLIARGS.get('required_valid_signature_count', None), + 'ignore_signature_errors': context.CLIARGS.get('ignore_gpg_errors', None), + }) + + with ConcreteArtifactsManager.under_tmpdir( + C.DEFAULT_LOCAL_TMP, + **artifacts_manager_kwargs + ) as concrete_artifact_cm: + kwargs['artifacts_manager'] = concrete_artifact_cm + return wrapped_method(*args, **kwargs) + return method_wrapper + + +def _display_header(path, h1, h2, w1=10, w2=7): + display.display('\n# {0}\n{1:{cwidth}} {2:{vwidth}}\n{3} {4}\n'.format( + path, + h1, + h2, + '-' * max([len(h1), w1]), # Make sure that the number of dashes is at least the width of the header + '-' * max([len(h2), w2]), + cwidth=w1, + vwidth=w2, + )) + + +def _display_role(gr): + install_info = gr.install_info + version = None + if install_info: + version = install_info.get("version", None) + if not version: + version = "(unknown version)" + display.display("- %s, %s" % (gr.name, version)) + + +def _display_collection(collection, cwidth=10, vwidth=7, min_cwidth=10, min_vwidth=7): + display.display('{fqcn:{cwidth}} {version:{vwidth}}'.format( + fqcn=to_text(collection.fqcn), + version=collection.ver, + cwidth=max(cwidth, min_cwidth), # Make sure the width isn't smaller than the header + vwidth=max(vwidth, min_vwidth) + )) + + +def _get_collection_widths(collections): + if not is_iterable(collections): + collections = (collections, ) + + fqcn_set = {to_text(c.fqcn) for c in collections} + version_set = {to_text(c.ver) for c in collections} + + fqcn_length = len(max(fqcn_set, key=len)) + version_length = len(max(version_set, key=len)) + + return fqcn_length, version_length + + +def validate_signature_count(value): + match = re.match(SIGNATURE_COUNT_RE, value) + + if match is None: + raise ValueError(f"{value} is not a valid signature count value") + + return value + + +@dataclass +class RoleDistributionServer: + _api: t.Union[GalaxyAPI, None] + api_servers: list[GalaxyAPI] + + @property + def api(self): + if self._api: + return self._api + + for server in self.api_servers: + try: + if u'v1' in server.available_api_versions: + self._api = server + break + except Exception: + continue + + if not self._api: + self._api = self.api_servers[0] + + return self._api + + +class GalaxyCLI(CLI): + '''Command to manage Ansible roles and collections. + + None of the CLI tools are designed to run concurrently with themselves. + Use an external scheduler and/or locking to ensure there are no clashing operations. + ''' + + name = 'ansible-galaxy' + + SKIP_INFO_KEYS = ("name", "description", "readme_html", "related", "summary_fields", "average_aw_composite", "average_aw_score", "url") + + def __init__(self, args): + self._raw_args = args + self._implicit_role = False + + if len(args) > 1: + # Inject role into sys.argv[1] as a backwards compatibility step + if args[1] not in ['-h', '--help', '--version'] and 'role' not in args and 'collection' not in args: + # TODO: Should we add a warning here and eventually deprecate the implicit role subcommand choice + args.insert(1, 'role') + self._implicit_role = True + # since argparse doesn't allow hidden subparsers, handle dead login arg from raw args after "role" normalization + if args[1:3] == ['role', 'login']: + display.error( + "The login command was removed in late 2020. An API key is now required to publish roles or collections " + "to Galaxy. The key can be found at https://galaxy.ansible.com/me/preferences, and passed to the " + "ansible-galaxy CLI via a file at {0} or (insecurely) via the `--token` " + "command-line argument.".format(to_text(C.GALAXY_TOKEN_PATH))) + sys.exit(1) + + self.api_servers = [] + self.galaxy = None + self.lazy_role_api = None + super(GalaxyCLI, self).__init__(args) + + def init_parser(self): + ''' create an options parser for bin/ansible ''' + + super(GalaxyCLI, self).init_parser( + desc="Perform various Role and Collection related operations.", + ) + + # Common arguments that apply to more than 1 action + common = opt_help.argparse.ArgumentParser(add_help=False) + common.add_argument('-s', '--server', dest='api_server', help='The Galaxy API server URL') + common.add_argument('--token', '--api-key', dest='api_key', + help='The Ansible Galaxy API key which can be found at ' + 'https://galaxy.ansible.com/me/preferences.') + common.add_argument('-c', '--ignore-certs', action='store_true', dest='ignore_certs', help='Ignore SSL certificate validation errors.', default=None) + common.add_argument('--timeout', dest='timeout', type=int, + help="The time to wait for operations against the galaxy server, defaults to 60s.") + + opt_help.add_verbosity_options(common) + + force = opt_help.argparse.ArgumentParser(add_help=False) + force.add_argument('-f', '--force', dest='force', action='store_true', default=False, + help='Force overwriting an existing role or collection') + + github = opt_help.argparse.ArgumentParser(add_help=False) + github.add_argument('github_user', help='GitHub username') + github.add_argument('github_repo', help='GitHub repository') + + offline = opt_help.argparse.ArgumentParser(add_help=False) + offline.add_argument('--offline', dest='offline', default=False, action='store_true', + help="Don't query the galaxy API when creating roles") + + default_roles_path = C.config.get_configuration_definition('DEFAULT_ROLES_PATH').get('default', '') + roles_path = opt_help.argparse.ArgumentParser(add_help=False) + roles_path.add_argument('-p', '--roles-path', dest='roles_path', type=opt_help.unfrack_path(pathsep=True), + default=C.DEFAULT_ROLES_PATH, action=opt_help.PrependListAction, + help='The path to the directory containing your roles. The default is the first ' + 'writable one configured via DEFAULT_ROLES_PATH: %s ' % default_roles_path) + + collections_path = opt_help.argparse.ArgumentParser(add_help=False) + collections_path.add_argument('-p', '--collections-path', dest='collections_path', type=opt_help.unfrack_path(pathsep=True), + default=AnsibleCollectionConfig.collection_paths, + action=opt_help.PrependListAction, + help="One or more directories to search for collections in addition " + "to the default COLLECTIONS_PATHS. Separate multiple paths " + "with '{0}'.".format(os.path.pathsep)) + + cache_options = opt_help.argparse.ArgumentParser(add_help=False) + cache_options.add_argument('--clear-response-cache', dest='clear_response_cache', action='store_true', + default=False, help='Clear the existing server response cache.') + cache_options.add_argument('--no-cache', dest='no_cache', action='store_true', default=False, + help='Do not use the server response cache.') + + # Add sub parser for the Galaxy role type (role or collection) + type_parser = self.parser.add_subparsers(metavar='TYPE', dest='type') + type_parser.required = True + + # Add sub parser for the Galaxy collection actions + collection = type_parser.add_parser('collection', help='Manage an Ansible Galaxy collection.') + collection_parser = collection.add_subparsers(metavar='COLLECTION_ACTION', dest='action') + collection_parser.required = True + self.add_download_options(collection_parser, parents=[common, cache_options]) + self.add_init_options(collection_parser, parents=[common, force]) + self.add_build_options(collection_parser, parents=[common, force]) + self.add_publish_options(collection_parser, parents=[common]) + self.add_install_options(collection_parser, parents=[common, force, cache_options]) + self.add_list_options(collection_parser, parents=[common, collections_path]) + self.add_verify_options(collection_parser, parents=[common, collections_path]) + + # Add sub parser for the Galaxy role actions + role = type_parser.add_parser('role', help='Manage an Ansible Galaxy role.') + role_parser = role.add_subparsers(metavar='ROLE_ACTION', dest='action') + role_parser.required = True + self.add_init_options(role_parser, parents=[common, force, offline]) + self.add_remove_options(role_parser, parents=[common, roles_path]) + self.add_delete_options(role_parser, parents=[common, github]) + self.add_list_options(role_parser, parents=[common, roles_path]) + self.add_search_options(role_parser, parents=[common]) + self.add_import_options(role_parser, parents=[common, github]) + self.add_setup_options(role_parser, parents=[common, roles_path]) + + self.add_info_options(role_parser, parents=[common, roles_path, offline]) + self.add_install_options(role_parser, parents=[common, force, roles_path]) + + def add_download_options(self, parser, parents=None): + download_parser = parser.add_parser('download', parents=parents, + help='Download collections and their dependencies as a tarball for an ' + 'offline install.') + download_parser.set_defaults(func=self.execute_download) + + download_parser.add_argument('args', help='Collection(s)', metavar='collection', nargs='*') + + download_parser.add_argument('-n', '--no-deps', dest='no_deps', action='store_true', default=False, + help="Don't download collection(s) listed as dependencies.") + + download_parser.add_argument('-p', '--download-path', dest='download_path', + default='./collections', + help='The directory to download the collections to.') + download_parser.add_argument('-r', '--requirements-file', dest='requirements', + help='A file containing a list of collections to be downloaded.') + download_parser.add_argument('--pre', dest='allow_pre_release', action='store_true', + help='Include pre-release versions. Semantic versioning pre-releases are ignored by default') + + def add_init_options(self, parser, parents=None): + galaxy_type = 'collection' if parser.metavar == 'COLLECTION_ACTION' else 'role' + + init_parser = parser.add_parser('init', parents=parents, + help='Initialize new {0} with the base structure of a ' + '{0}.'.format(galaxy_type)) + init_parser.set_defaults(func=self.execute_init) + + init_parser.add_argument('--init-path', dest='init_path', default='./', + help='The path in which the skeleton {0} will be created. The default is the ' + 'current working directory.'.format(galaxy_type)) + init_parser.add_argument('--{0}-skeleton'.format(galaxy_type), dest='{0}_skeleton'.format(galaxy_type), + default=C.GALAXY_COLLECTION_SKELETON if galaxy_type == 'collection' else C.GALAXY_ROLE_SKELETON, + help='The path to a {0} skeleton that the new {0} should be based ' + 'upon.'.format(galaxy_type)) + + obj_name_kwargs = {} + if galaxy_type == 'collection': + obj_name_kwargs['type'] = validate_collection_name + init_parser.add_argument('{0}_name'.format(galaxy_type), help='{0} name'.format(galaxy_type.capitalize()), + **obj_name_kwargs) + + if galaxy_type == 'role': + init_parser.add_argument('--type', dest='role_type', action='store', default='default', + help="Initialize using an alternate role type. Valid types include: 'container', " + "'apb' and 'network'.") + + def add_remove_options(self, parser, parents=None): + remove_parser = parser.add_parser('remove', parents=parents, help='Delete roles from roles_path.') + remove_parser.set_defaults(func=self.execute_remove) + + remove_parser.add_argument('args', help='Role(s)', metavar='role', nargs='+') + + def add_delete_options(self, parser, parents=None): + delete_parser = parser.add_parser('delete', parents=parents, + help='Removes the role from Galaxy. It does not remove or alter the actual ' + 'GitHub repository.') + delete_parser.set_defaults(func=self.execute_delete) + + def add_list_options(self, parser, parents=None): + galaxy_type = 'role' + if parser.metavar == 'COLLECTION_ACTION': + galaxy_type = 'collection' + + list_parser = parser.add_parser('list', parents=parents, + help='Show the name and version of each {0} installed in the {0}s_path.'.format(galaxy_type)) + + list_parser.set_defaults(func=self.execute_list) + + list_parser.add_argument(galaxy_type, help=galaxy_type.capitalize(), nargs='?', metavar=galaxy_type) + + if galaxy_type == 'collection': + list_parser.add_argument('--format', dest='output_format', choices=('human', 'yaml', 'json'), default='human', + help="Format to display the list of collections in.") + + def add_search_options(self, parser, parents=None): + search_parser = parser.add_parser('search', parents=parents, + help='Search the Galaxy database by tags, platforms, author and multiple ' + 'keywords.') + search_parser.set_defaults(func=self.execute_search) + + search_parser.add_argument('--platforms', dest='platforms', help='list of OS platforms to filter by') + search_parser.add_argument('--galaxy-tags', dest='galaxy_tags', help='list of galaxy tags to filter by') + search_parser.add_argument('--author', dest='author', help='GitHub username') + search_parser.add_argument('args', help='Search terms', metavar='searchterm', nargs='*') + + def add_import_options(self, parser, parents=None): + import_parser = parser.add_parser('import', parents=parents, help='Import a role into a galaxy server') + import_parser.set_defaults(func=self.execute_import) + + import_parser.add_argument('--no-wait', dest='wait', action='store_false', default=True, + help="Don't wait for import results.") + import_parser.add_argument('--branch', dest='reference', + help='The name of a branch to import. Defaults to the repository\'s default branch ' + '(usually master)') + import_parser.add_argument('--role-name', dest='role_name', + help='The name the role should have, if different than the repo name') + import_parser.add_argument('--status', dest='check_status', action='store_true', default=False, + help='Check the status of the most recent import request for given github_' + 'user/github_repo.') + + def add_setup_options(self, parser, parents=None): + setup_parser = parser.add_parser('setup', parents=parents, + help='Manage the integration between Galaxy and the given source.') + setup_parser.set_defaults(func=self.execute_setup) + + setup_parser.add_argument('--remove', dest='remove_id', default=None, + help='Remove the integration matching the provided ID value. Use --list to see ' + 'ID values.') + setup_parser.add_argument('--list', dest="setup_list", action='store_true', default=False, + help='List all of your integrations.') + setup_parser.add_argument('source', help='Source') + setup_parser.add_argument('github_user', help='GitHub username') + setup_parser.add_argument('github_repo', help='GitHub repository') + setup_parser.add_argument('secret', help='Secret') + + def add_info_options(self, parser, parents=None): + info_parser = parser.add_parser('info', parents=parents, help='View more details about a specific role.') + info_parser.set_defaults(func=self.execute_info) + + info_parser.add_argument('args', nargs='+', help='role', metavar='role_name[,version]') + + def add_verify_options(self, parser, parents=None): + galaxy_type = 'collection' + verify_parser = parser.add_parser('verify', parents=parents, help='Compare checksums with the collection(s) ' + 'found on the server and the installed copy. This does not verify dependencies.') + verify_parser.set_defaults(func=self.execute_verify) + + verify_parser.add_argument('args', metavar='{0}_name'.format(galaxy_type), nargs='*', help='The installed collection(s) name. ' + 'This is mutually exclusive with --requirements-file.') + verify_parser.add_argument('-i', '--ignore-errors', dest='ignore_errors', action='store_true', default=False, + help='Ignore errors during verification and continue with the next specified collection.') + verify_parser.add_argument('--offline', dest='offline', action='store_true', default=False, + help='Validate collection integrity locally without contacting server for ' + 'canonical manifest hash.') + verify_parser.add_argument('-r', '--requirements-file', dest='requirements', + help='A file containing a list of collections to be verified.') + verify_parser.add_argument('--keyring', dest='keyring', default=C.GALAXY_GPG_KEYRING, + help='The keyring used during signature verification') # Eventually default to ~/.ansible/pubring.kbx? + verify_parser.add_argument('--signature', dest='signatures', action='append', + help='An additional signature source to verify the authenticity of the MANIFEST.json before using ' + 'it to verify the rest of the contents of a collection from a Galaxy server. Use in ' + 'conjunction with a positional collection name (mutually exclusive with --requirements-file).') + valid_signature_count_help = 'The number of signatures that must successfully verify the collection. This should be a positive integer ' \ + 'or all to signify that all signatures must be used to verify the collection. ' \ + 'Prepend the value with + to fail if no valid signatures are found for the collection (e.g. +all).' + ignore_gpg_status_help = 'A status code to ignore during signature verification (for example, NO_PUBKEY). ' \ + 'Provide this option multiple times to ignore a list of status codes. ' \ + 'Descriptions for the choices can be seen at L(https://github.com/gpg/gnupg/blob/master/doc/DETAILS#general-status-codes).' + verify_parser.add_argument('--required-valid-signature-count', dest='required_valid_signature_count', type=validate_signature_count, + help=valid_signature_count_help, default=C.GALAXY_REQUIRED_VALID_SIGNATURE_COUNT) + verify_parser.add_argument('--ignore-signature-status-code', dest='ignore_gpg_errors', type=str, action='append', + help=ignore_gpg_status_help, default=C.GALAXY_IGNORE_INVALID_SIGNATURE_STATUS_CODES, + choices=list(GPG_ERROR_MAP.keys())) + + def add_install_options(self, parser, parents=None): + galaxy_type = 'collection' if parser.metavar == 'COLLECTION_ACTION' else 'role' + + args_kwargs = {} + if galaxy_type == 'collection': + args_kwargs['help'] = 'The collection(s) name or path/url to a tar.gz collection artifact. This is ' \ + 'mutually exclusive with --requirements-file.' + ignore_errors_help = 'Ignore errors during installation and continue with the next specified ' \ + 'collection. This will not ignore dependency conflict errors.' + else: + args_kwargs['help'] = 'Role name, URL or tar file' + ignore_errors_help = 'Ignore errors and continue with the next specified role.' + + install_parser = parser.add_parser('install', parents=parents, + help='Install {0}(s) from file(s), URL(s) or Ansible ' + 'Galaxy'.format(galaxy_type)) + install_parser.set_defaults(func=self.execute_install) + + install_parser.add_argument('args', metavar='{0}_name'.format(galaxy_type), nargs='*', **args_kwargs) + install_parser.add_argument('-i', '--ignore-errors', dest='ignore_errors', action='store_true', default=False, + help=ignore_errors_help) + + install_exclusive = install_parser.add_mutually_exclusive_group() + install_exclusive.add_argument('-n', '--no-deps', dest='no_deps', action='store_true', default=False, + help="Don't download {0}s listed as dependencies.".format(galaxy_type)) + install_exclusive.add_argument('--force-with-deps', dest='force_with_deps', action='store_true', default=False, + help="Force overwriting an existing {0} and its " + "dependencies.".format(galaxy_type)) + + valid_signature_count_help = 'The number of signatures that must successfully verify the collection. This should be a positive integer ' \ + 'or -1 to signify that all signatures must be used to verify the collection. ' \ + 'Prepend the value with + to fail if no valid signatures are found for the collection (e.g. +all).' + ignore_gpg_status_help = 'A status code to ignore during signature verification (for example, NO_PUBKEY). ' \ + 'Provide this option multiple times to ignore a list of status codes. ' \ + 'Descriptions for the choices can be seen at L(https://github.com/gpg/gnupg/blob/master/doc/DETAILS#general-status-codes).' + + if galaxy_type == 'collection': + install_parser.add_argument('-p', '--collections-path', dest='collections_path', + default=self._get_default_collection_path(), + help='The path to the directory containing your collections.') + install_parser.add_argument('-r', '--requirements-file', dest='requirements', + help='A file containing a list of collections to be installed.') + install_parser.add_argument('--pre', dest='allow_pre_release', action='store_true', + help='Include pre-release versions. Semantic versioning pre-releases are ignored by default') + install_parser.add_argument('-U', '--upgrade', dest='upgrade', action='store_true', default=False, + help='Upgrade installed collection artifacts. This will also update dependencies unless --no-deps is provided') + install_parser.add_argument('--keyring', dest='keyring', default=C.GALAXY_GPG_KEYRING, + help='The keyring used during signature verification') # Eventually default to ~/.ansible/pubring.kbx? + install_parser.add_argument('--disable-gpg-verify', dest='disable_gpg_verify', action='store_true', + default=C.GALAXY_DISABLE_GPG_VERIFY, + help='Disable GPG signature verification when installing collections from a Galaxy server') + install_parser.add_argument('--signature', dest='signatures', action='append', + help='An additional signature source to verify the authenticity of the MANIFEST.json before ' + 'installing the collection from a Galaxy server. Use in conjunction with a positional ' + 'collection name (mutually exclusive with --requirements-file).') + install_parser.add_argument('--required-valid-signature-count', dest='required_valid_signature_count', type=validate_signature_count, + help=valid_signature_count_help, default=C.GALAXY_REQUIRED_VALID_SIGNATURE_COUNT) + install_parser.add_argument('--ignore-signature-status-code', dest='ignore_gpg_errors', type=str, action='append', + help=ignore_gpg_status_help, default=C.GALAXY_IGNORE_INVALID_SIGNATURE_STATUS_CODES, + choices=list(GPG_ERROR_MAP.keys())) + install_parser.add_argument('--offline', dest='offline', action='store_true', default=False, + help='Install collection artifacts (tarballs) without contacting any distribution servers. ' + 'This does not apply to collections in remote Git repositories or URLs to remote tarballs.' + ) + else: + install_parser.add_argument('-r', '--role-file', dest='requirements', + help='A file containing a list of roles to be installed.') + + r_re = re.compile(r'^(?<!-)-[a-zA-Z]*r[a-zA-Z]*') # -r, -fr + contains_r = bool([a for a in self._raw_args if r_re.match(a)]) + role_file_re = re.compile(r'--role-file($|=)') # --role-file foo, --role-file=foo + contains_role_file = bool([a for a in self._raw_args if role_file_re.match(a)]) + if self._implicit_role and (contains_r or contains_role_file): + # Any collections in the requirements files will also be installed + install_parser.add_argument('--keyring', dest='keyring', default=C.GALAXY_GPG_KEYRING, + help='The keyring used during collection signature verification') + install_parser.add_argument('--disable-gpg-verify', dest='disable_gpg_verify', action='store_true', + default=C.GALAXY_DISABLE_GPG_VERIFY, + help='Disable GPG signature verification when installing collections from a Galaxy server') + install_parser.add_argument('--required-valid-signature-count', dest='required_valid_signature_count', type=validate_signature_count, + help=valid_signature_count_help, default=C.GALAXY_REQUIRED_VALID_SIGNATURE_COUNT) + install_parser.add_argument('--ignore-signature-status-code', dest='ignore_gpg_errors', type=str, action='append', + help=ignore_gpg_status_help, default=C.GALAXY_IGNORE_INVALID_SIGNATURE_STATUS_CODES, + choices=list(GPG_ERROR_MAP.keys())) + + install_parser.add_argument('-g', '--keep-scm-meta', dest='keep_scm_meta', action='store_true', + default=False, + help='Use tar instead of the scm archive option when packaging the role.') + + def add_build_options(self, parser, parents=None): + build_parser = parser.add_parser('build', parents=parents, + help='Build an Ansible collection artifact that can be published to Ansible ' + 'Galaxy.') + build_parser.set_defaults(func=self.execute_build) + + build_parser.add_argument('args', metavar='collection', nargs='*', default=('.',), + help='Path to the collection(s) directory to build. This should be the directory ' + 'that contains the galaxy.yml file. The default is the current working ' + 'directory.') + build_parser.add_argument('--output-path', dest='output_path', default='./', + help='The path in which the collection is built to. The default is the current ' + 'working directory.') + + def add_publish_options(self, parser, parents=None): + publish_parser = parser.add_parser('publish', parents=parents, + help='Publish a collection artifact to Ansible Galaxy.') + publish_parser.set_defaults(func=self.execute_publish) + + publish_parser.add_argument('args', metavar='collection_path', + help='The path to the collection tarball to publish.') + publish_parser.add_argument('--no-wait', dest='wait', action='store_false', default=True, + help="Don't wait for import validation results.") + publish_parser.add_argument('--import-timeout', dest='import_timeout', type=int, default=0, + help="The time to wait for the collection import process to finish.") + + def post_process_args(self, options): + options = super(GalaxyCLI, self).post_process_args(options) + + # ensure we have 'usable' cli option + setattr(options, 'validate_certs', (None if options.ignore_certs is None else not options.ignore_certs)) + # the default if validate_certs is None + setattr(options, 'resolved_validate_certs', (options.validate_certs if options.validate_certs is not None else not C.GALAXY_IGNORE_CERTS)) + + display.verbosity = options.verbosity + return options + + def run(self): + + super(GalaxyCLI, self).run() + + self.galaxy = Galaxy() + + def server_config_def(section, key, required, option_type): + config_def = { + 'description': 'The %s of the %s Galaxy server' % (key, section), + 'ini': [ + { + 'section': 'galaxy_server.%s' % section, + 'key': key, + } + ], + 'env': [ + {'name': 'ANSIBLE_GALAXY_SERVER_%s_%s' % (section.upper(), key.upper())}, + ], + 'required': required, + 'type': option_type, + } + if key in SERVER_ADDITIONAL: + config_def.update(SERVER_ADDITIONAL[key]) + + return config_def + + galaxy_options = {} + for optional_key in ['clear_response_cache', 'no_cache', 'timeout']: + if optional_key in context.CLIARGS: + galaxy_options[optional_key] = context.CLIARGS[optional_key] + + config_servers = [] + + # Need to filter out empty strings or non truthy values as an empty server list env var is equal to ['']. + server_list = [s for s in C.GALAXY_SERVER_LIST or [] if s] + for server_priority, server_key in enumerate(server_list, start=1): + # Abuse the 'plugin config' by making 'galaxy_server' a type of plugin + # Config definitions are looked up dynamically based on the C.GALAXY_SERVER_LIST entry. We look up the + # section [galaxy_server.<server>] for the values url, username, password, and token. + config_dict = dict((k, server_config_def(server_key, k, req, ensure_type)) for k, req, ensure_type in SERVER_DEF) + defs = AnsibleLoader(yaml_dump(config_dict)).get_single_data() + C.config.initialize_plugin_configuration_definitions('galaxy_server', server_key, defs) + + # resolve the config created options above with existing config and user options + server_options = C.config.get_plugin_options('galaxy_server', server_key) + + # auth_url is used to create the token, but not directly by GalaxyAPI, so + # it doesn't need to be passed as kwarg to GalaxyApi, same for others we pop here + auth_url = server_options.pop('auth_url') + client_id = server_options.pop('client_id') + token_val = server_options['token'] or NoTokenSentinel + username = server_options['username'] + v3 = server_options.pop('v3') + if server_options['validate_certs'] is None: + server_options['validate_certs'] = context.CLIARGS['resolved_validate_certs'] + validate_certs = server_options['validate_certs'] + + if v3: + # This allows a user to explicitly indicate the server uses the /v3 API + # This was added for testing against pulp_ansible and I'm not sure it has + # a practical purpose outside of this use case. As such, this option is not + # documented as of now + server_options['available_api_versions'] = {'v3': '/v3'} + + # default case if no auth info is provided. + server_options['token'] = None + + if username: + server_options['token'] = BasicAuthToken(username, server_options['password']) + else: + if token_val: + if auth_url: + server_options['token'] = KeycloakToken(access_token=token_val, + auth_url=auth_url, + validate_certs=validate_certs, + client_id=client_id) + else: + # The galaxy v1 / github / django / 'Token' + server_options['token'] = GalaxyToken(token=token_val) + + server_options.update(galaxy_options) + config_servers.append(GalaxyAPI( + self.galaxy, server_key, + priority=server_priority, + **server_options + )) + + cmd_server = context.CLIARGS['api_server'] + cmd_token = GalaxyToken(token=context.CLIARGS['api_key']) + + validate_certs = context.CLIARGS['resolved_validate_certs'] + if cmd_server: + # Cmd args take precedence over the config entry but fist check if the arg was a name and use that config + # entry, otherwise create a new API entry for the server specified. + config_server = next((s for s in config_servers if s.name == cmd_server), None) + if config_server: + self.api_servers.append(config_server) + else: + self.api_servers.append(GalaxyAPI( + self.galaxy, 'cmd_arg', cmd_server, token=cmd_token, + priority=len(config_servers) + 1, + validate_certs=validate_certs, + **galaxy_options + )) + else: + self.api_servers = config_servers + + # Default to C.GALAXY_SERVER if no servers were defined + if len(self.api_servers) == 0: + self.api_servers.append(GalaxyAPI( + self.galaxy, 'default', C.GALAXY_SERVER, token=cmd_token, + priority=0, + validate_certs=validate_certs, + **galaxy_options + )) + + # checks api versions once a GalaxyRole makes an api call + # self.api can be used to evaluate the best server immediately + self.lazy_role_api = RoleDistributionServer(None, self.api_servers) + + return context.CLIARGS['func']() + + @property + def api(self): + return self.lazy_role_api.api + + def _get_default_collection_path(self): + return C.COLLECTIONS_PATHS[0] + + def _parse_requirements_file(self, requirements_file, allow_old_format=True, artifacts_manager=None, validate_signature_options=True): + """ + Parses an Ansible requirement.yml file and returns all the roles and/or collections defined in it. There are 2 + requirements file format: + + # v1 (roles only) + - src: The source of the role, required if include is not set. Can be Galaxy role name, URL to a SCM repo or tarball. + name: Downloads the role to the specified name, defaults to Galaxy name from Galaxy or name of repo if src is a URL. + scm: If src is a URL, specify the SCM. Only git or hd are supported and defaults ot git. + version: The version of the role to download. Can also be tag, commit, or branch name and defaults to master. + include: Path to additional requirements.yml files. + + # v2 (roles and collections) + --- + roles: + # Same as v1 format just under the roles key + + collections: + - namespace.collection + - name: namespace.collection + version: version identifier, multiple identifiers are separated by ',' + source: the URL or a predefined source name that relates to C.GALAXY_SERVER_LIST + type: git|file|url|galaxy + + :param requirements_file: The path to the requirements file. + :param allow_old_format: Will fail if a v1 requirements file is found and this is set to False. + :param artifacts_manager: Artifacts manager. + :return: a dict containing roles and collections to found in the requirements file. + """ + requirements = { + 'roles': [], + 'collections': [], + } + + b_requirements_file = to_bytes(requirements_file, errors='surrogate_or_strict') + if not os.path.exists(b_requirements_file): + raise AnsibleError("The requirements file '%s' does not exist." % to_native(requirements_file)) + + display.vvv("Reading requirement file at '%s'" % requirements_file) + with open(b_requirements_file, 'rb') as req_obj: + try: + file_requirements = yaml_load(req_obj) + except YAMLError as err: + raise AnsibleError( + "Failed to parse the requirements yml at '%s' with the following error:\n%s" + % (to_native(requirements_file), to_native(err))) + + if file_requirements is None: + raise AnsibleError("No requirements found in file '%s'" % to_native(requirements_file)) + + def parse_role_req(requirement): + if "include" not in requirement: + role = RoleRequirement.role_yaml_parse(requirement) + display.vvv("found role %s in yaml file" % to_text(role)) + if "name" not in role and "src" not in role: + raise AnsibleError("Must specify name or src for role") + return [GalaxyRole(self.galaxy, self.lazy_role_api, **role)] + else: + b_include_path = to_bytes(requirement["include"], errors="surrogate_or_strict") + if not os.path.isfile(b_include_path): + raise AnsibleError("Failed to find include requirements file '%s' in '%s'" + % (to_native(b_include_path), to_native(requirements_file))) + + with open(b_include_path, 'rb') as f_include: + try: + return [GalaxyRole(self.galaxy, self.lazy_role_api, **r) for r in + (RoleRequirement.role_yaml_parse(i) for i in yaml_load(f_include))] + except Exception as e: + raise AnsibleError("Unable to load data from include requirements file: %s %s" + % (to_native(requirements_file), to_native(e))) + + if isinstance(file_requirements, list): + # Older format that contains only roles + if not allow_old_format: + raise AnsibleError("Expecting requirements file to be a dict with the key 'collections' that contains " + "a list of collections to install") + + for role_req in file_requirements: + requirements['roles'] += parse_role_req(role_req) + + else: + # Newer format with a collections and/or roles key + extra_keys = set(file_requirements.keys()).difference(set(['roles', 'collections'])) + if extra_keys: + raise AnsibleError("Expecting only 'roles' and/or 'collections' as base keys in the requirements " + "file. Found: %s" % (to_native(", ".join(extra_keys)))) + + for role_req in file_requirements.get('roles') or []: + requirements['roles'] += parse_role_req(role_req) + + requirements['collections'] = [ + Requirement.from_requirement_dict( + self._init_coll_req_dict(collection_req), + artifacts_manager, + validate_signature_options, + ) + for collection_req in file_requirements.get('collections') or [] + ] + + return requirements + + def _init_coll_req_dict(self, coll_req): + if not isinstance(coll_req, dict): + # Assume it's a string: + return {'name': coll_req} + + if ( + 'name' not in coll_req or + not coll_req.get('source') or + coll_req.get('type', 'galaxy') != 'galaxy' + ): + return coll_req + + # Try and match up the requirement source with our list of Galaxy API + # servers defined in the config, otherwise create a server with that + # URL without any auth. + coll_req['source'] = next( + iter( + srvr for srvr in self.api_servers + if coll_req['source'] in {srvr.name, srvr.api_server} + ), + GalaxyAPI( + self.galaxy, + 'explicit_requirement_{name!s}'.format( + name=coll_req['name'], + ), + coll_req['source'], + validate_certs=context.CLIARGS['resolved_validate_certs'], + ), + ) + + return coll_req + + @staticmethod + def exit_without_ignore(rc=1): + """ + Exits with the specified return code unless the + option --ignore-errors was specified + """ + if not context.CLIARGS['ignore_errors']: + raise AnsibleError('- you can use --ignore-errors to skip failed roles and finish processing the list.') + + @staticmethod + def _display_role_info(role_info): + + text = [u"", u"Role: %s" % to_text(role_info['name'])] + + # Get the top-level 'description' first, falling back to galaxy_info['galaxy_info']['description']. + galaxy_info = role_info.get('galaxy_info', {}) + description = role_info.get('description', galaxy_info.get('description', '')) + text.append(u"\tdescription: %s" % description) + + for k in sorted(role_info.keys()): + + if k in GalaxyCLI.SKIP_INFO_KEYS: + continue + + if isinstance(role_info[k], dict): + text.append(u"\t%s:" % (k)) + for key in sorted(role_info[k].keys()): + if key in GalaxyCLI.SKIP_INFO_KEYS: + continue + text.append(u"\t\t%s: %s" % (key, role_info[k][key])) + else: + text.append(u"\t%s: %s" % (k, role_info[k])) + + # make sure we have a trailing newline returned + text.append(u"") + return u'\n'.join(text) + + @staticmethod + def _resolve_path(path): + return os.path.abspath(os.path.expanduser(os.path.expandvars(path))) + + @staticmethod + def _get_skeleton_galaxy_yml(template_path, inject_data): + with open(to_bytes(template_path, errors='surrogate_or_strict'), 'rb') as template_obj: + meta_template = to_text(template_obj.read(), errors='surrogate_or_strict') + + galaxy_meta = get_collections_galaxy_meta_info() + + required_config = [] + optional_config = [] + for meta_entry in galaxy_meta: + config_list = required_config if meta_entry.get('required', False) else optional_config + + value = inject_data.get(meta_entry['key'], None) + if not value: + meta_type = meta_entry.get('type', 'str') + + if meta_type == 'str': + value = '' + elif meta_type == 'list': + value = [] + elif meta_type == 'dict': + value = {} + + meta_entry['value'] = value + config_list.append(meta_entry) + + link_pattern = re.compile(r"L\(([^)]+),\s+([^)]+)\)") + const_pattern = re.compile(r"C\(([^)]+)\)") + + def comment_ify(v): + if isinstance(v, list): + v = ". ".join([l.rstrip('.') for l in v]) + + v = link_pattern.sub(r"\1 <\2>", v) + v = const_pattern.sub(r"'\1'", v) + + return textwrap.fill(v, width=117, initial_indent="# ", subsequent_indent="# ", break_on_hyphens=False) + + loader = DataLoader() + templar = Templar(loader, variables={'required_config': required_config, 'optional_config': optional_config}) + templar.environment.filters['comment_ify'] = comment_ify + + meta_value = templar.template(meta_template) + + return meta_value + + def _require_one_of_collections_requirements( + self, collections, requirements_file, + signatures=None, + artifacts_manager=None, + ): + if collections and requirements_file: + raise AnsibleError("The positional collection_name arg and --requirements-file are mutually exclusive.") + elif not collections and not requirements_file: + raise AnsibleError("You must specify a collection name or a requirements file.") + elif requirements_file: + if signatures is not None: + raise AnsibleError( + "The --signatures option and --requirements-file are mutually exclusive. " + "Use the --signatures with positional collection_name args or provide a " + "'signatures' key for requirements in the --requirements-file." + ) + requirements_file = GalaxyCLI._resolve_path(requirements_file) + requirements = self._parse_requirements_file( + requirements_file, + allow_old_format=False, + artifacts_manager=artifacts_manager, + ) + else: + requirements = { + 'collections': [ + Requirement.from_string(coll_input, artifacts_manager, signatures) + for coll_input in collections + ], + 'roles': [], + } + return requirements + + ############################ + # execute actions + ############################ + + def execute_role(self): + """ + Perform the action on an Ansible Galaxy role. Must be combined with a further action like delete/install/init + as listed below. + """ + # To satisfy doc build + pass + + def execute_collection(self): + """ + Perform the action on an Ansible Galaxy collection. Must be combined with a further action like init/install as + listed below. + """ + # To satisfy doc build + pass + + def execute_build(self): + """ + Build an Ansible Galaxy collection artifact that can be stored in a central repository like Ansible Galaxy. + By default, this command builds from the current working directory. You can optionally pass in the + collection input path (where the ``galaxy.yml`` file is). + """ + force = context.CLIARGS['force'] + output_path = GalaxyCLI._resolve_path(context.CLIARGS['output_path']) + b_output_path = to_bytes(output_path, errors='surrogate_or_strict') + + if not os.path.exists(b_output_path): + os.makedirs(b_output_path) + elif os.path.isfile(b_output_path): + raise AnsibleError("- the output collection directory %s is a file - aborting" % to_native(output_path)) + + for collection_path in context.CLIARGS['args']: + collection_path = GalaxyCLI._resolve_path(collection_path) + build_collection( + to_text(collection_path, errors='surrogate_or_strict'), + to_text(output_path, errors='surrogate_or_strict'), + force, + ) + + @with_collection_artifacts_manager + def execute_download(self, artifacts_manager=None): + collections = context.CLIARGS['args'] + no_deps = context.CLIARGS['no_deps'] + download_path = context.CLIARGS['download_path'] + + requirements_file = context.CLIARGS['requirements'] + if requirements_file: + requirements_file = GalaxyCLI._resolve_path(requirements_file) + + requirements = self._require_one_of_collections_requirements( + collections, requirements_file, + artifacts_manager=artifacts_manager, + )['collections'] + + download_path = GalaxyCLI._resolve_path(download_path) + b_download_path = to_bytes(download_path, errors='surrogate_or_strict') + if not os.path.exists(b_download_path): + os.makedirs(b_download_path) + + download_collections( + requirements, download_path, self.api_servers, no_deps, + context.CLIARGS['allow_pre_release'], + artifacts_manager=artifacts_manager, + ) + + return 0 + + def execute_init(self): + """ + Creates the skeleton framework of a role or collection that complies with the Galaxy metadata format. + Requires a role or collection name. The collection name must be in the format ``<namespace>.<collection>``. + """ + + galaxy_type = context.CLIARGS['type'] + init_path = context.CLIARGS['init_path'] + force = context.CLIARGS['force'] + obj_skeleton = context.CLIARGS['{0}_skeleton'.format(galaxy_type)] + + obj_name = context.CLIARGS['{0}_name'.format(galaxy_type)] + + inject_data = dict( + description='your {0} description'.format(galaxy_type), + ansible_plugin_list_dir=get_versioned_doclink('plugins/plugins.html'), + ) + if galaxy_type == 'role': + inject_data.update(dict( + author='your name', + company='your company (optional)', + license='license (GPL-2.0-or-later, MIT, etc)', + role_name=obj_name, + role_type=context.CLIARGS['role_type'], + issue_tracker_url='http://example.com/issue/tracker', + repository_url='http://example.com/repository', + documentation_url='http://docs.example.com', + homepage_url='http://example.com', + min_ansible_version=ansible_version[:3], # x.y + dependencies=[], + )) + + skeleton_ignore_expressions = C.GALAXY_ROLE_SKELETON_IGNORE + obj_path = os.path.join(init_path, obj_name) + elif galaxy_type == 'collection': + namespace, collection_name = obj_name.split('.', 1) + + inject_data.update(dict( + namespace=namespace, + collection_name=collection_name, + version='1.0.0', + readme='README.md', + authors=['your name <example@domain.com>'], + license=['GPL-2.0-or-later'], + repository='http://example.com/repository', + documentation='http://docs.example.com', + homepage='http://example.com', + issues='http://example.com/issue/tracker', + build_ignore=[], + )) + + skeleton_ignore_expressions = C.GALAXY_COLLECTION_SKELETON_IGNORE + obj_path = os.path.join(init_path, namespace, collection_name) + + b_obj_path = to_bytes(obj_path, errors='surrogate_or_strict') + + if os.path.exists(b_obj_path): + if os.path.isfile(obj_path): + raise AnsibleError("- the path %s already exists, but is a file - aborting" % to_native(obj_path)) + elif not force: + raise AnsibleError("- the directory %s already exists. " + "You can use --force to re-initialize this directory,\n" + "however it will reset any main.yml files that may have\n" + "been modified there already." % to_native(obj_path)) + + # delete the contents rather than the collection root in case init was run from the root (--init-path ../../) + for root, dirs, files in os.walk(b_obj_path, topdown=True): + for old_dir in dirs: + path = os.path.join(root, old_dir) + shutil.rmtree(path) + for old_file in files: + path = os.path.join(root, old_file) + os.unlink(path) + + if obj_skeleton is not None: + own_skeleton = False + else: + own_skeleton = True + obj_skeleton = self.galaxy.default_role_skeleton_path + skeleton_ignore_expressions = ['^.*/.git_keep$'] + + obj_skeleton = os.path.expanduser(obj_skeleton) + skeleton_ignore_re = [re.compile(x) for x in skeleton_ignore_expressions] + + if not os.path.exists(obj_skeleton): + raise AnsibleError("- the skeleton path '{0}' does not exist, cannot init {1}".format( + to_native(obj_skeleton), galaxy_type) + ) + + loader = DataLoader() + templar = Templar(loader, variables=inject_data) + + # create role directory + if not os.path.exists(b_obj_path): + os.makedirs(b_obj_path) + + for root, dirs, files in os.walk(obj_skeleton, topdown=True): + rel_root = os.path.relpath(root, obj_skeleton) + rel_dirs = rel_root.split(os.sep) + rel_root_dir = rel_dirs[0] + if galaxy_type == 'collection': + # A collection can contain templates in playbooks/*/templates and roles/*/templates + in_templates_dir = rel_root_dir in ['playbooks', 'roles'] and 'templates' in rel_dirs + else: + in_templates_dir = rel_root_dir == 'templates' + + # Filter out ignored directory names + # Use [:] to mutate the list os.walk uses + dirs[:] = [d for d in dirs if not any(r.match(d) for r in skeleton_ignore_re)] + + for f in files: + filename, ext = os.path.splitext(f) + + if any(r.match(os.path.join(rel_root, f)) for r in skeleton_ignore_re): + continue + + if galaxy_type == 'collection' and own_skeleton and rel_root == '.' and f == 'galaxy.yml.j2': + # Special use case for galaxy.yml.j2 in our own default collection skeleton. We build the options + # dynamically which requires special options to be set. + + # The templated data's keys must match the key name but the inject data contains collection_name + # instead of name. We just make a copy and change the key back to name for this file. + template_data = inject_data.copy() + template_data['name'] = template_data.pop('collection_name') + + meta_value = GalaxyCLI._get_skeleton_galaxy_yml(os.path.join(root, rel_root, f), template_data) + b_dest_file = to_bytes(os.path.join(obj_path, rel_root, filename), errors='surrogate_or_strict') + with open(b_dest_file, 'wb') as galaxy_obj: + galaxy_obj.write(to_bytes(meta_value, errors='surrogate_or_strict')) + elif ext == ".j2" and not in_templates_dir: + src_template = os.path.join(root, f) + dest_file = os.path.join(obj_path, rel_root, filename) + template_data = to_text(loader._get_file_contents(src_template)[0], errors='surrogate_or_strict') + b_rendered = to_bytes(templar.template(template_data), errors='surrogate_or_strict') + with open(dest_file, 'wb') as df: + df.write(b_rendered) + else: + f_rel_path = os.path.relpath(os.path.join(root, f), obj_skeleton) + shutil.copyfile(os.path.join(root, f), os.path.join(obj_path, f_rel_path)) + + for d in dirs: + b_dir_path = to_bytes(os.path.join(obj_path, rel_root, d), errors='surrogate_or_strict') + if not os.path.exists(b_dir_path): + os.makedirs(b_dir_path) + + display.display("- %s %s was created successfully" % (galaxy_type.title(), obj_name)) + + def execute_info(self): + """ + prints out detailed information about an installed role as well as info available from the galaxy API. + """ + + roles_path = context.CLIARGS['roles_path'] + + data = '' + for role in context.CLIARGS['args']: + + role_info = {'path': roles_path} + gr = GalaxyRole(self.galaxy, self.lazy_role_api, role) + + install_info = gr.install_info + if install_info: + if 'version' in install_info: + install_info['installed_version'] = install_info['version'] + del install_info['version'] + role_info.update(install_info) + + if not context.CLIARGS['offline']: + remote_data = None + try: + remote_data = self.api.lookup_role_by_name(role, False) + except AnsibleError as e: + if e.http_code == 400 and 'Bad Request' in e.message: + # Role does not exist in Ansible Galaxy + data = u"- the role %s was not found" % role + break + + raise AnsibleError("Unable to find info about '%s': %s" % (role, e)) + + if remote_data: + role_info.update(remote_data) + + elif context.CLIARGS['offline'] and not gr._exists: + data = u"- the role %s was not found" % role + break + + if gr.metadata: + role_info.update(gr.metadata) + + req = RoleRequirement() + role_spec = req.role_yaml_parse({'role': role}) + if role_spec: + role_info.update(role_spec) + + data += self._display_role_info(role_info) + + self.pager(data) + + @with_collection_artifacts_manager + def execute_verify(self, artifacts_manager=None): + + collections = context.CLIARGS['args'] + search_paths = context.CLIARGS['collections_path'] + ignore_errors = context.CLIARGS['ignore_errors'] + local_verify_only = context.CLIARGS['offline'] + requirements_file = context.CLIARGS['requirements'] + signatures = context.CLIARGS['signatures'] + if signatures is not None: + signatures = list(signatures) + + requirements = self._require_one_of_collections_requirements( + collections, requirements_file, + signatures=signatures, + artifacts_manager=artifacts_manager, + )['collections'] + + resolved_paths = [validate_collection_path(GalaxyCLI._resolve_path(path)) for path in search_paths] + + results = verify_collections( + requirements, resolved_paths, + self.api_servers, ignore_errors, + local_verify_only=local_verify_only, + artifacts_manager=artifacts_manager, + ) + + if any(result for result in results if not result.success): + return 1 + + return 0 + + @with_collection_artifacts_manager + def execute_install(self, artifacts_manager=None): + """ + Install one or more roles(``ansible-galaxy role install``), or one or more collections(``ansible-galaxy collection install``). + You can pass in a list (roles or collections) or use the file + option listed below (these are mutually exclusive). If you pass in a list, it + can be a name (which will be downloaded via the galaxy API and github), or it can be a local tar archive file. + + :param artifacts_manager: Artifacts manager. + """ + install_items = context.CLIARGS['args'] + requirements_file = context.CLIARGS['requirements'] + collection_path = None + signatures = context.CLIARGS.get('signatures') + if signatures is not None: + signatures = list(signatures) + + if requirements_file: + requirements_file = GalaxyCLI._resolve_path(requirements_file) + + two_type_warning = "The requirements file '%s' contains {0}s which will be ignored. To install these {0}s " \ + "run 'ansible-galaxy {0} install -r' or to install both at the same time run " \ + "'ansible-galaxy install -r' without a custom install path." % to_text(requirements_file) + + # TODO: Would be nice to share the same behaviour with args and -r in collections and roles. + collection_requirements = [] + role_requirements = [] + if context.CLIARGS['type'] == 'collection': + collection_path = GalaxyCLI._resolve_path(context.CLIARGS['collections_path']) + requirements = self._require_one_of_collections_requirements( + install_items, requirements_file, + signatures=signatures, + artifacts_manager=artifacts_manager, + ) + + collection_requirements = requirements['collections'] + if requirements['roles']: + display.vvv(two_type_warning.format('role')) + else: + if not install_items and requirements_file is None: + raise AnsibleOptionsError("- you must specify a user/role name or a roles file") + + if requirements_file: + if not (requirements_file.endswith('.yaml') or requirements_file.endswith('.yml')): + raise AnsibleError("Invalid role requirements file, it must end with a .yml or .yaml extension") + + galaxy_args = self._raw_args + will_install_collections = self._implicit_role and '-p' not in galaxy_args and '--roles-path' not in galaxy_args + + requirements = self._parse_requirements_file( + requirements_file, + artifacts_manager=artifacts_manager, + validate_signature_options=will_install_collections, + ) + role_requirements = requirements['roles'] + + # We can only install collections and roles at the same time if the type wasn't specified and the -p + # argument was not used. If collections are present in the requirements then at least display a msg. + if requirements['collections'] and (not self._implicit_role or '-p' in galaxy_args or + '--roles-path' in galaxy_args): + + # We only want to display a warning if 'ansible-galaxy install -r ... -p ...'. Other cases the user + # was explicit about the type and shouldn't care that collections were skipped. + display_func = display.warning if self._implicit_role else display.vvv + display_func(two_type_warning.format('collection')) + else: + collection_path = self._get_default_collection_path() + collection_requirements = requirements['collections'] + else: + # roles were specified directly, so we'll just go out grab them + # (and their dependencies, unless the user doesn't want us to). + for rname in context.CLIARGS['args']: + role = RoleRequirement.role_yaml_parse(rname.strip()) + role_requirements.append(GalaxyRole(self.galaxy, self.lazy_role_api, **role)) + + if not role_requirements and not collection_requirements: + display.display("Skipping install, no requirements found") + return + + if role_requirements: + display.display("Starting galaxy role install process") + self._execute_install_role(role_requirements) + + if collection_requirements: + display.display("Starting galaxy collection install process") + # Collections can technically be installed even when ansible-galaxy is in role mode so we need to pass in + # the install path as context.CLIARGS['collections_path'] won't be set (default is calculated above). + self._execute_install_collection( + collection_requirements, collection_path, + artifacts_manager=artifacts_manager, + ) + + def _execute_install_collection( + self, requirements, path, artifacts_manager, + ): + force = context.CLIARGS['force'] + ignore_errors = context.CLIARGS['ignore_errors'] + no_deps = context.CLIARGS['no_deps'] + force_with_deps = context.CLIARGS['force_with_deps'] + try: + disable_gpg_verify = context.CLIARGS['disable_gpg_verify'] + except KeyError: + if self._implicit_role: + raise AnsibleError( + 'Unable to properly parse command line arguments. Please use "ansible-galaxy collection install" ' + 'instead of "ansible-galaxy install".' + ) + raise + + # If `ansible-galaxy install` is used, collection-only options aren't available to the user and won't be in context.CLIARGS + allow_pre_release = context.CLIARGS.get('allow_pre_release', False) + upgrade = context.CLIARGS.get('upgrade', False) + + collections_path = C.COLLECTIONS_PATHS + if len([p for p in collections_path if p.startswith(path)]) == 0: + display.warning("The specified collections path '%s' is not part of the configured Ansible " + "collections paths '%s'. The installed collection will not be picked up in an Ansible " + "run, unless within a playbook-adjacent collections directory." % (to_text(path), to_text(":".join(collections_path)))) + + output_path = validate_collection_path(path) + b_output_path = to_bytes(output_path, errors='surrogate_or_strict') + if not os.path.exists(b_output_path): + os.makedirs(b_output_path) + + install_collections( + requirements, output_path, self.api_servers, ignore_errors, + no_deps, force, force_with_deps, upgrade, + allow_pre_release=allow_pre_release, + artifacts_manager=artifacts_manager, + disable_gpg_verify=disable_gpg_verify, + offline=context.CLIARGS.get('offline', False), + ) + + return 0 + + def _execute_install_role(self, requirements): + role_file = context.CLIARGS['requirements'] + no_deps = context.CLIARGS['no_deps'] + force_deps = context.CLIARGS['force_with_deps'] + force = context.CLIARGS['force'] or force_deps + + for role in requirements: + # only process roles in roles files when names matches if given + if role_file and context.CLIARGS['args'] and role.name not in context.CLIARGS['args']: + display.vvv('Skipping role %s' % role.name) + continue + + display.vvv('Processing role %s ' % role.name) + + # query the galaxy API for the role data + + if role.install_info is not None: + if role.install_info['version'] != role.version or force: + if force: + display.display('- changing role %s from %s to %s' % + (role.name, role.install_info['version'], role.version or "unspecified")) + role.remove() + else: + display.warning('- %s (%s) is already installed - use --force to change version to %s' % + (role.name, role.install_info['version'], role.version or "unspecified")) + continue + else: + if not force: + display.display('- %s is already installed, skipping.' % str(role)) + continue + + try: + installed = role.install() + except AnsibleError as e: + display.warning(u"- %s was NOT installed successfully: %s " % (role.name, to_text(e))) + self.exit_without_ignore() + continue + + # install dependencies, if we want them + if not no_deps and installed: + if not role.metadata: + # NOTE: the meta file is also required for installing the role, not just dependencies + display.warning("Meta file %s is empty. Skipping dependencies." % role.path) + else: + role_dependencies = role.metadata_dependencies + role.requirements + for dep in role_dependencies: + display.debug('Installing dep %s' % dep) + dep_req = RoleRequirement() + dep_info = dep_req.role_yaml_parse(dep) + dep_role = GalaxyRole(self.galaxy, self.lazy_role_api, **dep_info) + if '.' not in dep_role.name and '.' not in dep_role.src and dep_role.scm is None: + # we know we can skip this, as it's not going to + # be found on galaxy.ansible.com + continue + if dep_role.install_info is None: + if dep_role not in requirements: + display.display('- adding dependency: %s' % to_text(dep_role)) + requirements.append(dep_role) + else: + display.display('- dependency %s already pending installation.' % dep_role.name) + else: + if dep_role.install_info['version'] != dep_role.version: + if force_deps: + display.display('- changing dependent role %s from %s to %s' % + (dep_role.name, dep_role.install_info['version'], dep_role.version or "unspecified")) + dep_role.remove() + requirements.append(dep_role) + else: + display.warning('- dependency %s (%s) from role %s differs from already installed version (%s), skipping' % + (to_text(dep_role), dep_role.version, role.name, dep_role.install_info['version'])) + else: + if force_deps: + requirements.append(dep_role) + else: + display.display('- dependency %s is already installed, skipping.' % dep_role.name) + + if not installed: + display.warning("- %s was NOT installed successfully." % role.name) + self.exit_without_ignore() + + return 0 + + def execute_remove(self): + """ + removes the list of roles passed as arguments from the local system. + """ + + if not context.CLIARGS['args']: + raise AnsibleOptionsError('- you must specify at least one role to remove.') + + for role_name in context.CLIARGS['args']: + role = GalaxyRole(self.galaxy, self.api, role_name) + try: + if role.remove(): + display.display('- successfully removed %s' % role_name) + else: + display.display('- %s is not installed, skipping.' % role_name) + except Exception as e: + raise AnsibleError("Failed to remove role %s: %s" % (role_name, to_native(e))) + + return 0 + + def execute_list(self): + """ + List installed collections or roles + """ + + if context.CLIARGS['type'] == 'role': + self.execute_list_role() + elif context.CLIARGS['type'] == 'collection': + self.execute_list_collection() + + def execute_list_role(self): + """ + List all roles installed on the local system or a specific role + """ + + path_found = False + role_found = False + warnings = [] + roles_search_paths = context.CLIARGS['roles_path'] + role_name = context.CLIARGS['role'] + + for path in roles_search_paths: + role_path = GalaxyCLI._resolve_path(path) + if os.path.isdir(path): + path_found = True + else: + warnings.append("- the configured path {0} does not exist.".format(path)) + continue + + if role_name: + # show the requested role, if it exists + gr = GalaxyRole(self.galaxy, self.lazy_role_api, role_name, path=os.path.join(role_path, role_name)) + if os.path.isdir(gr.path): + role_found = True + display.display('# %s' % os.path.dirname(gr.path)) + _display_role(gr) + break + warnings.append("- the role %s was not found" % role_name) + else: + if not os.path.exists(role_path): + warnings.append("- the configured path %s does not exist." % role_path) + continue + + if not os.path.isdir(role_path): + warnings.append("- the configured path %s, exists, but it is not a directory." % role_path) + continue + + display.display('# %s' % role_path) + path_files = os.listdir(role_path) + for path_file in path_files: + gr = GalaxyRole(self.galaxy, self.lazy_role_api, path_file, path=path) + if gr.metadata: + _display_role(gr) + + # Do not warn if the role was found in any of the search paths + if role_found and role_name: + warnings = [] + + for w in warnings: + display.warning(w) + + if not path_found: + raise AnsibleOptionsError("- None of the provided paths were usable. Please specify a valid path with --{0}s-path".format(context.CLIARGS['type'])) + + return 0 + + @with_collection_artifacts_manager + def execute_list_collection(self, artifacts_manager=None): + """ + List all collections installed on the local system + + :param artifacts_manager: Artifacts manager. + """ + if artifacts_manager is not None: + artifacts_manager.require_build_metadata = False + + output_format = context.CLIARGS['output_format'] + collections_search_paths = set(context.CLIARGS['collections_path']) + collection_name = context.CLIARGS['collection'] + default_collections_path = AnsibleCollectionConfig.collection_paths + collections_in_paths = {} + + warnings = [] + path_found = False + collection_found = False + for path in collections_search_paths: + collection_path = GalaxyCLI._resolve_path(path) + if not os.path.exists(path): + if path in default_collections_path: + # don't warn for missing default paths + continue + warnings.append("- the configured path {0} does not exist.".format(collection_path)) + continue + + if not os.path.isdir(collection_path): + warnings.append("- the configured path {0}, exists, but it is not a directory.".format(collection_path)) + continue + + path_found = True + + if collection_name: + # list a specific collection + + validate_collection_name(collection_name) + namespace, collection = collection_name.split('.') + + collection_path = validate_collection_path(collection_path) + b_collection_path = to_bytes(os.path.join(collection_path, namespace, collection), errors='surrogate_or_strict') + + if not os.path.exists(b_collection_path): + warnings.append("- unable to find {0} in collection paths".format(collection_name)) + continue + + if not os.path.isdir(collection_path): + warnings.append("- the configured path {0}, exists, but it is not a directory.".format(collection_path)) + continue + + collection_found = True + + try: + collection = Requirement.from_dir_path_as_unknown( + b_collection_path, + artifacts_manager, + ) + except ValueError as val_err: + six.raise_from(AnsibleError(val_err), val_err) + + if output_format in {'yaml', 'json'}: + collections_in_paths[collection_path] = { + collection.fqcn: {'version': collection.ver} + } + + continue + + fqcn_width, version_width = _get_collection_widths([collection]) + + _display_header(collection_path, 'Collection', 'Version', fqcn_width, version_width) + _display_collection(collection, fqcn_width, version_width) + + else: + # list all collections + collection_path = validate_collection_path(path) + if os.path.isdir(collection_path): + display.vvv("Searching {0} for collections".format(collection_path)) + collections = list(find_existing_collections( + collection_path, artifacts_manager, + )) + else: + # There was no 'ansible_collections/' directory in the path, so there + # or no collections here. + display.vvv("No 'ansible_collections' directory found at {0}".format(collection_path)) + continue + + if not collections: + display.vvv("No collections found at {0}".format(collection_path)) + continue + + if output_format in {'yaml', 'json'}: + collections_in_paths[collection_path] = { + collection.fqcn: {'version': collection.ver} for collection in collections + } + + continue + + # Display header + fqcn_width, version_width = _get_collection_widths(collections) + _display_header(collection_path, 'Collection', 'Version', fqcn_width, version_width) + + # Sort collections by the namespace and name + for collection in sorted(collections, key=to_text): + _display_collection(collection, fqcn_width, version_width) + + # Do not warn if the specific collection was found in any of the search paths + if collection_found and collection_name: + warnings = [] + + for w in warnings: + display.warning(w) + + if not path_found: + raise AnsibleOptionsError("- None of the provided paths were usable. Please specify a valid path with --{0}s-path".format(context.CLIARGS['type'])) + + if output_format == 'json': + display.display(json.dumps(collections_in_paths)) + elif output_format == 'yaml': + display.display(yaml_dump(collections_in_paths)) + + return 0 + + def execute_publish(self): + """ + Publish a collection into Ansible Galaxy. Requires the path to the collection tarball to publish. + """ + collection_path = GalaxyCLI._resolve_path(context.CLIARGS['args']) + wait = context.CLIARGS['wait'] + timeout = context.CLIARGS['import_timeout'] + + publish_collection(collection_path, self.api, wait, timeout) + + def execute_search(self): + ''' searches for roles on the Ansible Galaxy server''' + page_size = 1000 + search = None + + if context.CLIARGS['args']: + search = '+'.join(context.CLIARGS['args']) + + if not search and not context.CLIARGS['platforms'] and not context.CLIARGS['galaxy_tags'] and not context.CLIARGS['author']: + raise AnsibleError("Invalid query. At least one search term, platform, galaxy tag or author must be provided.") + + response = self.api.search_roles(search, platforms=context.CLIARGS['platforms'], + tags=context.CLIARGS['galaxy_tags'], author=context.CLIARGS['author'], page_size=page_size) + + if response['count'] == 0: + display.display("No roles match your search.", color=C.COLOR_ERROR) + return 1 + + data = [u''] + + if response['count'] > page_size: + data.append(u"Found %d roles matching your search. Showing first %s." % (response['count'], page_size)) + else: + data.append(u"Found %d roles matching your search:" % response['count']) + + max_len = [] + for role in response['results']: + max_len.append(len(role['username'] + '.' + role['name'])) + name_len = max(max_len) + format_str = u" %%-%ds %%s" % name_len + data.append(u'') + data.append(format_str % (u"Name", u"Description")) + data.append(format_str % (u"----", u"-----------")) + for role in response['results']: + data.append(format_str % (u'%s.%s' % (role['username'], role['name']), role['description'])) + + data = u'\n'.join(data) + self.pager(data) + + return 0 + + def execute_import(self): + """ used to import a role into Ansible Galaxy """ + + colors = { + 'INFO': 'normal', + 'WARNING': C.COLOR_WARN, + 'ERROR': C.COLOR_ERROR, + 'SUCCESS': C.COLOR_OK, + 'FAILED': C.COLOR_ERROR, + } + + github_user = to_text(context.CLIARGS['github_user'], errors='surrogate_or_strict') + github_repo = to_text(context.CLIARGS['github_repo'], errors='surrogate_or_strict') + + if context.CLIARGS['check_status']: + task = self.api.get_import_task(github_user=github_user, github_repo=github_repo) + else: + # Submit an import request + task = self.api.create_import_task(github_user, github_repo, + reference=context.CLIARGS['reference'], + role_name=context.CLIARGS['role_name']) + + if len(task) > 1: + # found multiple roles associated with github_user/github_repo + display.display("WARNING: More than one Galaxy role associated with Github repo %s/%s." % (github_user, github_repo), + color='yellow') + display.display("The following Galaxy roles are being updated:" + u'\n', color=C.COLOR_CHANGED) + for t in task: + display.display('%s.%s' % (t['summary_fields']['role']['namespace'], t['summary_fields']['role']['name']), color=C.COLOR_CHANGED) + display.display(u'\nTo properly namespace this role, remove each of the above and re-import %s/%s from scratch' % (github_user, github_repo), + color=C.COLOR_CHANGED) + return 0 + # found a single role as expected + display.display("Successfully submitted import request %d" % task[0]['id']) + if not context.CLIARGS['wait']: + display.display("Role name: %s" % task[0]['summary_fields']['role']['name']) + display.display("Repo: %s/%s" % (task[0]['github_user'], task[0]['github_repo'])) + + if context.CLIARGS['check_status'] or context.CLIARGS['wait']: + # Get the status of the import + msg_list = [] + finished = False + while not finished: + task = self.api.get_import_task(task_id=task[0]['id']) + for msg in task[0]['summary_fields']['task_messages']: + if msg['id'] not in msg_list: + display.display(msg['message_text'], color=colors[msg['message_type']]) + msg_list.append(msg['id']) + if task[0]['state'] in ['SUCCESS', 'FAILED']: + finished = True + else: + time.sleep(10) + + return 0 + + def execute_setup(self): + """ Setup an integration from Github or Travis for Ansible Galaxy roles""" + + if context.CLIARGS['setup_list']: + # List existing integration secrets + secrets = self.api.list_secrets() + if len(secrets) == 0: + # None found + display.display("No integrations found.") + return 0 + display.display(u'\n' + "ID Source Repo", color=C.COLOR_OK) + display.display("---------- ---------- ----------", color=C.COLOR_OK) + for secret in secrets: + display.display("%-10s %-10s %s/%s" % (secret['id'], secret['source'], secret['github_user'], + secret['github_repo']), color=C.COLOR_OK) + return 0 + + if context.CLIARGS['remove_id']: + # Remove a secret + self.api.remove_secret(context.CLIARGS['remove_id']) + display.display("Secret removed. Integrations using this secret will not longer work.", color=C.COLOR_OK) + return 0 + + source = context.CLIARGS['source'] + github_user = context.CLIARGS['github_user'] + github_repo = context.CLIARGS['github_repo'] + secret = context.CLIARGS['secret'] + + resp = self.api.add_secret(source, github_user, github_repo, secret) + display.display("Added integration for %s %s/%s" % (resp['source'], resp['github_user'], resp['github_repo'])) + + return 0 + + def execute_delete(self): + """ Delete a role from Ansible Galaxy. """ + + github_user = context.CLIARGS['github_user'] + github_repo = context.CLIARGS['github_repo'] + resp = self.api.delete_role(github_user, github_repo) + + if len(resp['deleted_roles']) > 1: + display.display("Deleted the following roles:") + display.display("ID User Name") + display.display("------ --------------- ----------") + for role in resp['deleted_roles']: + display.display("%-8s %-15s %s" % (role.id, role.namespace, role.name)) + + display.display(resp['status']) + + return 0 + + +def main(args=None): + GalaxyCLI.cli_executor(args) + + +if __name__ == '__main__': + main() |