diff options
Diffstat (limited to 'lib/ansible/module_utils/common/arg_spec.py')
-rw-r--r-- | lib/ansible/module_utils/common/arg_spec.py | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/lib/ansible/module_utils/common/arg_spec.py b/lib/ansible/module_utils/common/arg_spec.py new file mode 100644 index 0000000..d9f716e --- /dev/null +++ b/lib/ansible/module_utils/common/arg_spec.py @@ -0,0 +1,311 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2021 Ansible Project +# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from copy import deepcopy + +from ansible.module_utils.common.parameters import ( + _ADDITIONAL_CHECKS, + _get_legal_inputs, + _get_unsupported_parameters, + _handle_aliases, + _list_deprecations, + _list_no_log_values, + _set_defaults, + _validate_argument_types, + _validate_argument_values, + _validate_sub_spec, + set_fallbacks, +) + +from ansible.module_utils.common.text.converters import to_native +from ansible.module_utils.common.warnings import deprecate, warn + +from ansible.module_utils.common.validation import ( + check_mutually_exclusive, + check_required_arguments, +) + +from ansible.module_utils.errors import ( + AliasError, + AnsibleValidationErrorMultiple, + DeprecationError, + MutuallyExclusiveError, + NoLogError, + RequiredDefaultError, + RequiredError, + UnsupportedError, +) + + +class ValidationResult: + """Result of argument spec validation. + + This is the object returned by :func:`ArgumentSpecValidator.validate() + <ansible.module_utils.common.arg_spec.ArgumentSpecValidator.validate()>` + containing the validated parameters and any errors. + """ + + def __init__(self, parameters): + """ + :arg parameters: Terms to be validated and coerced to the correct type. + :type parameters: dict + """ + self._no_log_values = set() + """:class:`set` of values marked as ``no_log`` in the argument spec. This + is a temporary holding place for these values and may move in the future. + """ + + self._unsupported_parameters = set() + self._supported_parameters = dict() + self._validated_parameters = deepcopy(parameters) + self._deprecations = [] + self._warnings = [] + self._aliases = {} + self.errors = AnsibleValidationErrorMultiple() + """ + :class:`~ansible.module_utils.errors.AnsibleValidationErrorMultiple` containing all + :class:`~ansible.module_utils.errors.AnsibleValidationError` objects if there were + any failures during validation. + """ + + @property + def validated_parameters(self): + """Validated and coerced parameters.""" + return self._validated_parameters + + @property + def unsupported_parameters(self): + """:class:`set` of unsupported parameter names.""" + return self._unsupported_parameters + + @property + def error_messages(self): + """:class:`list` of all error messages from each exception in :attr:`errors`.""" + return self.errors.messages + + +class ArgumentSpecValidator: + """Argument spec validation class + + Creates a validator based on the ``argument_spec`` that can be used to + validate a number of parameters using the :meth:`validate` method. + """ + + def __init__(self, argument_spec, + mutually_exclusive=None, + required_together=None, + required_one_of=None, + required_if=None, + required_by=None, + ): + + """ + :arg argument_spec: Specification of valid parameters and their type. May + include nested argument specs. + :type argument_spec: dict[str, dict] + + :kwarg mutually_exclusive: List or list of lists of terms that should not + be provided together. + :type mutually_exclusive: list[str] or list[list[str]] + + :kwarg required_together: List of lists of terms that are required together. + :type required_together: list[list[str]] + + :kwarg required_one_of: List of lists of terms, one of which in each list + is required. + :type required_one_of: list[list[str]] + + :kwarg required_if: List of lists of ``[parameter, value, [parameters]]`` where + one of ``[parameters]`` is required if ``parameter == value``. + :type required_if: list + + :kwarg required_by: Dictionary of parameter names that contain a list of + parameters required by each key in the dictionary. + :type required_by: dict[str, list[str]] + """ + + self._mutually_exclusive = mutually_exclusive + self._required_together = required_together + self._required_one_of = required_one_of + self._required_if = required_if + self._required_by = required_by + self._valid_parameter_names = set() + self.argument_spec = argument_spec + + for key in sorted(self.argument_spec.keys()): + aliases = self.argument_spec[key].get('aliases') + if aliases: + self._valid_parameter_names.update(["{key} ({aliases})".format(key=key, aliases=", ".join(sorted(aliases)))]) + else: + self._valid_parameter_names.update([key]) + + def validate(self, parameters, *args, **kwargs): + """Validate ``parameters`` against argument spec. + + Error messages in the :class:`ValidationResult` may contain no_log values and should be + sanitized with :func:`~ansible.module_utils.common.parameters.sanitize_keys` before logging or displaying. + + :arg parameters: Parameters to validate against the argument spec + :type parameters: dict[str, dict] + + :return: :class:`ValidationResult` containing validated parameters. + + :Simple Example: + + .. code-block:: text + + argument_spec = { + 'name': {'type': 'str'}, + 'age': {'type': 'int'}, + } + + parameters = { + 'name': 'bo', + 'age': '42', + } + + validator = ArgumentSpecValidator(argument_spec) + result = validator.validate(parameters) + + if result.error_messages: + sys.exit("Validation failed: {0}".format(", ".join(result.error_messages)) + + valid_params = result.validated_parameters + """ + + result = ValidationResult(parameters) + + result._no_log_values.update(set_fallbacks(self.argument_spec, result._validated_parameters)) + + alias_warnings = [] + alias_deprecations = [] + try: + result._aliases.update(_handle_aliases(self.argument_spec, result._validated_parameters, alias_warnings, alias_deprecations)) + except (TypeError, ValueError) as e: + result.errors.append(AliasError(to_native(e))) + + legal_inputs = _get_legal_inputs(self.argument_spec, result._validated_parameters, result._aliases) + + for option, alias in alias_warnings: + result._warnings.append({'option': option, 'alias': alias}) + + for deprecation in alias_deprecations: + result._deprecations.append({ + 'msg': "Alias '%s' is deprecated. See the module docs for more information" % deprecation['name'], + 'version': deprecation.get('version'), + 'date': deprecation.get('date'), + 'collection_name': deprecation.get('collection_name'), + }) + + try: + result._no_log_values.update(_list_no_log_values(self.argument_spec, result._validated_parameters)) + except TypeError as te: + result.errors.append(NoLogError(to_native(te))) + + try: + result._deprecations.extend(_list_deprecations(self.argument_spec, result._validated_parameters)) + except TypeError as te: + result.errors.append(DeprecationError(to_native(te))) + + try: + result._unsupported_parameters.update( + _get_unsupported_parameters( + self.argument_spec, + result._validated_parameters, + legal_inputs, + store_supported=result._supported_parameters, + ) + ) + except TypeError as te: + result.errors.append(RequiredDefaultError(to_native(te))) + except ValueError as ve: + result.errors.append(AliasError(to_native(ve))) + + try: + check_mutually_exclusive(self._mutually_exclusive, result._validated_parameters) + except TypeError as te: + result.errors.append(MutuallyExclusiveError(to_native(te))) + + result._no_log_values.update(_set_defaults(self.argument_spec, result._validated_parameters, False)) + + try: + check_required_arguments(self.argument_spec, result._validated_parameters) + except TypeError as e: + result.errors.append(RequiredError(to_native(e))) + + _validate_argument_types(self.argument_spec, result._validated_parameters, errors=result.errors) + _validate_argument_values(self.argument_spec, result._validated_parameters, errors=result.errors) + + for check in _ADDITIONAL_CHECKS: + try: + check['func'](getattr(self, "_{attr}".format(attr=check['attr'])), result._validated_parameters) + except TypeError as te: + result.errors.append(check['err'](to_native(te))) + + result._no_log_values.update(_set_defaults(self.argument_spec, result._validated_parameters)) + + alias_deprecations = [] + _validate_sub_spec(self.argument_spec, result._validated_parameters, + errors=result.errors, + no_log_values=result._no_log_values, + unsupported_parameters=result._unsupported_parameters, + supported_parameters=result._supported_parameters, + alias_deprecations=alias_deprecations,) + for deprecation in alias_deprecations: + result._deprecations.append({ + 'msg': "Alias '%s' is deprecated. See the module docs for more information" % deprecation['name'], + 'version': deprecation.get('version'), + 'date': deprecation.get('date'), + 'collection_name': deprecation.get('collection_name'), + }) + + if result._unsupported_parameters: + flattened_names = [] + for item in result._unsupported_parameters: + if isinstance(item, tuple): + flattened_names.append(".".join(item)) + else: + flattened_names.append(item) + + unsupported_string = ", ".join(sorted(list(flattened_names))) + supported_params = supported_aliases = [] + if result._supported_parameters.get(item): + supported_params = sorted(list(result._supported_parameters[item][0])) + supported_aliases = sorted(list(result._supported_parameters[item][1])) + supported_string = ", ".join(supported_params) + if supported_aliases: + aliases_string = ", ".join(supported_aliases) + supported_string += " (%s)" % aliases_string + + msg = "{0}. Supported parameters include: {1}.".format(unsupported_string, supported_string) + result.errors.append(UnsupportedError(msg)) + + return result + + +class ModuleArgumentSpecValidator(ArgumentSpecValidator): + """Argument spec validation class used by :class:`AnsibleModule`. + + This is not meant to be used outside of :class:`AnsibleModule`. Use + :class:`ArgumentSpecValidator` instead. + """ + + def __init__(self, *args, **kwargs): + super(ModuleArgumentSpecValidator, self).__init__(*args, **kwargs) + + def validate(self, parameters): + result = super(ModuleArgumentSpecValidator, self).validate(parameters) + + for d in result._deprecations: + deprecate(d['msg'], + version=d.get('version'), date=d.get('date'), + collection_name=d.get('collection_name')) + + for w in result._warnings: + warn('Both option {option} and its alias {alias} are set.'.format(option=w['option'], alias=w['alias'])) + + return result |