1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2020, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = r'''
---
author: Felix Fontein (@felixfontein)
module: sops_encrypt
short_description: Encrypt data with sops
version_added: '0.1.0'
description:
- Allows to encrypt binary data (Base64 encoded), text data, JSON or YAML data with sops.
options:
path:
description:
- The sops encrypt file.
type: path
required: true
force:
description:
- Force rewriting the encrypted file.
type: bool
default: false
content_text:
description:
- The data to encrypt. Must be a Unicode text.
- Please note that the module might not be idempotent if the text can be parsed as JSON or YAML.
- Exactly one of O(content_text), O(content_binary), O(content_json), and O(content_yaml) must be specified.
type: str
content_binary:
description:
- The data to encrypt. Must be L(Base64 encoded,https://en.wikipedia.org/wiki/Base64) binary data.
- Please note that the module might not be idempotent if the data can be parsed as JSON or YAML.
- Exactly one of O(content_text), O(content_binary), O(content_json), and O(content_yaml) must be specified.
type: str
content_json:
description:
- The data to encrypt. Must be a JSON dictionary.
- Exactly one of O(content_text), O(content_binary), O(content_json), and O(content_yaml) must be specified.
type: dict
content_yaml:
description:
- The data to encrypt. Must be a YAML dictionary.
- Please note that Ansible only allows to pass data that can be represented as a JSON dictionary.
- Exactly one of O(content_text), O(content_binary), O(content_json), and O(content_yaml) must be specified.
type: dict
extends_documentation_fragment:
- ansible.builtin.files
- community.sops.sops
- community.sops.sops.encrypt_specific
- community.sops.attributes
- community.sops.attributes.files
attributes:
check_mode:
support: full
diff_mode:
support: none
safe_file_operations:
support: full
seealso:
- plugin: community.sops.sops
plugin_type: lookup
description: The sops lookup can be used decrypt sops-encrypted files.
'''
EXAMPLES = r'''
- name: Encrypt a secret text
community.sops.sops_encrypt:
path: text-data.sops
content_text: This is a secret text.
- name: Encrypt the contents of a file
community.sops.sops_encrypt:
path: binary-data.sops
content_binary: "{{ lookup('ansible.builtin.file', '/path/to/file', rstrip=false) | b64encode }}"
- name: Encrypt some datastructure as YAML
community.sops.sops_encrypt:
path: stuff.sops.yaml
content_yaml: "{{ result }}"
'''
RETURN = r''' # '''
import base64
import json
import os
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.text.converters import to_text
from ansible_collections.community.sops.plugins.module_utils.io import write_file
from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError, get_sops_argument_spec
try:
import yaml
YAML_IMP_ERR = None
HAS_YAML = True
except ImportError:
YAML_IMP_ERR = traceback.format_exc()
HAS_YAML = False
yaml = None
def get_data_type(module):
if module.params['content_text'] is not None:
return 'binary'
if module.params['content_binary'] is not None:
return 'binary'
if module.params['content_json'] is not None:
return 'json'
if module.params['content_yaml'] is not None:
return 'yaml'
module.fail_json(msg='Internal error: unknown content type')
def compare_encoded_content(module, binary_data, content):
if module.params['content_text'] is not None:
return content == module.params['content_text'].encode('utf-8')
if module.params['content_binary'] is not None:
return content == binary_data
if module.params['content_json'] is not None:
# Compare JSON
try:
return json.loads(content) == module.params['content_json']
except Exception:
# Treat parsing errors as content not equal
return False
if module.params['content_yaml'] is not None:
# Compare YAML
try:
return yaml.safe_load(content) == module.params['content_yaml']
except Exception:
# Treat parsing errors as content not equal
return False
module.fail_json(msg='Internal error: unknown content type')
def get_encoded_type_content(module, binary_data):
if module.params['content_text'] is not None:
return 'binary', module.params['content_text'].encode('utf-8')
if module.params['content_binary'] is not None:
return 'binary', binary_data
if module.params['content_json'] is not None:
return 'json', json.dumps(module.params['content_json']).encode('utf-8')
if module.params['content_yaml'] is not None:
return 'yaml', yaml.safe_dump(module.params['content_yaml']).encode('utf-8')
module.fail_json(msg='Internal error: unknown content type')
def main():
argument_spec = dict(
path=dict(type='path', required=True),
force=dict(type='bool', default=False),
content_text=dict(type='str', no_log=True),
content_binary=dict(type='str', no_log=True),
content_json=dict(type='dict', no_log=True),
content_yaml=dict(type='dict', no_log=True),
)
argument_spec.update(get_sops_argument_spec(add_encrypt_specific=True))
module = AnsibleModule(
argument_spec=argument_spec,
mutually_exclusive=[
('content_text', 'content_binary', 'content_json', 'content_yaml'),
],
required_one_of=[
('content_text', 'content_binary', 'content_json', 'content_yaml'),
],
supports_check_mode=True,
add_file_common_args=True,
)
# Check YAML
if module.params['content_yaml'] is not None and not HAS_YAML:
module.fail_json(msg=missing_required_lib('pyyaml'), exception=YAML_IMP_ERR)
# Decode binary data
binary_data = None
if module.params['content_binary'] is not None:
try:
binary_data = base64.b64decode(module.params['content_binary'])
except Exception as e:
module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(e))
path = module.params['path']
directory = os.path.dirname(path) or None
changed = False
def get_option_value(argument_name):
return module.params.get(argument_name)
try:
if module.params['force'] or not os.path.exists(path):
# Simply encrypt
changed = True
else:
# Change detection: check if encrypted data equals new data
decrypted_content = Sops.decrypt(
path, decode_output=False, output_type=get_data_type(module), rstrip=False,
get_option_value=get_option_value, module=module,
)
if not compare_encoded_content(module, binary_data, decrypted_content):
changed = True
if changed and not module.check_mode:
input_type, input_data = get_encoded_type_content(module, binary_data)
output_type = None
if path.endswith('.json'):
output_type = 'json'
elif path.endswith('.yaml') or path.endswith('.yml'):
output_type = 'yaml'
data = Sops.encrypt(
data=input_data, cwd=directory, input_type=input_type, output_type=output_type,
get_option_value=get_option_value, module=module,
)
write_file(module, data)
except SopsError as e:
module.fail_json(msg=to_text(e))
file_args = module.load_file_common_arguments(module.params)
changed = module.set_fs_attributes_if_different(file_args, changed)
module.exit_json(changed=changed)
if __name__ == '__main__':
main()
|