1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
|
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import ast
import re
from jinja2.compiler import generate
from jinja2.exceptions import UndefinedError
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleUndefinedVariable, AnsibleTemplateError
from ansible.module_utils.six import text_type
from ansible.module_utils._text import to_native, to_text
from ansible.playbook.attribute import FieldAttribute
from ansible.utils.display import Display
display = Display()
DEFINED_REGEX = re.compile(r'(hostvars\[.+\]|[\w_]+)\s+(not\s+is|is|is\s+not)\s+(defined|undefined)')
LOOKUP_REGEX = re.compile(r'lookup\s*\(')
VALID_VAR_REGEX = re.compile("^[_A-Za-z][_a-zA-Z0-9]*$")
class Conditional:
'''
This is a mix-in class, to be used with Base to allow the object
to be run conditionally when a condition is met or skipped.
'''
when = FieldAttribute(isa='list', default=list, extend=True, prepend=True)
def __init__(self, loader=None):
# when used directly, this class needs a loader, but we want to
# make sure we don't trample on the existing one if this class
# is used as a mix-in with a playbook base class
if not hasattr(self, '_loader'):
if loader is None:
raise AnsibleError("a loader must be specified when using Conditional() directly")
else:
self._loader = loader
super(Conditional, self).__init__()
def _validate_when(self, attr, name, value):
if not isinstance(value, list):
setattr(self, name, [value])
def extract_defined_undefined(self, conditional):
results = []
cond = conditional
m = DEFINED_REGEX.search(cond)
while m:
results.append(m.groups())
cond = cond[m.end():]
m = DEFINED_REGEX.search(cond)
return results
def evaluate_conditional(self, templar, all_vars):
'''
Loops through the conditionals set on this object, returning
False if any of them evaluate as such.
'''
# since this is a mix-in, it may not have an underlying datastructure
# associated with it, so we pull it out now in case we need it for
# error reporting below
ds = None
if hasattr(self, '_ds'):
ds = getattr(self, '_ds')
result = True
try:
for conditional in self.when:
# do evaluation
if conditional is None or conditional == '':
res = True
elif isinstance(conditional, bool):
res = conditional
else:
res = self._check_conditional(conditional, templar, all_vars)
# only update if still true, preserve false
if result:
result = res
display.debug("Evaluated conditional (%s): %s" % (conditional, res))
if not result:
break
except Exception as e:
raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds)
return result
def _check_conditional(self, conditional, templar, all_vars):
'''
This method does the low-level evaluation of each conditional
set on this object, using jinja2 to wrap the conditionals for
evaluation.
'''
original = conditional
if templar.is_template(conditional):
display.warning('conditional statements should not include jinja2 '
'templating delimiters such as {{ }} or {%% %%}. '
'Found: %s' % conditional)
# make sure the templar is using the variables specified with this method
templar.available_variables = all_vars
try:
# if the conditional is "unsafe", disable lookups
disable_lookups = hasattr(conditional, '__UNSAFE__')
conditional = templar.template(conditional, disable_lookups=disable_lookups)
if not isinstance(conditional, text_type) or conditional == "":
return conditional
# If the result of the first-pass template render (to resolve inline templates) is marked unsafe,
# explicitly fail since the next templating operation would never evaluate
if hasattr(conditional, '__UNSAFE__'):
raise AnsibleTemplateError('Conditional is marked as unsafe, and cannot be evaluated.')
# First, we do some low-level jinja2 parsing involving the AST format of the
# statement to ensure we don't do anything unsafe (using the disable_lookup flag above)
class CleansingNodeVisitor(ast.NodeVisitor):
def generic_visit(self, node, inside_call=False, inside_yield=False):
if isinstance(node, ast.Call):
inside_call = True
elif isinstance(node, ast.Yield):
inside_yield = True
elif isinstance(node, ast.Str):
if disable_lookups:
if inside_call and node.s.startswith("__"):
# calling things with a dunder is generally bad at this point...
raise AnsibleError(
"Invalid access found in the conditional: '%s'" % conditional
)
elif inside_yield:
# we're inside a yield, so recursively parse and traverse the AST
# of the result to catch forbidden syntax from executing
parsed = ast.parse(node.s, mode='exec')
cnv = CleansingNodeVisitor()
cnv.visit(parsed)
# iterate over all child nodes
for child_node in ast.iter_child_nodes(node):
self.generic_visit(
child_node,
inside_call=inside_call,
inside_yield=inside_yield
)
try:
res = templar.environment.parse(conditional, None, None)
res = generate(res, templar.environment, None, None)
parsed = ast.parse(res, mode='exec')
cnv = CleansingNodeVisitor()
cnv.visit(parsed)
except Exception as e:
raise AnsibleError("Invalid conditional detected: %s" % to_native(e))
# and finally we generate and template the presented string and look at the resulting string
# NOTE The spaces around True and False are intentional to short-circuit literal_eval for
# jinja2_native=False and avoid its expensive calls.
presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional
val = templar.template(presented, disable_lookups=disable_lookups).strip()
if val == "True":
return True
elif val == "False":
return False
else:
raise AnsibleError("unable to evaluate conditional: %s" % original)
except (AnsibleUndefinedVariable, UndefinedError) as e:
# the templating failed, meaning most likely a variable was undefined. If we happened
# to be looking for an undefined variable, return True, otherwise fail
try:
# first we extract the variable name from the error message
var_name = re.compile(r"'(hostvars\[.+\]|[\w_]+)' is undefined").search(str(e)).groups()[0]
# next we extract all defined/undefined tests from the conditional string
def_undef = self.extract_defined_undefined(conditional)
# then we loop through these, comparing the error variable name against
# each def/undef test we found above. If there is a match, we determine
# whether the logic/state mean the variable should exist or not and return
# the corresponding True/False
for (du_var, logic, state) in def_undef:
# when we compare the var names, normalize quotes because something
# like hostvars['foo'] may be tested against hostvars["foo"]
if var_name.replace("'", '"') == du_var.replace("'", '"'):
# the should exist is a xor test between a negation in the logic portion
# against the state (defined or undefined)
should_exist = ('not' in logic) != (state == 'defined')
if should_exist:
return False
else:
return True
# as nothing above matched the failed var name, re-raise here to
# trigger the AnsibleUndefinedVariable exception again below
raise
except Exception:
raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e))
|