1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# -*- coding: utf-8 -*-
# Copyright (c) 2020, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
from ansible.module_utils.common.text.converters import to_native, to_bytes
from ansible_collections.community.crypto.plugins.plugin_utils.action_module import ActionModuleBase
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.privatekey import (
select_backend,
get_privatekey_argument_spec,
)
class PrivateKeyModule(object):
    """Drive a private key backend for the ``openssl_privatekey_pipe`` action.

    Hands existing key material from the ``content`` parameter (if any) to the
    backend, asks the backend to regenerate or convert the key when it reports
    that this is needed, and builds the result dictionary.
    """

    # Single source for the check-mode deprecation notice emitted by generate().
    _CHECK_MODE_DEPRECATION_MSG = (
        'Check mode support for openssl_privatekey_pipe will change in community.crypto 3.0.0'
        ' to behave the same as without check mode. You can get that behavior right now'
        ' by adding `check_mode: false` to the openssl_privatekey_pipe task. If you think this'
        ' breaks your use-case of this module, please create an issue in the'
        ' community.crypto repository'
    )

    def __init__(self, module, module_backend):
        """Store the module state and pass existing key content to the backend.

        :param module: Module object; its ``params``, ``check_mode``,
            ``fail_json`` and ``deprecate`` members are used by this class.
        :param module_backend: Backend object that receives the existing key
            via ``set_existing()`` and performs generation/conversion.
        """
        self.module = module
        self.module_backend = module_backend
        self.check_mode = module.check_mode
        self.changed = False
        self.return_current_key = module.params['return_current_key']
        # Defined up front so the attribute exists even when generate() does
        # not produce key data (check mode, or nothing to regenerate/convert).
        self.privatekey_bytes = None

        if module.params['content'] is not None:
            if module.params['content_base64']:
                try:
                    data = base64.b64decode(module.params['content'])
                except Exception as e:
                    # NOTE(review): relies on fail_json() not returning;
                    # otherwise `data` would be unbound below - confirm.
                    module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(e))
            else:
                data = to_bytes(module.params['content'])
            module_backend.set_existing(data)

    def _warn_check_mode_change(self):
        """Emit the deprecation notice about upcoming check mode behavior."""
        self.module.deprecate(
            self._CHECK_MODE_DEPRECATION_MSG,
            version='3.0.0',
            collection_name='community.crypto',
        )

    def generate(self, module):
        """Generate a keypair.

        Regenerates the key if the backend says so, otherwise converts it if
        the backend says so. In check mode no key operation is performed and a
        deprecation notice is emitted instead; ``changed`` is set to True in
        both modes whenever regeneration or conversion is needed.

        :param module: Unused here; kept because callers pass it.
        """
        if self.module_backend.needs_regeneration():
            # Regenerate
            if self.check_mode:
                self._warn_check_mode_change()
            else:
                self.module_backend.generate_private_key()
                self.privatekey_bytes = self.module_backend.get_private_key_data()
            self.changed = True
        elif self.module_backend.needs_conversion():
            # Convert
            if self.check_mode:
                self._warn_check_mode_change()
            else:
                self.module_backend.convert_private_key()
                self.privatekey_bytes = self.module_backend.get_private_key_data()
            self.changed = True

    def dump(self):
        """Serialize the object into a dictionary.

        The backend is asked to include the key when something changed or when
        ``return_current_key`` was requested; ``changed`` is added to the
        dictionary the backend returns.
        """
        result = self.module_backend.dump(include_key=self.changed or self.return_current_key)
        result['changed'] = self.changed
        return result
class ActionModule(ActionModuleBase):
    """Action plugin built on ``ActionModuleBase`` that runs ``PrivateKeyModule``."""

    @staticmethod
    def setup_module():
        """Return the argument spec and the module construction options.

        The spec obtained from ``get_privatekey_argument_spec()`` is extended
        with the ``content``, ``content_base64`` and ``return_current_key``
        options; check mode support is enabled.
        """
        spec = get_privatekey_argument_spec()
        extra_options = {
            'content': {'type': 'str', 'no_log': True},
            'content_base64': {'type': 'bool', 'default': False},
            'return_current_key': {'type': 'bool', 'default': False},
        }
        spec.argument_spec.update(extra_options)
        module_options = {'supports_check_mode': True}
        return spec, module_options

    @staticmethod
    def run_module(module):
        """Select a backend, process the private key and exit with the result.

        An ``OpenSSLObjectError`` raised while processing is reported through
        ``module.fail_json()``.
        """
        requested_backend = module.params['select_crypto_backend']
        # Only the second element of the returned pair is used here.
        _backend, key_backend = select_backend(
            module=module,
            backend=requested_backend,
        )

        try:
            key_module = PrivateKeyModule(module, key_backend)
            key_module.generate(module)
            output = key_module.dump()
            if key_module.return_current_key:
                # The input (`content`) may be handed back as `privatekey`.
                # Because `content` is declared no_log=True, the returned
                # value would be replaced by VALUE_SPECIFIED_IN_NO_LOG_PARAMETER.
                # To prevent that, the value of `content` is dropped from
                # module.no_log_values. This should be safe because
                # `module.no_log = True` is set explicitly first.
                module.no_log = True
                try:
                    module.no_log_values.remove(module.params['content'])
                except KeyError:
                    # Nothing to remove; carry on.
                    pass
                module.params['content'] = 'ANSIBLE_NO_LOG_VALUE'
            module.exit_json(**output)
        except OpenSSLObjectError as err:
            module.fail_json(msg=to_native(err))
|