diff options
Diffstat (limited to 'ansible_collections/amazon/aws/plugins/modules/cloudtrail.py')
-rw-r--r-- | ansible_collections/amazon/aws/plugins/modules/cloudtrail.py | 256 |
1 files changed, 133 insertions, 123 deletions
diff --git a/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py b/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py index af48e7ea8..597d43f1b 100644 --- a/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py +++ b/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py @@ -1,12 +1,10 @@ #!/usr/bin/python +# -*- coding: utf-8 -*- + # Copyright: Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import absolute_import, division, print_function -__metaclass__ = type - - -DOCUMENTATION = ''' +DOCUMENTATION = r""" --- module: cloudtrail version_added: 5.0.0 @@ -94,14 +92,13 @@ notes: - The I(purge_tags) option was added in release 4.0.0 extends_documentation_fragment: - - amazon.aws.aws - - amazon.aws.ec2 + - amazon.aws.common.modules + - amazon.aws.region.modules - amazon.aws.tags - amazon.aws.boto3 +""" -''' - -EXAMPLES = ''' +EXAMPLES = r""" - name: create single region cloudtrail amazon.aws.cloudtrail: state: present @@ -150,9 +147,9 @@ EXAMPLES = ''' amazon.aws.cloudtrail: state: absent name: default -''' +""" -RETURN = ''' +RETURN = r""" exists: description: whether the resource exists returned: always @@ -244,16 +241,17 @@ trail: returned: success type: dict sample: {'environment': 'dev', 'Name': 'default'} -''' +""" try: - from botocore.exceptions import ClientError, BotoCoreError + from botocore.exceptions import BotoCoreError + from botocore.exceptions import ClientError except ImportError: pass # Handled by AnsibleAWSModule from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict -from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule +from ansible_collections.amazon.aws.plugins.module_utils.modules import AnsibleAWSModule from ansible_collections.amazon.aws.plugins.module_utils.tagging import ansible_dict_to_boto3_tag_list from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_list_to_ansible_dict from ansible_collections.amazon.aws.plugins.module_utils.tagging import compare_aws_tags @@ -274,7 +272,7 @@ def get_kms_key_aliases(module, client, keyId): # in case user doesn't have kms:ListAliases permissions return [] - return key_resp['Aliases'] + return key_resp["Aliases"] def create_trail(module, client, ct_params): @@ -344,7 +342,7 @@ def get_tag_list(keys, tags): """ tag_list = [] for k in keys: - tag_list.append({'Key': k, 'Value': tags[k]}) + tag_list.append({"Key": k, "Value": tags[k]}) return tag_list @@ -358,13 +356,13 @@ def set_logging(module, client, name, action): name : The name or ARN of the CloudTrail to operate on action : start or stop """ - if action == 'start': + if action == "start": try: client.start_logging(Name=name) return client.get_trail_status(Name=name) except (BotoCoreError, ClientError) as err: module.fail_json_aws(err, msg="Failed to start logging") - elif action == 'stop': + elif action == "stop": try: client.stop_logging(Name=name) return client.get_trail_status(Name=name) @@ -389,18 +387,27 @@ def get_trail_facts(module, client, name): module.fail_json_aws(err, msg="Failed to describe Trail") # Now check to see if our trail exists and get status and tags - if len(trail_resp['trailList']): - trail = trail_resp['trailList'][0] + if len(trail_resp["trailList"]): + trail = trail_resp["trailList"][0] try: - status_resp = client.get_trail_status(Name=trail['Name']) - tags_list = client.list_tags(ResourceIdList=[trail['TrailARN']]) + status_resp = client.get_trail_status(Name=trail["Name"]) + tags_list = client.list_tags(ResourceIdList=[trail["TrailARN"]]) except (BotoCoreError, ClientError) as err: module.fail_json_aws(err, msg="Failed to describe Trail") - trail['IsLogging'] = status_resp['IsLogging'] - trail['tags'] = boto3_tag_list_to_ansible_dict(tags_list['ResourceTagList'][0]['TagsList']) + trail["IsLogging"] = status_resp["IsLogging"] + trail["tags"] = boto3_tag_list_to_ansible_dict(tags_list["ResourceTagList"][0]["TagsList"]) # Check for non-existent values and populate with None - optional_vals = set(['S3KeyPrefix', 'SnsTopicName', 'SnsTopicARN', 'CloudWatchLogsLogGroupArn', 'CloudWatchLogsRoleArn', 'KmsKeyId']) + optional_vals = set( + [ + "S3KeyPrefix", + "SnsTopicName", + "SnsTopicARN", + "CloudWatchLogsLogGroupArn", + "CloudWatchLogsRoleArn", + "KmsKeyId", + ] + ) for v in optional_vals - set(trail.keys()): trail[v] = None return trail @@ -440,160 +447,163 @@ def update_trail(module, client, ct_params): def main(): argument_spec = dict( - state=dict(default='present', choices=['present', 'absent', 'enabled', 'disabled']), - name=dict(default='default'), - enable_logging=dict(default=True, type='bool'), + state=dict(default="present", choices=["present", "absent", "enabled", "disabled"]), + name=dict(default="default"), + enable_logging=dict(default=True, type="bool"), s3_bucket_name=dict(), s3_key_prefix=dict(no_log=False), sns_topic_name=dict(), - is_multi_region_trail=dict(default=False, type='bool'), - enable_log_file_validation=dict(type='bool', aliases=['log_file_validation_enabled']), - include_global_events=dict(default=True, type='bool', aliases=['include_global_service_events']), + is_multi_region_trail=dict(default=False, type="bool"), + enable_log_file_validation=dict(type="bool", aliases=["log_file_validation_enabled"]), + include_global_events=dict(default=True, type="bool", aliases=["include_global_service_events"]), cloudwatch_logs_role_arn=dict(), cloudwatch_logs_log_group_arn=dict(), kms_key_id=dict(), - tags=dict(type='dict', aliases=['resource_tags']), - purge_tags=dict(default=True, type='bool') + tags=dict(type="dict", aliases=["resource_tags"]), + purge_tags=dict(default=True, type="bool"), ) - required_if = [('state', 'present', ['s3_bucket_name']), ('state', 'enabled', ['s3_bucket_name'])] - required_together = [('cloudwatch_logs_role_arn', 'cloudwatch_logs_log_group_arn')] + required_if = [("state", "present", ["s3_bucket_name"]), ("state", "enabled", ["s3_bucket_name"])] + required_together = [("cloudwatch_logs_role_arn", "cloudwatch_logs_log_group_arn")] - module = AnsibleAWSModule(argument_spec=argument_spec, supports_check_mode=True, required_together=required_together, required_if=required_if) + module = AnsibleAWSModule( + argument_spec=argument_spec, + supports_check_mode=True, + required_together=required_together, + required_if=required_if, + ) # collect parameters - if module.params['state'] in ('present', 'enabled'): - state = 'present' - elif module.params['state'] in ('absent', 'disabled'): - state = 'absent' - tags = module.params['tags'] - purge_tags = module.params['purge_tags'] - enable_logging = module.params['enable_logging'] + if module.params["state"] in ("present", "enabled"): + state = "present" + elif module.params["state"] in ("absent", "disabled"): + state = "absent" + tags = module.params["tags"] + purge_tags = module.params["purge_tags"] + enable_logging = module.params["enable_logging"] ct_params = dict( - Name=module.params['name'], - S3BucketName=module.params['s3_bucket_name'], - IncludeGlobalServiceEvents=module.params['include_global_events'], - IsMultiRegionTrail=module.params['is_multi_region_trail'], + Name=module.params["name"], + S3BucketName=module.params["s3_bucket_name"], + IncludeGlobalServiceEvents=module.params["include_global_events"], + IsMultiRegionTrail=module.params["is_multi_region_trail"], ) - if module.params['s3_key_prefix']: - ct_params['S3KeyPrefix'] = module.params['s3_key_prefix'].rstrip('/') + if module.params["s3_key_prefix"]: + ct_params["S3KeyPrefix"] = module.params["s3_key_prefix"].rstrip("/") - if module.params['sns_topic_name']: - ct_params['SnsTopicName'] = module.params['sns_topic_name'] + if module.params["sns_topic_name"]: + ct_params["SnsTopicName"] = module.params["sns_topic_name"] - if module.params['cloudwatch_logs_role_arn']: - ct_params['CloudWatchLogsRoleArn'] = module.params['cloudwatch_logs_role_arn'] + if module.params["cloudwatch_logs_role_arn"]: + ct_params["CloudWatchLogsRoleArn"] = module.params["cloudwatch_logs_role_arn"] - if module.params['cloudwatch_logs_log_group_arn']: - ct_params['CloudWatchLogsLogGroupArn'] = module.params['cloudwatch_logs_log_group_arn'] + if module.params["cloudwatch_logs_log_group_arn"]: + ct_params["CloudWatchLogsLogGroupArn"] = module.params["cloudwatch_logs_log_group_arn"] - if module.params['enable_log_file_validation'] is not None: - ct_params['EnableLogFileValidation'] = module.params['enable_log_file_validation'] + if module.params["enable_log_file_validation"] is not None: + ct_params["EnableLogFileValidation"] = module.params["enable_log_file_validation"] if module.params["kms_key_id"] is not None: ct_params["KmsKeyId"] = module.params["kms_key_id"] - client = module.client('cloudtrail') + client = module.client("cloudtrail") region = module.region - results = dict( - changed=False, - exists=False - ) + results = dict(changed=False, exists=False) # Get existing trail facts - trail = get_trail_facts(module, client, ct_params['Name']) + trail = get_trail_facts(module, client, ct_params["Name"]) # If the trail exists set the result exists variable if trail is not None: - results['exists'] = True - initial_kms_key_id = trail.get('KmsKeyId') + results["exists"] = True + initial_kms_key_id = trail.get("KmsKeyId") - if state == 'absent' and results['exists']: + if state == "absent" and results["exists"]: # If Trail exists go ahead and delete - results['changed'] = True - results['exists'] = False - results['trail'] = dict() + results["changed"] = True + results["exists"] = False + results["trail"] = dict() if not module.check_mode: - delete_trail(module, client, trail['TrailARN']) + delete_trail(module, client, trail["TrailARN"]) - elif state == 'present' and results['exists']: + elif state == "present" and results["exists"]: # If Trail exists see if we need to update it do_update = False for key in ct_params: tkey = str(key) # boto3 has inconsistent parameter naming so we handle it here - if key == 'EnableLogFileValidation': - tkey = 'LogFileValidationEnabled' + if key == "EnableLogFileValidation": + tkey = "LogFileValidationEnabled" # We need to make an empty string equal None - if ct_params.get(key) == '': + if ct_params.get(key) == "": val = None else: val = ct_params.get(key) if val != trail.get(tkey): do_update = True - if tkey != 'KmsKeyId': + if tkey != "KmsKeyId": # We'll check if the KmsKeyId casues changes later since # user could've provided a key alias, alias arn, or key id # and trail['KmsKeyId'] is always a key arn - results['changed'] = True + results["changed"] = True # If we are in check mode copy the changed values to the trail facts in result output to show what would change. if module.check_mode: trail.update({tkey: ct_params.get(key)}) if not module.check_mode and do_update: update_trail(module, client, ct_params) - trail = get_trail_facts(module, client, ct_params['Name']) + trail = get_trail_facts(module, client, ct_params["Name"]) # Determine if KmsKeyId changed if not module.check_mode: - if initial_kms_key_id != trail.get('KmsKeyId'): - results['changed'] = True + if initial_kms_key_id != trail.get("KmsKeyId"): + results["changed"] = True else: - new_key = ct_params.get('KmsKeyId') + new_key = ct_params.get("KmsKeyId") if initial_kms_key_id != new_key: # Assume changed for a moment - results['changed'] = True + results["changed"] = True # However, new_key could be a key id, alias arn, or alias name # that maps back to the key arn in initial_kms_key_id. So check # all aliases for a match. - initial_aliases = get_kms_key_aliases(module, module.client('kms'), initial_kms_key_id) + initial_aliases = get_kms_key_aliases(module, module.client("kms"), initial_kms_key_id) for a in initial_aliases: - if a['AliasName'] == new_key or a['AliasArn'] == new_key or a['TargetKeyId'] == new_key: - results['changed'] = False + if a["AliasName"] == new_key or a["AliasArn"] == new_key or a["TargetKeyId"] == new_key: + results["changed"] = False # Check if we need to start/stop logging - if enable_logging and not trail['IsLogging']: - results['changed'] = True - trail['IsLogging'] = True + if enable_logging and not trail["IsLogging"]: + results["changed"] = True + trail["IsLogging"] = True if not module.check_mode: - set_logging(module, client, name=ct_params['Name'], action='start') - if not enable_logging and trail['IsLogging']: - results['changed'] = True - trail['IsLogging'] = False + set_logging(module, client, name=ct_params["Name"], action="start") + if not enable_logging and trail["IsLogging"]: + results["changed"] = True + trail["IsLogging"] = False if not module.check_mode: - set_logging(module, client, name=ct_params['Name'], action='stop') + set_logging(module, client, name=ct_params["Name"], action="stop") # Check if we need to update tags on resource - tags_changed = tag_trail(module, client, tags=tags, trail_arn=trail['TrailARN'], curr_tags=trail['tags'], - purge_tags=purge_tags) + tags_changed = tag_trail( + module, client, tags=tags, trail_arn=trail["TrailARN"], curr_tags=trail["tags"], purge_tags=purge_tags + ) if tags_changed: updated_tags = dict() if not purge_tags: - updated_tags = trail['tags'] + updated_tags = trail["tags"] updated_tags.update(tags) - results['changed'] = True - trail['tags'] = updated_tags + results["changed"] = True + trail["tags"] = updated_tags # Populate trail facts in output - results['trail'] = camel_dict_to_snake_dict(trail, ignore_list=['tags']) + results["trail"] = camel_dict_to_snake_dict(trail, ignore_list=["tags"]) - elif state == 'present' and not results['exists']: + elif state == "present" and not results["exists"]: # Trail doesn't exist just go create it - results['changed'] = True - results['exists'] = True + results["changed"] = True + results["exists"] = True if not module.check_mode: if tags: ct_params["TagsList"] = ansible_dict_to_boto3_tag_list(tags) @@ -601,42 +611,42 @@ def main(): created_trail = create_trail(module, client, ct_params) # Get the trail status try: - status_resp = client.get_trail_status(Name=created_trail['Name']) + status_resp = client.get_trail_status(Name=created_trail["Name"]) except (BotoCoreError, ClientError) as err: module.fail_json_aws(err, msg="Failed to fetch Trail statuc") # Set the logging state for the trail to desired value - if enable_logging and not status_resp['IsLogging']: - set_logging(module, client, name=ct_params['Name'], action='start') - if not enable_logging and status_resp['IsLogging']: - set_logging(module, client, name=ct_params['Name'], action='stop') + if enable_logging and not status_resp["IsLogging"]: + set_logging(module, client, name=ct_params["Name"], action="start") + if not enable_logging and status_resp["IsLogging"]: + set_logging(module, client, name=ct_params["Name"], action="stop") # Get facts for newly created Trail - trail = get_trail_facts(module, client, ct_params['Name']) + trail = get_trail_facts(module, client, ct_params["Name"]) # If we are in check mode create a fake return structure for the newly minted trail if module.check_mode: - acct_id = '123456789012' + acct_id = "123456789012" try: - sts_client = module.client('sts') - acct_id = sts_client.get_caller_identity()['Account'] + sts_client = module.client("sts") + acct_id = sts_client.get_caller_identity()["Account"] except (BotoCoreError, ClientError): pass trail = dict() trail.update(ct_params) - if 'EnableLogFileValidation' not in ct_params: - ct_params['EnableLogFileValidation'] = False - trail['EnableLogFileValidation'] = ct_params['EnableLogFileValidation'] - trail.pop('EnableLogFileValidation') - fake_arn = 'arn:aws:cloudtrail:' + region + ':' + acct_id + ':trail/' + ct_params['Name'] - trail['HasCustomEventSelectors'] = False - trail['HomeRegion'] = region - trail['TrailARN'] = fake_arn - trail['IsLogging'] = enable_logging - trail['tags'] = tags + if "EnableLogFileValidation" not in ct_params: + ct_params["EnableLogFileValidation"] = False + trail["EnableLogFileValidation"] = ct_params["EnableLogFileValidation"] + trail.pop("EnableLogFileValidation") + fake_arn = "arn:aws:cloudtrail:" + region + ":" + acct_id + ":trail/" + ct_params["Name"] + trail["HasCustomEventSelectors"] = False + trail["HomeRegion"] = region + trail["TrailARN"] = fake_arn + trail["IsLogging"] = enable_logging + trail["tags"] = tags # Populate trail facts in output - results['trail'] = camel_dict_to_snake_dict(trail, ignore_list=['tags']) + results["trail"] = camel_dict_to_snake_dict(trail, ignore_list=["tags"]) module.exit_json(**results) -if __name__ == '__main__': +if __name__ == "__main__": main() |