summaryrefslogtreecommitdiffstats
path: root/lib/ansible/cli/config.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/cli/config.py')
-rwxr-xr-xlib/ansible/cli/config.py552
1 files changed, 552 insertions, 0 deletions
diff --git a/lib/ansible/cli/config.py b/lib/ansible/cli/config.py
new file mode 100755
index 0000000..c8d99ea
--- /dev/null
+++ b/lib/ansible/cli/config.py
@@ -0,0 +1,552 @@
+#!/usr/bin/env python
+# Copyright: (c) 2017, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+# PYTHON_ARGCOMPLETE_OK
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+# ansible.cli needs to be imported first, to ensure the source bin/* scripts run that code first
+from ansible.cli import CLI
+
+import os
+import yaml
+import shlex
+import subprocess
+
+from collections.abc import Mapping
+
+from ansible import context
+import ansible.plugins.loader as plugin_loader
+
+from ansible import constants as C
+from ansible.cli.arguments import option_helpers as opt_help
+from ansible.config.manager import ConfigManager, Setting
+from ansible.errors import AnsibleError, AnsibleOptionsError
+from ansible.module_utils._text import to_native, to_text, to_bytes
+from ansible.module_utils.common.json import json_dump
+from ansible.module_utils.six import string_types
+from ansible.parsing.quoting import is_quoted
+from ansible.parsing.yaml.dumper import AnsibleDumper
+from ansible.utils.color import stringc
+from ansible.utils.display import Display
+from ansible.utils.path import unfrackpath
+
+display = Display()
+
+
+def yaml_dump(data, default_flow_style=False, default_style=None):
+ return yaml.dump(data, Dumper=AnsibleDumper, default_flow_style=default_flow_style, default_style=default_style)
+
+
+def yaml_short(data):
+ return yaml_dump(data, default_flow_style=True, default_style="''")
+
+
+def get_constants():
+ ''' helper method to ensure we can template based on existing constants '''
+ if not hasattr(get_constants, 'cvars'):
+ get_constants.cvars = {k: getattr(C, k) for k in dir(C) if not k.startswith('__')}
+ return get_constants.cvars
+
+
+class ConfigCLI(CLI):
+ """ Config command line class """
+
+ name = 'ansible-config'
+
+ def __init__(self, args, callback=None):
+
+ self.config_file = None
+ self.config = None
+ super(ConfigCLI, self).__init__(args, callback)
+
+ def init_parser(self):
+
+ super(ConfigCLI, self).init_parser(
+ desc="View ansible configuration.",
+ )
+
+ common = opt_help.argparse.ArgumentParser(add_help=False)
+ opt_help.add_verbosity_options(common)
+ common.add_argument('-c', '--config', dest='config_file',
+ help="path to configuration file, defaults to first file found in precedence.")
+ common.add_argument("-t", "--type", action="store", default='base', dest='type', choices=['all', 'base'] + list(C.CONFIGURABLE_PLUGINS),
+ help="Filter down to a specific plugin type.")
+ common.add_argument('args', help='Specific plugin to target, requires type of plugin to be set', nargs='*')
+
+ subparsers = self.parser.add_subparsers(dest='action')
+ subparsers.required = True
+
+ list_parser = subparsers.add_parser('list', help='Print all config options', parents=[common])
+ list_parser.set_defaults(func=self.execute_list)
+ list_parser.add_argument('--format', '-f', dest='format', action='store', choices=['json', 'yaml'], default='yaml',
+ help='Output format for list')
+
+ dump_parser = subparsers.add_parser('dump', help='Dump configuration', parents=[common])
+ dump_parser.set_defaults(func=self.execute_dump)
+ dump_parser.add_argument('--only-changed', '--changed-only', dest='only_changed', action='store_true',
+ help="Only show configurations that have changed from the default")
+ dump_parser.add_argument('--format', '-f', dest='format', action='store', choices=['json', 'yaml', 'display'], default='display',
+ help='Output format for dump')
+
+ view_parser = subparsers.add_parser('view', help='View configuration file', parents=[common])
+ view_parser.set_defaults(func=self.execute_view)
+
+ init_parser = subparsers.add_parser('init', help='Create initial configuration', parents=[common])
+ init_parser.set_defaults(func=self.execute_init)
+ init_parser.add_argument('--format', '-f', dest='format', action='store', choices=['ini', 'env', 'vars'], default='ini',
+ help='Output format for init')
+ init_parser.add_argument('--disabled', dest='commented', action='store_true', default=False,
+ help='Prefixes all entries with a comment character to disable them')
+
+ # search_parser = subparsers.add_parser('find', help='Search configuration')
+ # search_parser.set_defaults(func=self.execute_search)
+ # search_parser.add_argument('args', help='Search term', metavar='<search term>')
+
+ def post_process_args(self, options):
+ options = super(ConfigCLI, self).post_process_args(options)
+ display.verbosity = options.verbosity
+
+ return options
+
+ def run(self):
+
+ super(ConfigCLI, self).run()
+
+ if context.CLIARGS['config_file']:
+ self.config_file = unfrackpath(context.CLIARGS['config_file'], follow=False)
+ b_config = to_bytes(self.config_file)
+ if os.path.exists(b_config) and os.access(b_config, os.R_OK):
+ self.config = ConfigManager(self.config_file)
+ else:
+ raise AnsibleOptionsError('The provided configuration file is missing or not accessible: %s' % to_native(self.config_file))
+ else:
+ self.config = C.config
+ self.config_file = self.config._config_file
+
+ if self.config_file:
+ try:
+ if not os.path.exists(self.config_file):
+ raise AnsibleOptionsError("%s does not exist or is not accessible" % (self.config_file))
+ elif not os.path.isfile(self.config_file):
+ raise AnsibleOptionsError("%s is not a valid file" % (self.config_file))
+
+ os.environ['ANSIBLE_CONFIG'] = to_native(self.config_file)
+ except Exception:
+ if context.CLIARGS['action'] in ['view']:
+ raise
+ elif context.CLIARGS['action'] in ['edit', 'update']:
+ display.warning("File does not exist, used empty file: %s" % self.config_file)
+
+ elif context.CLIARGS['action'] == 'view':
+ raise AnsibleError('Invalid or no config file was supplied')
+
+ # run the requested action
+ context.CLIARGS['func']()
+
+ def execute_update(self):
+ '''
+ Updates a single setting in the specified ansible.cfg
+ '''
+ raise AnsibleError("Option not implemented yet")
+
+ # pylint: disable=unreachable
+ if context.CLIARGS['setting'] is None:
+ raise AnsibleOptionsError("update option requires a setting to update")
+
+ (entry, value) = context.CLIARGS['setting'].split('=')
+ if '.' in entry:
+ (section, option) = entry.split('.')
+ else:
+ section = 'defaults'
+ option = entry
+ subprocess.call([
+ 'ansible',
+ '-m', 'ini_file',
+ 'localhost',
+ '-c', 'local',
+ '-a', '"dest=%s section=%s option=%s value=%s backup=yes"' % (self.config_file, section, option, value)
+ ])
+
+ def execute_view(self):
+ '''
+ Displays the current config file
+ '''
+ try:
+ with open(self.config_file, 'rb') as f:
+ self.pager(to_text(f.read(), errors='surrogate_or_strict'))
+ except Exception as e:
+ raise AnsibleError("Failed to open config file: %s" % to_native(e))
+
+ def execute_edit(self):
+ '''
+ Opens ansible.cfg in the default EDITOR
+ '''
+ raise AnsibleError("Option not implemented yet")
+
+ # pylint: disable=unreachable
+ try:
+ editor = shlex.split(os.environ.get('EDITOR', 'vi'))
+ editor.append(self.config_file)
+ subprocess.call(editor)
+ except Exception as e:
+ raise AnsibleError("Failed to open editor: %s" % to_native(e))
+
+ def _list_plugin_settings(self, ptype, plugins=None):
+ entries = {}
+ loader = getattr(plugin_loader, '%s_loader' % ptype)
+
+ # build list
+ if plugins:
+ plugin_cs = []
+ for plugin in plugins:
+ p = loader.get(plugin, class_only=True)
+ if p is None:
+ display.warning("Skipping %s as we could not find matching plugin" % plugin)
+ else:
+ plugin_cs.append(p)
+ else:
+ plugin_cs = loader.all(class_only=True)
+
+ # iterate over class instances
+ for plugin in plugin_cs:
+ finalname = name = plugin._load_name
+ if name.startswith('_'):
+ # alias or deprecated
+ if os.path.islink(plugin._original_path):
+ continue
+ else:
+ finalname = name.replace('_', '', 1) + ' (DEPRECATED)'
+
+ entries[finalname] = self.config.get_configuration_definitions(ptype, name)
+
+ return entries
+
+ def _list_entries_from_args(self):
+ '''
+ build a dict with the list requested configs
+ '''
+ config_entries = {}
+ if context.CLIARGS['type'] in ('base', 'all'):
+ # this dumps main/common configs
+ config_entries = self.config.get_configuration_definitions(ignore_private=True)
+
+ if context.CLIARGS['type'] != 'base':
+ config_entries['PLUGINS'] = {}
+
+ if context.CLIARGS['type'] == 'all':
+ # now each plugin type
+ for ptype in C.CONFIGURABLE_PLUGINS:
+ config_entries['PLUGINS'][ptype.upper()] = self._list_plugin_settings(ptype)
+ elif context.CLIARGS['type'] != 'base':
+ config_entries['PLUGINS'][context.CLIARGS['type']] = self._list_plugin_settings(context.CLIARGS['type'], context.CLIARGS['args'])
+
+ return config_entries
+
+ def execute_list(self):
+ '''
+ list and output available configs
+ '''
+
+ config_entries = self._list_entries_from_args()
+ if context.CLIARGS['format'] == 'yaml':
+ output = yaml_dump(config_entries)
+ elif context.CLIARGS['format'] == 'json':
+ output = json_dump(config_entries)
+
+ self.pager(to_text(output, errors='surrogate_or_strict'))
+
+ def _get_settings_vars(self, settings, subkey):
+
+ data = []
+ if context.CLIARGS['commented']:
+ prefix = '#'
+ else:
+ prefix = ''
+
+ for setting in settings:
+
+ if not settings[setting].get('description'):
+ continue
+
+ default = settings[setting].get('default', '')
+ if subkey == 'env':
+ stype = settings[setting].get('type', '')
+ if stype == 'boolean':
+ if default:
+ default = '1'
+ else:
+ default = '0'
+ elif default:
+ if stype == 'list':
+ if not isinstance(default, string_types):
+ # python lists are not valid env ones
+ try:
+ default = ', '.join(default)
+ except Exception as e:
+ # list of other stuff
+ default = '%s' % to_native(default)
+ if isinstance(default, string_types) and not is_quoted(default):
+ default = shlex.quote(default)
+ elif default is None:
+ default = ''
+
+ if subkey in settings[setting] and settings[setting][subkey]:
+ entry = settings[setting][subkey][-1]['name']
+ if isinstance(settings[setting]['description'], string_types):
+ desc = settings[setting]['description']
+ else:
+ desc = '\n#'.join(settings[setting]['description'])
+ name = settings[setting].get('name', setting)
+ data.append('# %s(%s): %s' % (name, settings[setting].get('type', 'string'), desc))
+
+ # TODO: might need quoting and value coercion depending on type
+ if subkey == 'env':
+ if entry.startswith('_ANSIBLE_'):
+ continue
+ data.append('%s%s=%s' % (prefix, entry, default))
+ elif subkey == 'vars':
+ if entry.startswith('_ansible_'):
+ continue
+ data.append(prefix + '%s: %s' % (entry, to_text(yaml_short(default), errors='surrogate_or_strict')))
+ data.append('')
+
+ return data
+
+ def _get_settings_ini(self, settings):
+
+ sections = {}
+ for o in sorted(settings.keys()):
+
+ opt = settings[o]
+
+ if not isinstance(opt, Mapping):
+ # recursed into one of the few settings that is a mapping, now hitting it's strings
+ continue
+
+ if not opt.get('description'):
+ # its a plugin
+ new_sections = self._get_settings_ini(opt)
+ for s in new_sections:
+ if s in sections:
+ sections[s].extend(new_sections[s])
+ else:
+ sections[s] = new_sections[s]
+ continue
+
+ if isinstance(opt['description'], string_types):
+ desc = '# (%s) %s' % (opt.get('type', 'string'), opt['description'])
+ else:
+ desc = "# (%s) " % opt.get('type', 'string')
+ desc += "\n# ".join(opt['description'])
+
+ if 'ini' in opt and opt['ini']:
+ entry = opt['ini'][-1]
+ if entry['section'] not in sections:
+ sections[entry['section']] = []
+
+ default = opt.get('default', '')
+ if opt.get('type', '') == 'list' and not isinstance(default, string_types):
+ # python lists are not valid ini ones
+ default = ', '.join(default)
+ elif default is None:
+ default = ''
+
+ if context.CLIARGS['commented']:
+ entry['key'] = ';%s' % entry['key']
+
+ key = desc + '\n%s=%s' % (entry['key'], default)
+ sections[entry['section']].append(key)
+
+ return sections
+
+ def execute_init(self):
+ """Create initial configuration"""
+
+ data = []
+ config_entries = self._list_entries_from_args()
+ plugin_types = config_entries.pop('PLUGINS', None)
+
+ if context.CLIARGS['format'] == 'ini':
+ sections = self._get_settings_ini(config_entries)
+
+ if plugin_types:
+ for ptype in plugin_types:
+ plugin_sections = self._get_settings_ini(plugin_types[ptype])
+ for s in plugin_sections:
+ if s in sections:
+ sections[s].extend(plugin_sections[s])
+ else:
+ sections[s] = plugin_sections[s]
+
+ if sections:
+ for section in sections.keys():
+ data.append('[%s]' % section)
+ for key in sections[section]:
+ data.append(key)
+ data.append('')
+ data.append('')
+
+ elif context.CLIARGS['format'] in ('env', 'vars'): # TODO: add yaml once that config option is added
+ data = self._get_settings_vars(config_entries, context.CLIARGS['format'])
+ if plugin_types:
+ for ptype in plugin_types:
+ for plugin in plugin_types[ptype].keys():
+ data.extend(self._get_settings_vars(plugin_types[ptype][plugin], context.CLIARGS['format']))
+
+ self.pager(to_text('\n'.join(data), errors='surrogate_or_strict'))
+
+ def _render_settings(self, config):
+
+ entries = []
+ for setting in sorted(config):
+ changed = (config[setting].origin not in ('default', 'REQUIRED'))
+
+ if context.CLIARGS['format'] == 'display':
+ if isinstance(config[setting], Setting):
+ # proceed normally
+ if config[setting].origin == 'default':
+ color = 'green'
+ elif config[setting].origin == 'REQUIRED':
+ # should include '_terms', '_input', etc
+ color = 'red'
+ else:
+ color = 'yellow'
+ msg = "%s(%s) = %s" % (setting, config[setting].origin, config[setting].value)
+ else:
+ color = 'green'
+ msg = "%s(%s) = %s" % (setting, 'default', config[setting].get('default'))
+
+ entry = stringc(msg, color)
+ else:
+ entry = {}
+ for key in config[setting]._fields:
+ entry[key] = getattr(config[setting], key)
+
+ if not context.CLIARGS['only_changed'] or changed:
+ entries.append(entry)
+
+ return entries
+
+ def _get_global_configs(self):
+ config = self.config.get_configuration_definitions(ignore_private=True).copy()
+ for setting in config.keys():
+ v, o = C.config.get_config_value_and_origin(setting, cfile=self.config_file, variables=get_constants())
+ config[setting] = Setting(setting, v, o, None)
+
+ return self._render_settings(config)
+
+ def _get_plugin_configs(self, ptype, plugins):
+
+ # prep loading
+ loader = getattr(plugin_loader, '%s_loader' % ptype)
+
+ # acumulators
+ output = []
+ config_entries = {}
+
+ # build list
+ if plugins:
+ plugin_cs = []
+ for plugin in plugins:
+ p = loader.get(plugin, class_only=True)
+ if p is None:
+ display.warning("Skipping %s as we could not find matching plugin" % plugin)
+ else:
+ plugin_cs.append(loader.get(plugin, class_only=True))
+ else:
+ plugin_cs = loader.all(class_only=True)
+
+ for plugin in plugin_cs:
+ # in case of deprecastion they diverge
+ finalname = name = plugin._load_name
+ if name.startswith('_'):
+ if os.path.islink(plugin._original_path):
+ # skip alias
+ continue
+ # deprecated, but use 'nice name'
+ finalname = name.replace('_', '', 1) + ' (DEPRECATED)'
+
+ # default entries per plugin
+ config_entries[finalname] = self.config.get_configuration_definitions(ptype, name)
+
+ try:
+ # populate config entries by loading plugin
+ dump = loader.get(name, class_only=True)
+ except Exception as e:
+ display.warning('Skipping "%s" %s plugin, as we cannot load plugin to check config due to : %s' % (name, ptype, to_native(e)))
+ continue
+
+ # actually get the values
+ for setting in config_entries[finalname].keys():
+ try:
+ v, o = C.config.get_config_value_and_origin(setting, cfile=self.config_file, plugin_type=ptype, plugin_name=name, variables=get_constants())
+ except AnsibleError as e:
+ if to_text(e).startswith('No setting was provided for required configuration'):
+ v = None
+ o = 'REQUIRED'
+ else:
+ raise e
+
+ if v is None and o is None:
+ # not all cases will be error
+ o = 'REQUIRED'
+
+ config_entries[finalname][setting] = Setting(setting, v, o, None)
+
+ # pretty please!
+ results = self._render_settings(config_entries[finalname])
+ if results:
+ if context.CLIARGS['format'] == 'display':
+ # avoid header for empty lists (only changed!)
+ output.append('\n%s:\n%s' % (finalname, '_' * len(finalname)))
+ output.extend(results)
+ else:
+ output.append({finalname: results})
+
+ return output
+
+ def execute_dump(self):
+ '''
+ Shows the current settings, merges ansible.cfg if specified
+ '''
+ if context.CLIARGS['type'] == 'base':
+ # deal with base
+ output = self._get_global_configs()
+ elif context.CLIARGS['type'] == 'all':
+ # deal with base
+ output = self._get_global_configs()
+ # deal with plugins
+ for ptype in C.CONFIGURABLE_PLUGINS:
+ plugin_list = self._get_plugin_configs(ptype, context.CLIARGS['args'])
+ if context.CLIARGS['format'] == 'display':
+ if not context.CLIARGS['only_changed'] or plugin_list:
+ output.append('\n%s:\n%s' % (ptype.upper(), '=' * len(ptype)))
+ output.extend(plugin_list)
+ else:
+ if ptype in ('modules', 'doc_fragments'):
+ pname = ptype.upper()
+ else:
+ pname = '%s_PLUGINS' % ptype.upper()
+ output.append({pname: plugin_list})
+ else:
+ # deal with plugins
+ output = self._get_plugin_configs(context.CLIARGS['type'], context.CLIARGS['args'])
+
+ if context.CLIARGS['format'] == 'display':
+ text = '\n'.join(output)
+ if context.CLIARGS['format'] == 'yaml':
+ text = yaml_dump(output)
+ elif context.CLIARGS['format'] == 'json':
+ text = json_dump(output)
+
+ self.pager(to_text(text, errors='surrogate_or_strict'))
+
+
+def main(args=None):
+ ConfigCLI.cli_executor(args)
+
+
+if __name__ == '__main__':
+ main()