summaryrefslogtreecommitdiffstats
path: root/lib/ansible/config/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/config/manager.py')
-rw-r--r--lib/ansible/config/manager.py607
1 files changed, 607 insertions, 0 deletions
diff --git a/lib/ansible/config/manager.py b/lib/ansible/config/manager.py
new file mode 100644
index 0000000..e1fde1d
--- /dev/null
+++ b/lib/ansible/config/manager.py
@@ -0,0 +1,607 @@
+# Copyright: (c) 2017, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import atexit
+import configparser
+import os
+import os.path
+import sys
+import stat
+import tempfile
+import traceback
+
+from collections import namedtuple
+from collections.abc import Mapping, Sequence
+from jinja2.nativetypes import NativeEnvironment
+
+from ansible.errors import AnsibleOptionsError, AnsibleError
+from ansible.module_utils._text import to_text, to_bytes, to_native
+from ansible.module_utils.common.yaml import yaml_load
+from ansible.module_utils.six import string_types
+from ansible.module_utils.parsing.convert_bool import boolean
+from ansible.parsing.quoting import unquote
+from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode
+from ansible.utils import py3compat
+from ansible.utils.path import cleanup_tmp_file, makedirs_safe, unfrackpath
+
+
+Plugin = namedtuple('Plugin', 'name type')
+Setting = namedtuple('Setting', 'name value origin type')
+
+INTERNAL_DEFS = {'lookup': ('_terms',)}
+
+
+def _get_entry(plugin_type, plugin_name, config):
+ ''' construct entry for requested config '''
+ entry = ''
+ if plugin_type:
+ entry += 'plugin_type: %s ' % plugin_type
+ if plugin_name:
+ entry += 'plugin: %s ' % plugin_name
+ entry += 'setting: %s ' % config
+ return entry
+
+
+# FIXME: see if we can unify in module_utils with similar function used by argspec
+def ensure_type(value, value_type, origin=None):
+ ''' return a configuration variable with casting
+ :arg value: The value to ensure correct typing of
+ :kwarg value_type: The type of the value. This can be any of the following strings:
+ :boolean: sets the value to a True or False value
+ :bool: Same as 'boolean'
+ :integer: Sets the value to an integer or raises a ValueType error
+ :int: Same as 'integer'
+ :float: Sets the value to a float or raises a ValueType error
+ :list: Treats the value as a comma separated list. Split the value
+ and return it as a python list.
+ :none: Sets the value to None
+ :path: Expands any environment variables and tilde's in the value.
+ :tmppath: Create a unique temporary directory inside of the directory
+ specified by value and return its path.
+ :temppath: Same as 'tmppath'
+ :tmp: Same as 'tmppath'
+ :pathlist: Treat the value as a typical PATH string. (On POSIX, this
+ means colon separated strings.) Split the value and then expand
+ each part for environment variables and tildes.
+ :pathspec: Treat the value as a PATH string. Expands any environment variables
+ tildes's in the value.
+ :str: Sets the value to string types.
+ :string: Same as 'str'
+ '''
+
+ errmsg = ''
+ basedir = None
+ if origin and os.path.isabs(origin) and os.path.exists(to_bytes(origin)):
+ basedir = origin
+
+ if value_type:
+ value_type = value_type.lower()
+
+ if value is not None:
+ if value_type in ('boolean', 'bool'):
+ value = boolean(value, strict=False)
+
+ elif value_type in ('integer', 'int'):
+ value = int(value)
+
+ elif value_type == 'float':
+ value = float(value)
+
+ elif value_type == 'list':
+ if isinstance(value, string_types):
+ value = [unquote(x.strip()) for x in value.split(',')]
+ elif not isinstance(value, Sequence):
+ errmsg = 'list'
+
+ elif value_type == 'none':
+ if value == "None":
+ value = None
+
+ if value is not None:
+ errmsg = 'None'
+
+ elif value_type == 'path':
+ if isinstance(value, string_types):
+ value = resolve_path(value, basedir=basedir)
+ else:
+ errmsg = 'path'
+
+ elif value_type in ('tmp', 'temppath', 'tmppath'):
+ if isinstance(value, string_types):
+ value = resolve_path(value, basedir=basedir)
+ if not os.path.exists(value):
+ makedirs_safe(value, 0o700)
+ prefix = 'ansible-local-%s' % os.getpid()
+ value = tempfile.mkdtemp(prefix=prefix, dir=value)
+ atexit.register(cleanup_tmp_file, value, warn=True)
+ else:
+ errmsg = 'temppath'
+
+ elif value_type == 'pathspec':
+ if isinstance(value, string_types):
+ value = value.split(os.pathsep)
+
+ if isinstance(value, Sequence):
+ value = [resolve_path(x, basedir=basedir) for x in value]
+ else:
+ errmsg = 'pathspec'
+
+ elif value_type == 'pathlist':
+ if isinstance(value, string_types):
+ value = [x.strip() for x in value.split(',')]
+
+ if isinstance(value, Sequence):
+ value = [resolve_path(x, basedir=basedir) for x in value]
+ else:
+ errmsg = 'pathlist'
+
+ elif value_type in ('dict', 'dictionary'):
+ if not isinstance(value, Mapping):
+ errmsg = 'dictionary'
+
+ elif value_type in ('str', 'string'):
+ if isinstance(value, (string_types, AnsibleVaultEncryptedUnicode, bool, int, float, complex)):
+ value = unquote(to_text(value, errors='surrogate_or_strict'))
+ else:
+ errmsg = 'string'
+
+ # defaults to string type
+ elif isinstance(value, (string_types, AnsibleVaultEncryptedUnicode)):
+ value = unquote(to_text(value, errors='surrogate_or_strict'))
+
+ if errmsg:
+ raise ValueError('Invalid type provided for "%s": %s' % (errmsg, to_native(value)))
+
+ return to_text(value, errors='surrogate_or_strict', nonstring='passthru')
+
+
+# FIXME: see if this can live in utils/path
+def resolve_path(path, basedir=None):
+ ''' resolve relative or 'variable' paths '''
+ if '{{CWD}}' in path: # allow users to force CWD using 'magic' {{CWD}}
+ path = path.replace('{{CWD}}', os.getcwd())
+
+ return unfrackpath(path, follow=False, basedir=basedir)
+
+
+# FIXME: generic file type?
+def get_config_type(cfile):
+
+ ftype = None
+ if cfile is not None:
+ ext = os.path.splitext(cfile)[-1]
+ if ext in ('.ini', '.cfg'):
+ ftype = 'ini'
+ elif ext in ('.yaml', '.yml'):
+ ftype = 'yaml'
+ else:
+ raise AnsibleOptionsError("Unsupported configuration file extension for %s: %s" % (cfile, to_native(ext)))
+
+ return ftype
+
+
+# FIXME: can move to module_utils for use for ini plugins also?
+def get_ini_config_value(p, entry):
+ ''' returns the value of last ini entry found '''
+ value = None
+ if p is not None:
+ try:
+ value = p.get(entry.get('section', 'defaults'), entry.get('key', ''), raw=True)
+ except Exception: # FIXME: actually report issues here
+ pass
+ return value
+
+
+def find_ini_config_file(warnings=None):
+ ''' Load INI Config File order(first found is used): ENV, CWD, HOME, /etc/ansible '''
+ # FIXME: eventually deprecate ini configs
+
+ if warnings is None:
+ # Note: In this case, warnings does nothing
+ warnings = set()
+
+ # A value that can never be a valid path so that we can tell if ANSIBLE_CONFIG was set later
+ # We can't use None because we could set path to None.
+ SENTINEL = object
+
+ potential_paths = []
+
+ # Environment setting
+ path_from_env = os.getenv("ANSIBLE_CONFIG", SENTINEL)
+ if path_from_env is not SENTINEL:
+ path_from_env = unfrackpath(path_from_env, follow=False)
+ if os.path.isdir(to_bytes(path_from_env)):
+ path_from_env = os.path.join(path_from_env, "ansible.cfg")
+ potential_paths.append(path_from_env)
+
+ # Current working directory
+ warn_cmd_public = False
+ try:
+ cwd = os.getcwd()
+ perms = os.stat(cwd)
+ cwd_cfg = os.path.join(cwd, "ansible.cfg")
+ if perms.st_mode & stat.S_IWOTH:
+ # Working directory is world writable so we'll skip it.
+ # Still have to look for a file here, though, so that we know if we have to warn
+ if os.path.exists(cwd_cfg):
+ warn_cmd_public = True
+ else:
+ potential_paths.append(to_text(cwd_cfg, errors='surrogate_or_strict'))
+ except OSError:
+ # If we can't access cwd, we'll simply skip it as a possible config source
+ pass
+
+ # Per user location
+ potential_paths.append(unfrackpath("~/.ansible.cfg", follow=False))
+
+ # System location
+ potential_paths.append("/etc/ansible/ansible.cfg")
+
+ for path in potential_paths:
+ b_path = to_bytes(path)
+ if os.path.exists(b_path) and os.access(b_path, os.R_OK):
+ break
+ else:
+ path = None
+
+ # Emit a warning if all the following are true:
+ # * We did not use a config from ANSIBLE_CONFIG
+ # * There's an ansible.cfg in the current working directory that we skipped
+ if path_from_env != path and warn_cmd_public:
+ warnings.add(u"Ansible is being run in a world writable directory (%s),"
+ u" ignoring it as an ansible.cfg source."
+ u" For more information see"
+ u" https://docs.ansible.com/ansible/devel/reference_appendices/config.html#cfg-in-world-writable-dir"
+ % to_text(cwd))
+
+ return path
+
+
+def _add_base_defs_deprecations(base_defs):
+ '''Add deprecation source 'ansible.builtin' to deprecations in base.yml'''
+ def process(entry):
+ if 'deprecated' in entry:
+ entry['deprecated']['collection_name'] = 'ansible.builtin'
+
+ for dummy, data in base_defs.items():
+ process(data)
+ for section in ('ini', 'env', 'vars'):
+ if section in data:
+ for entry in data[section]:
+ process(entry)
+
+
+class ConfigManager(object):
+
+ DEPRECATED = [] # type: list[tuple[str, dict[str, str]]]
+ WARNINGS = set() # type: set[str]
+
+ def __init__(self, conf_file=None, defs_file=None):
+
+ self._base_defs = {}
+ self._plugins = {}
+ self._parsers = {}
+
+ self._config_file = conf_file
+
+ self._base_defs = self._read_config_yaml_file(defs_file or ('%s/base.yml' % os.path.dirname(__file__)))
+ _add_base_defs_deprecations(self._base_defs)
+
+ if self._config_file is None:
+ # set config using ini
+ self._config_file = find_ini_config_file(self.WARNINGS)
+
+ # consume configuration
+ if self._config_file:
+ # initialize parser and read config
+ self._parse_config_file()
+
+ # ensure we always have config def entry
+ self._base_defs['CONFIG_FILE'] = {'default': None, 'type': 'path'}
+
+ def _read_config_yaml_file(self, yml_file):
+ # TODO: handle relative paths as relative to the directory containing the current playbook instead of CWD
+ # Currently this is only used with absolute paths to the `ansible/config` directory
+ yml_file = to_bytes(yml_file)
+ if os.path.exists(yml_file):
+ with open(yml_file, 'rb') as config_def:
+ return yaml_load(config_def) or {}
+ raise AnsibleError(
+ "Missing base YAML definition file (bad install?): %s" % to_native(yml_file))
+
+ def _parse_config_file(self, cfile=None):
+ ''' return flat configuration settings from file(s) '''
+ # TODO: take list of files with merge/nomerge
+
+ if cfile is None:
+ cfile = self._config_file
+
+ ftype = get_config_type(cfile)
+ if cfile is not None:
+ if ftype == 'ini':
+ self._parsers[cfile] = configparser.ConfigParser(inline_comment_prefixes=(';',))
+ with open(to_bytes(cfile), 'rb') as f:
+ try:
+ cfg_text = to_text(f.read(), errors='surrogate_or_strict')
+ except UnicodeError as e:
+ raise AnsibleOptionsError("Error reading config file(%s) because the config file was not utf8 encoded: %s" % (cfile, to_native(e)))
+ try:
+ self._parsers[cfile].read_string(cfg_text)
+ except configparser.Error as e:
+ raise AnsibleOptionsError("Error reading config file (%s): %s" % (cfile, to_native(e)))
+ # FIXME: this should eventually handle yaml config files
+ # elif ftype == 'yaml':
+ # with open(cfile, 'rb') as config_stream:
+ # self._parsers[cfile] = yaml_load(config_stream)
+ else:
+ raise AnsibleOptionsError("Unsupported configuration file type: %s" % to_native(ftype))
+
+ def _find_yaml_config_files(self):
+ ''' Load YAML Config Files in order, check merge flags, keep origin of settings'''
+ pass
+
+ def get_plugin_options(self, plugin_type, name, keys=None, variables=None, direct=None):
+
+ options = {}
+ defs = self.get_configuration_definitions(plugin_type, name)
+ for option in defs:
+ options[option] = self.get_config_value(option, plugin_type=plugin_type, plugin_name=name, keys=keys, variables=variables, direct=direct)
+
+ return options
+
+ def get_plugin_vars(self, plugin_type, name):
+
+ pvars = []
+ for pdef in self.get_configuration_definitions(plugin_type, name).values():
+ if 'vars' in pdef and pdef['vars']:
+ for var_entry in pdef['vars']:
+ pvars.append(var_entry['name'])
+ return pvars
+
+ def get_plugin_options_from_var(self, plugin_type, name, variable):
+
+ options = []
+ for option_name, pdef in self.get_configuration_definitions(plugin_type, name).items():
+ if 'vars' in pdef and pdef['vars']:
+ for var_entry in pdef['vars']:
+ if variable == var_entry['name']:
+ options.append(option_name)
+ return options
+
+ def get_configuration_definition(self, name, plugin_type=None, plugin_name=None):
+
+ ret = {}
+ if plugin_type is None:
+ ret = self._base_defs.get(name, None)
+ elif plugin_name is None:
+ ret = self._plugins.get(plugin_type, {}).get(name, None)
+ else:
+ ret = self._plugins.get(plugin_type, {}).get(plugin_name, {}).get(name, None)
+
+ return ret
+
+ def has_configuration_definition(self, plugin_type, name):
+
+ has = False
+ if plugin_type in self._plugins:
+ has = (name in self._plugins[plugin_type])
+
+ return has
+
+ def get_configuration_definitions(self, plugin_type=None, name=None, ignore_private=False):
+ ''' just list the possible settings, either base or for specific plugins or plugin '''
+
+ ret = {}
+ if plugin_type is None:
+ ret = self._base_defs
+ elif name is None:
+ ret = self._plugins.get(plugin_type, {})
+ else:
+ ret = self._plugins.get(plugin_type, {}).get(name, {})
+
+ if ignore_private:
+ for cdef in list(ret.keys()):
+ if cdef.startswith('_'):
+ del ret[cdef]
+
+ return ret
+
+ def _loop_entries(self, container, entry_list):
+ ''' repeat code for value entry assignment '''
+
+ value = None
+ origin = None
+ for entry in entry_list:
+ name = entry.get('name')
+ try:
+ temp_value = container.get(name, None)
+ except UnicodeEncodeError:
+ self.WARNINGS.add(u'value for config entry {0} contains invalid characters, ignoring...'.format(to_text(name)))
+ continue
+ if temp_value is not None: # only set if entry is defined in container
+ # inline vault variables should be converted to a text string
+ if isinstance(temp_value, AnsibleVaultEncryptedUnicode):
+ temp_value = to_text(temp_value, errors='surrogate_or_strict')
+
+ value = temp_value
+ origin = name
+
+ # deal with deprecation of setting source, if used
+ if 'deprecated' in entry:
+ self.DEPRECATED.append((entry['name'], entry['deprecated']))
+
+ return value, origin
+
+ def get_config_value(self, config, cfile=None, plugin_type=None, plugin_name=None, keys=None, variables=None, direct=None):
+ ''' wrapper '''
+
+ try:
+ value, _drop = self.get_config_value_and_origin(config, cfile=cfile, plugin_type=plugin_type, plugin_name=plugin_name,
+ keys=keys, variables=variables, direct=direct)
+ except AnsibleError:
+ raise
+ except Exception as e:
+ raise AnsibleError("Unhandled exception when retrieving %s:\n%s" % (config, to_native(e)), orig_exc=e)
+ return value
+
+ def get_config_value_and_origin(self, config, cfile=None, plugin_type=None, plugin_name=None, keys=None, variables=None, direct=None):
+ ''' Given a config key figure out the actual value and report on the origin of the settings '''
+ if cfile is None:
+ # use default config
+ cfile = self._config_file
+
+ if config == 'CONFIG_FILE':
+ return cfile, ''
+
+ # Note: sources that are lists listed in low to high precedence (last one wins)
+ value = None
+ origin = None
+
+ defs = self.get_configuration_definitions(plugin_type, plugin_name)
+ if config in defs:
+
+ aliases = defs[config].get('aliases', [])
+
+ # direct setting via plugin arguments, can set to None so we bypass rest of processing/defaults
+ if direct:
+ if config in direct:
+ value = direct[config]
+ origin = 'Direct'
+ else:
+ direct_aliases = [direct[alias] for alias in aliases if alias in direct]
+ if direct_aliases:
+ value = direct_aliases[0]
+ origin = 'Direct'
+
+ if value is None and variables and defs[config].get('vars'):
+ # Use 'variable overrides' if present, highest precedence, but only present when querying running play
+ value, origin = self._loop_entries(variables, defs[config]['vars'])
+ origin = 'var: %s' % origin
+
+ # use playbook keywords if you have em
+ if value is None and defs[config].get('keyword') and keys:
+ value, origin = self._loop_entries(keys, defs[config]['keyword'])
+ origin = 'keyword: %s' % origin
+
+ # automap to keywords
+ # TODO: deprecate these in favor of explicit keyword above
+ if value is None and keys:
+ if config in keys:
+ value = keys[config]
+ keyword = config
+
+ elif aliases:
+ for alias in aliases:
+ if alias in keys:
+ value = keys[alias]
+ keyword = alias
+ break
+
+ if value is not None:
+ origin = 'keyword: %s' % keyword
+
+ if value is None and 'cli' in defs[config]:
+ # avoid circular import .. until valid
+ from ansible import context
+ value, origin = self._loop_entries(context.CLIARGS, defs[config]['cli'])
+ origin = 'cli: %s' % origin
+
+ # env vars are next precedence
+ if value is None and defs[config].get('env'):
+ value, origin = self._loop_entries(py3compat.environ, defs[config]['env'])
+ origin = 'env: %s' % origin
+
+ # try config file entries next, if we have one
+ if self._parsers.get(cfile, None) is None:
+ self._parse_config_file(cfile)
+
+ if value is None and cfile is not None:
+ ftype = get_config_type(cfile)
+ if ftype and defs[config].get(ftype):
+ if ftype == 'ini':
+ # load from ini config
+ try: # FIXME: generalize _loop_entries to allow for files also, most of this code is dupe
+ for ini_entry in defs[config]['ini']:
+ temp_value = get_ini_config_value(self._parsers[cfile], ini_entry)
+ if temp_value is not None:
+ value = temp_value
+ origin = cfile
+ if 'deprecated' in ini_entry:
+ self.DEPRECATED.append(('[%s]%s' % (ini_entry['section'], ini_entry['key']), ini_entry['deprecated']))
+ except Exception as e:
+ sys.stderr.write("Error while loading ini config %s: %s" % (cfile, to_native(e)))
+ elif ftype == 'yaml':
+ # FIXME: implement, also , break down key from defs (. notation???)
+ origin = cfile
+
+ # set default if we got here w/o a value
+ if value is None:
+ if defs[config].get('required', False):
+ if not plugin_type or config not in INTERNAL_DEFS.get(plugin_type, {}):
+ raise AnsibleError("No setting was provided for required configuration %s" %
+ to_native(_get_entry(plugin_type, plugin_name, config)))
+ else:
+ origin = 'default'
+ value = defs[config].get('default')
+ if isinstance(value, string_types) and (value.startswith('{{') and value.endswith('}}')) and variables is not None:
+ # template default values if possible
+ # NOTE: cannot use is_template due to circular dep
+ try:
+ t = NativeEnvironment().from_string(value)
+ value = t.render(variables)
+ except Exception:
+ pass # not templatable
+
+ # ensure correct type, can raise exceptions on mismatched types
+ try:
+ value = ensure_type(value, defs[config].get('type'), origin=origin)
+ except ValueError as e:
+ if origin.startswith('env:') and value == '':
+ # this is empty env var for non string so we can set to default
+ origin = 'default'
+ value = ensure_type(defs[config].get('default'), defs[config].get('type'), origin=origin)
+ else:
+ raise AnsibleOptionsError('Invalid type for configuration option %s (from %s): %s' %
+ (to_native(_get_entry(plugin_type, plugin_name, config)).strip(), origin, to_native(e)))
+
+ # deal with restricted values
+ if value is not None and 'choices' in defs[config] and defs[config]['choices'] is not None:
+ invalid_choices = True # assume the worst!
+ if defs[config].get('type') == 'list':
+ # for a list type, compare all values in type are allowed
+ invalid_choices = not all(choice in defs[config]['choices'] for choice in value)
+ else:
+ # these should be only the simple data types (string, int, bool, float, etc) .. ignore dicts for now
+ invalid_choices = value not in defs[config]['choices']
+
+ if invalid_choices:
+
+ if isinstance(defs[config]['choices'], Mapping):
+ valid = ', '.join([to_text(k) for k in defs[config]['choices'].keys()])
+ elif isinstance(defs[config]['choices'], string_types):
+ valid = defs[config]['choices']
+ elif isinstance(defs[config]['choices'], Sequence):
+ valid = ', '.join([to_text(c) for c in defs[config]['choices']])
+ else:
+ valid = defs[config]['choices']
+
+ raise AnsibleOptionsError('Invalid value "%s" for configuration option "%s", valid values are: %s' %
+ (value, to_native(_get_entry(plugin_type, plugin_name, config)), valid))
+
+ # deal with deprecation of the setting
+ if 'deprecated' in defs[config] and origin != 'default':
+ self.DEPRECATED.append((config, defs[config].get('deprecated')))
+ else:
+ raise AnsibleError('Requested entry (%s) was not defined in configuration.' % to_native(_get_entry(plugin_type, plugin_name, config)))
+
+ return value, origin
+
+ def initialize_plugin_configuration_definitions(self, plugin_type, name, defs):
+
+ if plugin_type not in self._plugins:
+ self._plugins[plugin_type] = {}
+
+ self._plugins[plugin_type][name] = defs