1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# Copyright: (c) 2021, Ansible Project
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from jinja2.runtime import Undefined
from jinja2.exceptions import UndefinedError
from ansible.errors import AnsibleFilterError, AnsibleFilterTypeError
from ansible.module_utils._text import to_native, to_bytes
from ansible.module_utils.six import string_types, binary_type
from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode
from ansible.parsing.vault import is_encrypted, VaultSecret, VaultLib
from ansible.utils.display import Display
display = Display()
def do_vault(data, secret, salt=None, vaultid='filter_default', wrap_object=False):
if not isinstance(secret, (string_types, binary_type, Undefined)):
raise AnsibleFilterTypeError("Secret passed is required to be a string, instead we got: %s" % type(secret))
if not isinstance(data, (string_types, binary_type, Undefined)):
raise AnsibleFilterTypeError("Can only vault strings, instead we got: %s" % type(data))
vault = ''
vs = VaultSecret(to_bytes(secret))
vl = VaultLib()
try:
vault = vl.encrypt(to_bytes(data), vs, vaultid, salt)
except UndefinedError:
raise
except Exception as e:
raise AnsibleFilterError("Unable to encrypt: %s" % to_native(e), orig_exc=e)
if wrap_object:
vault = AnsibleVaultEncryptedUnicode(vault)
else:
vault = to_native(vault)
return vault
def do_unvault(vault, secret, vaultid='filter_default'):
if not isinstance(secret, (string_types, binary_type, Undefined)):
raise AnsibleFilterTypeError("Secret passed is required to be as string, instead we got: %s" % type(secret))
if not isinstance(vault, (string_types, binary_type, AnsibleVaultEncryptedUnicode, Undefined)):
raise AnsibleFilterTypeError("Vault should be in the form of a string, instead we got: %s" % type(vault))
data = ''
vs = VaultSecret(to_bytes(secret))
vl = VaultLib([(vaultid, vs)])
if isinstance(vault, AnsibleVaultEncryptedUnicode):
vault.vault = vl
data = vault.data
elif is_encrypted(vault):
try:
data = vl.decrypt(vault)
except UndefinedError:
raise
except Exception as e:
raise AnsibleFilterError("Unable to decrypt: %s" % to_native(e), orig_exc=e)
else:
data = vault
return to_native(data)
class FilterModule(object):
''' Ansible vault jinja2 filters '''
def filters(self):
filters = {
'vault': do_vault,
'unvault': do_unvault,
}
return filters
|