summaryrefslogtreecommitdiffstats
path: root/ansible_collections/amazon/aws/plugins/module_utils/acm.py
blob: 4febe874346d2f9a46947c6aa9350f6a78909a2d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# -*- coding: utf-8 -*-

# Copyright (c) 2019 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

# Author:
#   - Matthew Davis <Matthew.Davis.2@team.telstra.com>
#     on behalf of Telstra Corporation Limited
#
# Common functionality to be used by the modules:
#   - acm_certificate
#   - acm_certificate_info

"""
Common Amazon Certificate Manager facts shared between modules
"""

try:
    from botocore.exceptions import BotoCoreError
    from botocore.exceptions import ClientError
except ImportError:
    pass

from ansible.module_utils._text import to_bytes
from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict

from .botocore import is_boto3_error_code
from .retries import AWSRetry
from .tagging import ansible_dict_to_boto3_tag_list
from .tagging import boto3_tag_list_to_ansible_dict


def acm_catch_boto_exception(func):
    def runner(*args, **kwargs):
        module = kwargs.pop("module", None)
        error = kwargs.pop("error", None)
        ignore_error_codes = kwargs.pop("ignore_error_codes", [])

        try:
            return func(*args, **kwargs)
        except is_boto3_error_code(ignore_error_codes):
            return None
        except (BotoCoreError, ClientError) as e:  # pylint: disable=duplicate-except
            if not module:
                raise
            module.fail_json_aws(e, msg=error)

    return runner


