diff options
Diffstat (limited to 'lib/ansible/playbook/conditional.py')
-rw-r--r-- | lib/ansible/playbook/conditional.py | 197 |
1 files changed, 45 insertions, 152 deletions
diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py index d994f8f..449b4a9 100644 --- a/lib/ansible/playbook/conditional.py +++ b/lib/ansible/playbook/conditional.py @@ -19,28 +19,18 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type -import ast -import re +import typing as t -from jinja2.compiler import generate -from jinja2.exceptions import UndefinedError - -from ansible import constants as C from ansible.errors import AnsibleError, AnsibleUndefinedVariable, AnsibleTemplateError -from ansible.module_utils.six import text_type -from ansible.module_utils._text import to_native, to_text +from ansible.module_utils.common.text.converters import to_native from ansible.playbook.attribute import FieldAttribute +from ansible.template import Templar from ansible.utils.display import Display display = Display() -DEFINED_REGEX = re.compile(r'(hostvars\[.+\]|[\w_]+)\s+(not\s+is|is|is\s+not)\s+(defined|undefined)') -LOOKUP_REGEX = re.compile(r'lookup\s*\(') -VALID_VAR_REGEX = re.compile("^[_A-Za-z][_a-zA-Z0-9]*$") - class Conditional: - ''' This is a mix-in class, to be used with Base to allow the object to be run conditionally when a condition is met or skipped. @@ -57,166 +47,69 @@ class Conditional: raise AnsibleError("a loader must be specified when using Conditional() directly") else: self._loader = loader - super(Conditional, self).__init__() + super().__init__() def _validate_when(self, attr, name, value): if not isinstance(value, list): setattr(self, name, [value]) - def extract_defined_undefined(self, conditional): - results = [] - - cond = conditional - m = DEFINED_REGEX.search(cond) - while m: - results.append(m.groups()) - cond = cond[m.end():] - m = DEFINED_REGEX.search(cond) - - return results - - def evaluate_conditional(self, templar, all_vars): + def evaluate_conditional(self, templar: Templar, all_vars: dict[str, t.Any]) -> bool: ''' Loops through the conditionals set on this object, returning False if any of them evaluate as such. ''' - - # since this is a mix-in, it may not have an underlying datastructure - # associated with it, so we pull it out now in case we need it for - # error reporting below - ds = None - if hasattr(self, '_ds'): - ds = getattr(self, '_ds') - - result = True - try: - for conditional in self.when: - - # do evaluation - if conditional is None or conditional == '': - res = True - elif isinstance(conditional, bool): - res = conditional - else: + return self.evaluate_conditional_with_result(templar, all_vars)[0] + + def evaluate_conditional_with_result(self, templar: Templar, all_vars: dict[str, t.Any]) -> tuple[bool, t.Optional[str]]: + """Loops through the conditionals set on this object, returning + False if any of them evaluate as such as well as the condition + that was false. + """ + for conditional in self.when: + if conditional is None or conditional == "": + res = True + elif isinstance(conditional, bool): + res = conditional + else: + try: res = self._check_conditional(conditional, templar, all_vars) + except AnsibleError as e: + raise AnsibleError( + "The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), + obj=getattr(self, '_ds', None) + ) - # only update if still true, preserve false - if result: - result = res + display.debug("Evaluated conditional (%s): %s" % (conditional, res)) + if not res: + return res, conditional - display.debug("Evaluated conditional (%s): %s" % (conditional, res)) - if not result: - break - - except Exception as e: - raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds) - - return result - - def _check_conditional(self, conditional, templar, all_vars): - ''' - This method does the low-level evaluation of each conditional - set on this object, using jinja2 to wrap the conditionals for - evaluation. - ''' + return True, None + def _check_conditional(self, conditional: str, templar: Templar, all_vars: dict[str, t.Any]) -> bool: original = conditional - - if templar.is_template(conditional): - display.warning('conditional statements should not include jinja2 ' - 'templating delimiters such as {{ }} or {%% %%}. ' - 'Found: %s' % conditional) - - # make sure the templar is using the variables specified with this method templar.available_variables = all_vars - try: - # if the conditional is "unsafe", disable lookups - disable_lookups = hasattr(conditional, '__UNSAFE__') - conditional = templar.template(conditional, disable_lookups=disable_lookups) - - if not isinstance(conditional, text_type) or conditional == "": - return conditional + if templar.is_template(conditional): + display.warning( + "conditional statements should not include jinja2 " + "templating delimiters such as {{ }} or {%% %%}. " + "Found: %s" % conditional + ) + conditional = templar.template(conditional) + if isinstance(conditional, bool): + return conditional + elif conditional == "": + return False # If the result of the first-pass template render (to resolve inline templates) is marked unsafe, # explicitly fail since the next templating operation would never evaluate if hasattr(conditional, '__UNSAFE__'): raise AnsibleTemplateError('Conditional is marked as unsafe, and cannot be evaluated.') - # First, we do some low-level jinja2 parsing involving the AST format of the - # statement to ensure we don't do anything unsafe (using the disable_lookup flag above) - class CleansingNodeVisitor(ast.NodeVisitor): - def generic_visit(self, node, inside_call=False, inside_yield=False): - if isinstance(node, ast.Call): - inside_call = True - elif isinstance(node, ast.Yield): - inside_yield = True - elif isinstance(node, ast.Str): - if disable_lookups: - if inside_call and node.s.startswith("__"): - # calling things with a dunder is generally bad at this point... - raise AnsibleError( - "Invalid access found in the conditional: '%s'" % conditional - ) - elif inside_yield: - # we're inside a yield, so recursively parse and traverse the AST - # of the result to catch forbidden syntax from executing - parsed = ast.parse(node.s, mode='exec') - cnv = CleansingNodeVisitor() - cnv.visit(parsed) - # iterate over all child nodes - for child_node in ast.iter_child_nodes(node): - self.generic_visit( - child_node, - inside_call=inside_call, - inside_yield=inside_yield - ) - try: - res = templar.environment.parse(conditional, None, None) - res = generate(res, templar.environment, None, None) - parsed = ast.parse(res, mode='exec') - - cnv = CleansingNodeVisitor() - cnv.visit(parsed) - except Exception as e: - raise AnsibleError("Invalid conditional detected: %s" % to_native(e)) - - # and finally we generate and template the presented string and look at the resulting string # NOTE The spaces around True and False are intentional to short-circuit literal_eval for # jinja2_native=False and avoid its expensive calls. - presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional - val = templar.template(presented, disable_lookups=disable_lookups).strip() - if val == "True": - return True - elif val == "False": - return False - else: - raise AnsibleError("unable to evaluate conditional: %s" % original) - except (AnsibleUndefinedVariable, UndefinedError) as e: - # the templating failed, meaning most likely a variable was undefined. If we happened - # to be looking for an undefined variable, return True, otherwise fail - try: - # first we extract the variable name from the error message - var_name = re.compile(r"'(hostvars\[.+\]|[\w_]+)' is undefined").search(str(e)).groups()[0] - # next we extract all defined/undefined tests from the conditional string - def_undef = self.extract_defined_undefined(conditional) - # then we loop through these, comparing the error variable name against - # each def/undef test we found above. If there is a match, we determine - # whether the logic/state mean the variable should exist or not and return - # the corresponding True/False - for (du_var, logic, state) in def_undef: - # when we compare the var names, normalize quotes because something - # like hostvars['foo'] may be tested against hostvars["foo"] - if var_name.replace("'", '"') == du_var.replace("'", '"'): - # the should exist is a xor test between a negation in the logic portion - # against the state (defined or undefined) - should_exist = ('not' in logic) != (state == 'defined') - if should_exist: - return False - else: - return True - # as nothing above matched the failed var name, re-raise here to - # trigger the AnsibleUndefinedVariable exception again below - raise - except Exception: - raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e)) + return templar.template( + "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional, + ).strip() == "True" + except AnsibleUndefinedVariable as e: + raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e)) |