summaryrefslogtreecommitdiffstats
path: root/lib/ansible/playbook/conditional.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/playbook/conditional.py')
-rw-r--r--lib/ansible/playbook/conditional.py197
1 files changed, 45 insertions, 152 deletions
diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py
index d994f8f..449b4a9 100644
--- a/lib/ansible/playbook/conditional.py
+++ b/lib/ansible/playbook/conditional.py
@@ -19,28 +19,18 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
-import ast
-import re
+import typing as t
-from jinja2.compiler import generate
-from jinja2.exceptions import UndefinedError
-
-from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleUndefinedVariable, AnsibleTemplateError
-from ansible.module_utils.six import text_type
-from ansible.module_utils._text import to_native, to_text
+from ansible.module_utils.common.text.converters import to_native
from ansible.playbook.attribute import FieldAttribute
+from ansible.template import Templar
from ansible.utils.display import Display
display = Display()
-DEFINED_REGEX = re.compile(r'(hostvars\[.+\]|[\w_]+)\s+(not\s+is|is|is\s+not)\s+(defined|undefined)')
-LOOKUP_REGEX = re.compile(r'lookup\s*\(')
-VALID_VAR_REGEX = re.compile("^[_A-Za-z][_a-zA-Z0-9]*$")
-
class Conditional:
-
'''
This is a mix-in class, to be used with Base to allow the object
to be run conditionally when a condition is met or skipped.
@@ -57,166 +47,69 @@ class Conditional:
raise AnsibleError("a loader must be specified when using Conditional() directly")
else:
self._loader = loader
- super(Conditional, self).__init__()
+ super().__init__()
def _validate_when(self, attr, name, value):
if not isinstance(value, list):
setattr(self, name, [value])
- def extract_defined_undefined(self, conditional):
- results = []
-
- cond = conditional
- m = DEFINED_REGEX.search(cond)
- while m:
- results.append(m.groups())
- cond = cond[m.end():]
- m = DEFINED_REGEX.search(cond)
-
- return results
-
- def evaluate_conditional(self, templar, all_vars):
+ def evaluate_conditional(self, templar: Templar, all_vars: dict[str, t.Any]) -> bool:
'''
Loops through the conditionals set on this object, returning
False if any of them evaluate as such.
'''
-
- # since this is a mix-in, it may not have an underlying datastructure
- # associated with it, so we pull it out now in case we need it for
- # error reporting below
- ds = None
- if hasattr(self, '_ds'):
- ds = getattr(self, '_ds')
-
- result = True
- try:
- for conditional in self.when:
-
- # do evaluation
- if conditional is None or conditional == '':
- res = True
- elif isinstance(conditional, bool):
- res = conditional
- else:
+ return self.evaluate_conditional_with_result(templar, all_vars)[0]
+
+ def evaluate_conditional_with_result(self, templar: Templar, all_vars: dict[str, t.Any]) -> tuple[bool, t.Optional[str]]:
+ """Loops through the conditionals set on this object, returning
+ False if any of them evaluate as such as well as the condition
+ that was false.
+ """
+ for conditional in self.when:
+ if conditional is None or conditional == "":
+ res = True
+ elif isinstance(conditional, bool):
+ res = conditional
+ else:
+ try:
res = self._check_conditional(conditional, templar, all_vars)
+ except AnsibleError as e:
+ raise AnsibleError(
+ "The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)),
+ obj=getattr(self, '_ds', None)
+ )
- # only update if still true, preserve false
- if result:
- result = res
+ display.debug("Evaluated conditional (%s): %s" % (conditional, res))
+ if not res:
+ return res, conditional
- display.debug("Evaluated conditional (%s): %s" % (conditional, res))
- if not result:
- break
-
- except Exception as e:
- raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds)
-
- return result
-
- def _check_conditional(self, conditional, templar, all_vars):
- '''
- This method does the low-level evaluation of each conditional
- set on this object, using jinja2 to wrap the conditionals for
- evaluation.
- '''
+ return True, None
+ def _check_conditional(self, conditional: str, templar: Templar, all_vars: dict[str, t.Any]) -> bool:
original = conditional
-
- if templar.is_template(conditional):
- display.warning('conditional statements should not include jinja2 '
- 'templating delimiters such as {{ }} or {%% %%}. '
- 'Found: %s' % conditional)
-
- # make sure the templar is using the variables specified with this method
templar.available_variables = all_vars
-
try:
- # if the conditional is "unsafe", disable lookups
- disable_lookups = hasattr(conditional, '__UNSAFE__')
- conditional = templar.template(conditional, disable_lookups=disable_lookups)
-
- if not isinstance(conditional, text_type) or conditional == "":
- return conditional
+ if templar.is_template(conditional):
+ display.warning(
+ "conditional statements should not include jinja2 "
+ "templating delimiters such as {{ }} or {%% %%}. "
+ "Found: %s" % conditional
+ )
+ conditional = templar.template(conditional)
+ if isinstance(conditional, bool):
+ return conditional
+ elif conditional == "":
+ return False
# If the result of the first-pass template render (to resolve inline templates) is marked unsafe,
# explicitly fail since the next templating operation would never evaluate
if hasattr(conditional, '__UNSAFE__'):
raise AnsibleTemplateError('Conditional is marked as unsafe, and cannot be evaluated.')
- # First, we do some low-level jinja2 parsing involving the AST format of the
- # statement to ensure we don't do anything unsafe (using the disable_lookup flag above)
- class CleansingNodeVisitor(ast.NodeVisitor):
- def generic_visit(self, node, inside_call=False, inside_yield=False):
- if isinstance(node, ast.Call):
- inside_call = True
- elif isinstance(node, ast.Yield):
- inside_yield = True
- elif isinstance(node, ast.Str):
- if disable_lookups:
- if inside_call and node.s.startswith("__"):
- # calling things with a dunder is generally bad at this point...
- raise AnsibleError(
- "Invalid access found in the conditional: '%s'" % conditional
- )
- elif inside_yield:
- # we're inside a yield, so recursively parse and traverse the AST
- # of the result to catch forbidden syntax from executing
- parsed = ast.parse(node.s, mode='exec')
- cnv = CleansingNodeVisitor()
- cnv.visit(parsed)
- # iterate over all child nodes
- for child_node in ast.iter_child_nodes(node):
- self.generic_visit(
- child_node,
- inside_call=inside_call,
- inside_yield=inside_yield
- )
- try:
- res = templar.environment.parse(conditional, None, None)
- res = generate(res, templar.environment, None, None)
- parsed = ast.parse(res, mode='exec')
-
- cnv = CleansingNodeVisitor()
- cnv.visit(parsed)
- except Exception as e:
- raise AnsibleError("Invalid conditional detected: %s" % to_native(e))
-
- # and finally we generate and template the presented string and look at the resulting string
# NOTE The spaces around True and False are intentional to short-circuit literal_eval for
# jinja2_native=False and avoid its expensive calls.
- presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional
- val = templar.template(presented, disable_lookups=disable_lookups).strip()
- if val == "True":
- return True
- elif val == "False":
- return False
- else:
- raise AnsibleError("unable to evaluate conditional: %s" % original)
- except (AnsibleUndefinedVariable, UndefinedError) as e:
- # the templating failed, meaning most likely a variable was undefined. If we happened
- # to be looking for an undefined variable, return True, otherwise fail
- try:
- # first we extract the variable name from the error message
- var_name = re.compile(r"'(hostvars\[.+\]|[\w_]+)' is undefined").search(str(e)).groups()[0]
- # next we extract all defined/undefined tests from the conditional string
- def_undef = self.extract_defined_undefined(conditional)
- # then we loop through these, comparing the error variable name against
- # each def/undef test we found above. If there is a match, we determine
- # whether the logic/state mean the variable should exist or not and return
- # the corresponding True/False
- for (du_var, logic, state) in def_undef:
- # when we compare the var names, normalize quotes because something
- # like hostvars['foo'] may be tested against hostvars["foo"]
- if var_name.replace("'", '"') == du_var.replace("'", '"'):
- # the should exist is a xor test between a negation in the logic portion
- # against the state (defined or undefined)
- should_exist = ('not' in logic) != (state == 'defined')
- if should_exist:
- return False
- else:
- return True
- # as nothing above matched the failed var name, re-raise here to
- # trigger the AnsibleUndefinedVariable exception again below
- raise
- except Exception:
- raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e))
+ return templar.template(
+ "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional,
+ ).strip() == "True"
+ except AnsibleUndefinedVariable as e:
+ raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e))