summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/crypto/plugins/action/openssl_privatekey_pipe.py
blob: dc864ab01c3ac4dfdfc1552268e0821e43f4e97e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- coding: utf-8 -*-

# Copyright (c) 2020, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function
__metaclass__ = type


import base64

from ansible.module_utils.common.text.converters import to_native, to_bytes

from ansible_collections.community.crypto.plugins.plugin_utils.action_module import ActionModuleBase

from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
    OpenSSLObjectError,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.privatekey import (
    select_backend,
    get_privatekey_argument_spec,
)


class PrivateKeyModule(object):
    def __init__(self, module, module_backend):
        self.module = module
        self.module_backend = module_backend
        self.check_mode = module.check_mode
        self.changed = False
        self.return_current_key = module.params['return_current_key']

        if module.params['content'] is not None:
            if module.params['content_base64']:
                try:
                    data = base64.b64decode(module.params['content'])
                except Exception as e:
                    module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(e))
            else:
                data = to_bytes(module.params['content'])
            module_backend.set_existing(data)

    def generate(self, module):
        """Generate a keypair."""

        if self.module_backend.needs_regeneration():
            # Regenerate
            if not self.check_mode:
                self.module_backend.generate_private_key()
                privatekey_data = self.module_backend.get_private_key_data()
                self.privatekey_bytes = privatekey_data
            else:
                self.module.deprecate(
                    'Check mode support for openssl_privatekey_pipe will change in community.crypto 3.0.0'
                    ' to behave the same as without check mode. You can get that behavior right now'
                    ' by adding `check_mode: false` to the openssl_privatekey_pipe task. If you think this'
                    ' breaks your use-case of this module, please create an issue in the'
                    ' community.crypto repository',
                    version='3.0.0',
                    collection_name='community.crypto',
                )
            self.changed = True
        elif self.module_backend.needs_conversion():
            # Convert
            if not self.check_mode:
                self.module_backend.convert_private_key()
                privatekey_data = self.module_backend.get_private_key_data()
                self.privatekey_bytes = privatekey_data
            else:
                self.module.deprecate(
                    'Check mode support for openssl_privatekey_pipe will change in community.crypto 3.0.0'
                    ' to behave the same as without check mode. You can get that behavior right now'
                    ' by adding `check_mode: false` to the openssl_privatekey_pipe task. If you think this'
                    ' breaks your use-case of this module, please create an issue in the'
                    ' community.crypto repository',
                    version='3.0.0',
                    collection_name='community.crypto',
                )
            self.changed = True

    def dump(self):
        """Serialize the object into a dictionary."""
        result = self.module_backend.dump(include_key=self.changed or self.return_current_key)
        result['changed'] = self.changed
        return result


class ActionModule(ActionModuleBase):
    @staticmethod
    def setup_module():
        argument_spec = get_privatekey_argument_spec()
        argument_spec.argument_spec.update(dict(
            content=dict(type='str', no_log=True),
            content_base64=dict(type='bool', default=False),
            return_current_key=dict(type='bool', default=False),
        ))
        return argument_spec, dict(
            supports_check_mode=True,
        )

    @staticmethod
    def run_module(module):
        backend, module_backend = select_backend(
            module=module,
            backend=module.params['select_crypto_backend'],
        )

        try:
            private_key = PrivateKeyModule(module, module_backend)
            private_key.generate(module)
            result = private_key.dump()
            if private_key.return_current_key:
                # In case the module's input (`content`) is returned as `privatekey`:
                # Since `content` is no_log=True, `privatekey`'s value will get replaced by
                # VALUE_SPECIFIED_IN_NO_LOG_PARAMETER. To avoid this, we remove the value of
                # `content` from module.no_log_values. Since we explicitly set
                # `module.no_log = True`, this should be safe.
                module.no_log = True
                try:
                    module.no_log_values.remove(module.params['content'])
                except KeyError:
                    pass
                module.params['content'] = 'ANSIBLE_NO_LOG_VALUE'
            module.exit_json(**result)
        except OpenSSLObjectError as exc:
            module.fail_json(msg=to_native(exc))