summaryrefslogtreecommitdiffstats
path: root/lib/ansible/playbook/conditional.py
blob: d994f8f49d7b8d1a67519404a27e5d5ebebfd7a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import ast
import re

from jinja2.compiler import generate
from jinja2.exceptions import UndefinedError

from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleUndefinedVariable, AnsibleTemplateError
from ansible.module_utils.six import text_type
from ansible.module_utils._text import to_native, to_text
from ansible.playbook.attribute import FieldAttribute
from ansible.utils.display import Display

# Shared display object used below for debug output and user-facing warnings.
display = Display()

# Matches "<var> is [not] defined|undefined" (also "not is") tests inside a
# conditional string.  Captures three groups: the variable (a bare name or a
# hostvars[...] lookup), the logic words ("is", "is not", "not is") and the
# state ("defined" or "undefined").  Used by Conditional.extract_defined_undefined().
DEFINED_REGEX = re.compile(r'(hostvars\[.+\]|[\w_]+)\s+(not\s+is|is|is\s+not)\s+(defined|undefined)')
# Matches the start of a lookup(...) call; not referenced in this part of the file.
LOOKUP_REGEX = re.compile(r'lookup\s*\(')
# Matches a whole string made of a letter or underscore followed by letters,
# digits or underscores; not referenced in this part of the file.
VALID_VAR_REGEX = re.compile("^[_A-Za-z][_a-zA-Z0-9]*$")