class ACMServiceManager:
    """Handles ACM Facts Services"""

    def __init__(self, module):
        self.module = module
        self.client = module.client("acm")

    @acm_catch_boto_exception
    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=["RequestInProgressException"])
    def delete_certificate_with_backoff(self, arn):
        self.client.delete_certificate(CertificateArn=arn)

    @acm_catch_boto_exception
    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=["RequestInProgressException"])
    def list_certificates_with_backoff(self, statuses=None):
        paginator = self.client.get_paginator("list_certificates")
        # `list_certificates` requires explicit key type filter, or it returns only RSA_2048 certificates
        kwargs = {
            "Includes": {
                "keyTypes": [
                    "RSA_1024",
                    "RSA_2048",
                    "RSA_3072",
                    "RSA_4096",
                    "EC_prime256v1",
                    "EC_secp384r1",
                    "EC_secp521r1",
                ],
            },
        }
        if statuses:
            kwargs["CertificateStatuses"] = statuses
        return paginator.paginate(**kwargs).build_full_result()["CertificateSummaryList"]

    @acm_catch_boto_exception
    @AWSRetry.jittered_backoff(
        delay=5, catch_extra_error_codes=["RequestInProgressException", "ResourceNotFoundException"]
    )
    def get_certificate_with_backoff(self, certificate_arn):
        response = self.client.get_certificate(CertificateArn=certificate_arn)
        # strip out response metadata
        return {"Certificate": response["Certificate"], "CertificateChain": response["CertificateChain"]}

    @acm_catch_boto_exception
    @AWSRetry.jittered_backoff(
        delay=5, catch_extra_error_codes=["RequestInProgressException", "ResourceNotFoundException"]
    )
    def describe_certificate_with_backoff(self, certificate_arn):
        return self.client.describe_certificate(CertificateArn=certificate_arn)["Certificate"]

    @acm_catch_boto_exception
    @AWSRetry.jittered_backoff(
        delay=5, catch_extra_error_codes=["RequestInProgressException", "ResourceNotFoundException"]
    )
    def list_certificate_tags_with_backoff(self, certificate_arn):
        return self.client.list_tags_for_certificate(CertificateArn=certificate_arn)["Tags"]

    @acm_catch_boto_exception
    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=["RequestInProgressException"])
    def import_certificate_with_backoff(self, certificate, private_key, certificate_chain, arn):
        params = {"Certificate": to_bytes(certificate), "PrivateKey": to_bytes(private_key)}
        if arn:
            params["CertificateArn"] = arn
        if certificate_chain:
            params["CertificateChain"] = certificate_chain

        return self.client.import_certificate(**params)["CertificateArn"]

    # Tags are a normal Ansible style dict
    # {'Key':'Value'}
    @AWSRetry.jittered_backoff(
        delay=5, catch_extra_error_codes=["RequestInProgressException", "ResourceNotFoundException"]
    )
    def tag_certificate_with_backoff(self, arn, tags):
        aws_tags = ansible_dict_to_boto3_tag_list(tags)
        self.client.add_tags_to_certificate(CertificateArn=arn, Tags=aws_tags)

    def _match_tags(self, ref_tags, cert_tags):
        if ref_tags is None:
            return True
        try:
            return all(k in cert_tags for k in ref_tags) and all(cert_tags.get(k) == ref_tags[k] for k in ref_tags)
        except (TypeError, AttributeError) as e:
            self.module.fail_json_aws(e, msg="ACM tag filtering err")

    def delete_certificate(self, *args, arn=None):
        # hacking for backward compatibility
        if arn is None:
            if len(args) < 3:
                self.module.fail_json(msg="Missing required certificate arn to delete.")
            arn = args[2]
        error = f"Couldn't delete certificate {arn}"
        self.delete_certificate_with_backoff(arn, module=self.module, error=error)

    def get_certificates(self, *args, domain_name=None, statuses=None, arn=None, only_tags=None, **kwargs):
        """
        Returns a list of certificates
        if domain_name is specified, returns only certificates with that domain
        if an ARN is specified, returns only that certificate
        only_tags is a dict, e.g. {'key':'value'}. If specified this function will return
        only certificates which contain all those tags (key exists, value matches).
        """
        all_certificates = self.list_certificates_with_backoff(
            statuses=statuses, module=self.module, error="Couldn't obtain certificates"
        )

        def _filter_certificate(cert):
            if domain_name and cert["DomainName"] != domain_name:
                return False
            if arn and cert["CertificateArn"] != arn:
                return False
            return True

        certificates = list(filter(_filter_certificate, all_certificates))

        results = []
        for certificate in certificates:
            cert_data = self.describe_certificate_with_backoff(
                certificate["CertificateArn"],
                module=self.module,
                error=f"Couldn't obtain certificate metadata for domain {certificate['DomainName']}",
                ignore_error_codes=["ResourceNotFoundException"],
            )
            if cert_data is None:
                continue

            # in some states, ACM resources do not have a corresponding cert
            if cert_data["Status"] not in ("PENDING_VALIDATION", "VALIDATION_TIMED_OUT", "FAILED"):
                cert_info = self.get_certificate_with_backoff(
                    certificate["CertificateArn"],
                    module=self.module,
                    error=f"Couldn't obtain certificate data for domain {certificate['DomainName']}",
                    ignore_error_codes=["ResourceNotFoundException"],
                )
                if cert_info is None:
                    continue
                cert_data.update(cert_info)

            cert_data = camel_dict_to_snake_dict(cert_data)
            tags = self.list_certificate_tags_with_backoff(
                certificate["CertificateArn"],
                module=self.module,
                error=f"Couldn't obtain tags for domain {certificate['DomainName']}",
                ignore_error_codes=["ResourceNotFoundException"],
            )
            if tags is None:
                continue

            tags = boto3_tag_list_to_ansible_dict(tags)
            if not self._match_tags(only_tags, tags):
                continue
            cert_data["tags"] = tags
            results.append(cert_data)
        return results

    def get_domain_of_cert(self, arn, **kwargs):
        """
        returns the domain name of a certificate (encoded in the public cert)
        for a given ARN A cert with that ARN must already exist
        """
        if arn is None:
            self.module.fail_json(msg="Internal error with ACM domain fetching, no certificate ARN specified")
        error = f"Couldn't obtain certificate data for arn {arn}"
        cert_data = self.describe_certificate_with_backoff(certificate_arn=arn, module=self.module, error=error)
        return cert_data["DomainName"]

    def import_certificate(self, *args, certificate, private_key, arn=None, certificate_chain=None, tags=None):
        original_arn = arn

        # upload cert
        params = {
            "certificate": certificate,
            "private_key": private_key,
            "certificate_chain": certificate_chain,
            "arn": arn,
            "module": self.module,
            "error": "Couldn't upload new certificate",
        }
        arn = self.import_certificate_with_backoff(**params)
        if original_arn and (arn != original_arn):
            # I'm not sure whether the API guarentees that the ARN will not change
            # I'm failing just in case.
            # If I'm wrong, I'll catch it in the integration tests.
            self.module.fail_json(msg=f"ARN changed with ACM update, from {original_arn} to {arn}")

        # tag that cert
        try:
            self.tag_certificate_with_backoff(arn, tags)
        except (BotoCoreError, ClientError) as e:
            try:
                self.delete_certificate_with_backoff(arn)
            except (BotoCoreError, ClientError):
                self.module.warn(
                    f"Certificate {arn} exists, and is not tagged. So Ansible will not see it on the next run."
                )
                self.module.fail_json_aws(e, msg=f"Couldn't tag certificate {arn}, couldn't delete it either")
            self.module.fail_json_aws(e, msg=f"Couldn't tag certificate {arn}")

        return arn