class Conditional:

    '''
    This is a mix-in class, to be used with Base to allow the object
    to be run conditionally when a condition is met or skipped.
    '''

    # the list of conditionals ("when" statements) attached to this object
    when = FieldAttribute(isa='list', default=list, extend=True, prepend=True)

    def __init__(self, loader=None):
        '''
        Stores the loader when this class is used on its own.

        Raises AnsibleError if the object has no ``_loader`` attribute yet
        and no loader was passed in.
        '''
        # when used directly, this class needs a loader, but we want to
        # make sure we don't trample on the existing one if this class
        # is used as a mix-in with a playbook base class
        if not hasattr(self, '_loader'):
            if loader is None:
                raise AnsibleError("a loader must be specified when using Conditional() directly")
            self._loader = loader
        super(Conditional, self).__init__()

    def _validate_when(self, attr, name, value):
        '''
        Ensures the attribute called ``name`` holds a list, wrapping any
        non-list value in a single-element list.
        '''
        if not isinstance(value, list):
            setattr(self, name, [value])

    def extract_defined_undefined(self, conditional):
        '''
        Returns a list of (variable, logic, state) tuples, one for every
        "is [not] defined/undefined" test found in the conditional string,
        in the order they appear.
        '''
        # finditer yields successive non-overlapping matches, which is the same
        # sequence obtained by repeatedly searching the remainder of the string
        return [match.groups() for match in DEFINED_REGEX.finditer(conditional)]

    def evaluate_conditional(self, templar, all_vars):
        '''
        Loops through the conditionals set on this object, returning
        False if any of them evaluate as such.

        Evaluation stops at the first falsy result, which is returned as-is.
        Any exception raised while evaluating is re-raised as an AnsibleError
        naming the conditional that was being checked.
        '''

        # since this is a mix-in, it may not have an underlying datastructure
        # associated with it, so we pull it out now in case we need it for
        # error reporting below
        ds = getattr(self, '_ds', None)

        result = True
        # bound up front so the error handler below can always reference it,
        # even when fetching/iterating self.when fails before the first item
        conditional = None
        try:
            for conditional in self.when:

                # do evaluation
                if conditional is None or conditional == '':
                    res = True
                elif isinstance(conditional, bool):
                    res = conditional
                else:
                    res = self._check_conditional(conditional, templar, all_vars)

                # every previous result was truthy (otherwise we would have
                # stopped already), so the latest result is the overall one
                result = res

                display.debug("Evaluated conditional (%s): %s" % (conditional, res))
                if not result:
                    break

        except Exception as e:
            raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds)

        return result

    def _check_conditional(self, conditional, templar, all_vars):
        '''
        This method does the low-level evaluation of each conditional
        set on this object, using jinja2 to wrap the conditionals for
        evaluation.

        Returns True or False for string conditionals. If the first templating
        pass yields a non-string or an empty string, that value is returned
        unchanged. Raises AnsibleError for invalid or unevaluable conditionals
        and AnsibleUndefinedVariable when an undefined variable is hit that is
        not covered by a defined/undefined test in the conditional.
        '''

        original = conditional

        if templar.is_template(conditional):
            display.warning('conditional statements should not include jinja2 '
                            'templating delimiters such as {{ }} or {%% %%}. '
                            'Found: %s' % conditional)

        # make sure the templar is using the variables specified with this method
        templar.available_variables = all_vars

        try:
            # if the conditional is "unsafe", disable lookups
            disable_lookups = hasattr(conditional, '__UNSAFE__')
            conditional = templar.template(conditional, disable_lookups=disable_lookups)

            if not isinstance(conditional, text_type) or conditional == "":
                return conditional

            # If the result of the first-pass template render (to resolve inline templates) is marked unsafe,
            # explicitly fail since the next templating operation would never evaluate
            if hasattr(conditional, '__UNSAFE__'):
                raise AnsibleTemplateError('Conditional is marked as unsafe, and cannot be evaluated.')

            # First, we do some low-level jinja2 parsing involving the AST format of the
            # statement to ensure we don't do anything unsafe (using the disable_lookup flag above)
            class CleansingNodeVisitor(ast.NodeVisitor):
                @staticmethod
                def _string_value(node):
                    # Returns the text of a string-literal node, or None for any other node.
                    # Python 3.8+ parses string literals as ast.Constant, and the old
                    # ast.Str alias (with its ``.s`` attribute) is removed in Python 3.14,
                    # so it must not be referenced directly. Older interpreters still
                    # produce Str nodes, which are recognised by class name.
                    constant_cls = getattr(ast, 'Constant', None)
                    if constant_cls is not None and isinstance(node, constant_cls):
                        value = node.value
                        return value if isinstance(value, text_type) else None
                    if type(node).__name__ == 'Str':
                        return node.s
                    return None

                def generic_visit(self, node, inside_call=False, inside_yield=False):
                    if isinstance(node, ast.Call):
                        inside_call = True
                    elif isinstance(node, ast.Yield):
                        inside_yield = True
                    elif disable_lookups:
                        text = self._string_value(node)
                        if text is not None:
                            if inside_call and text.startswith("__"):
                                # calling things with a dunder is generally bad at this point...
                                raise AnsibleError(
                                    "Invalid access found in the conditional: '%s'" % conditional
                                )
                            elif inside_yield:
                                # we're inside a yield, so recursively parse and traverse the AST
                                # of the result to catch forbidden syntax from executing
                                parsed = ast.parse(text, mode='exec')
                                cnv = CleansingNodeVisitor()
                                cnv.visit(parsed)
                    # iterate over all child nodes
                    for child_node in ast.iter_child_nodes(node):
                        self.generic_visit(
                            child_node,
                            inside_call=inside_call,
                            inside_yield=inside_yield
                        )
            try:
                res = templar.environment.parse(conditional, None, None)
                res = generate(res, templar.environment, None, None)
                parsed = ast.parse(res, mode='exec')

                cnv = CleansingNodeVisitor()
                cnv.visit(parsed)
            except Exception as e:
                raise AnsibleError("Invalid conditional detected: %s" % to_native(e))

            # and finally we generate and template the presented string and look at the resulting string
            # NOTE The spaces around True and False are intentional to short-circuit literal_eval for
            #      jinja2_native=False and avoid its expensive calls.
            presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional
            val = templar.template(presented, disable_lookups=disable_lookups).strip()
            if val == "True":
                return True
            elif val == "False":
                return False
            else:
                raise AnsibleError("unable to evaluate conditional: %s" % original)
        except (AnsibleUndefinedVariable, UndefinedError) as e:
            # the templating failed, meaning most likely a variable was undefined. If we happened
            # to be looking for an undefined variable, return True, otherwise fail
            try:
                # first we extract the variable name from the error message; if the message
                # has an unexpected shape the resulting AttributeError is handled below
                var_name = re.compile(r"'(hostvars\[.+\]|[\w_]+)' is undefined").search(to_native(e)).groups()[0]
                # next we extract all defined/undefined tests from the conditional string
                def_undef = self.extract_defined_undefined(conditional)
                # then we loop through these, comparing the error variable name against
                # each def/undef test we found above. If there is a match, we determine
                # whether the logic/state mean the variable should exist or not and return
                # the corresponding True/False
                normalized_name = var_name.replace("'", '"')
                for (du_var, logic, state) in def_undef:
                    # when we compare the var names, normalize quotes because something
                    # like hostvars['foo'] may be tested against hostvars["foo"]
                    if normalized_name == du_var.replace("'", '"'):
                        # the should exist is a xor test between a negation in the logic portion
                        # against the state (defined or undefined)
                        should_exist = ('not' in logic) != (state == 'defined')
                        # the variable is known to be missing, so the test only
                        # passes when it was not expected to exist
                        return not should_exist
                # as nothing above matched the failed var name, re-raise here to
                # trigger the AnsibleUndefinedVariable exception again below
                raise
            except Exception:
                raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e))