summaryrefslogtreecommitdiffstats
path: root/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/amazon/aws/plugins/modules/cloudtrail.py')
-rw-r--r--ansible_collections/amazon/aws/plugins/modules/cloudtrail.py256
1 files changed, 133 insertions, 123 deletions
diff --git a/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py b/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py
index af48e7ea8..597d43f1b 100644
--- a/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py
+++ b/ansible_collections/amazon/aws/plugins/modules/cloudtrail.py
@@ -1,12 +1,10 @@
#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
# Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-
-DOCUMENTATION = '''
+DOCUMENTATION = r"""
---
module: cloudtrail
version_added: 5.0.0
@@ -94,14 +92,13 @@ notes:
- The I(purge_tags) option was added in release 4.0.0
extends_documentation_fragment:
- - amazon.aws.aws
- - amazon.aws.ec2
+ - amazon.aws.common.modules
+ - amazon.aws.region.modules
- amazon.aws.tags
- amazon.aws.boto3
+"""
-'''
-
-EXAMPLES = '''
+EXAMPLES = r"""
- name: create single region cloudtrail
amazon.aws.cloudtrail:
state: present
@@ -150,9 +147,9 @@ EXAMPLES = '''
amazon.aws.cloudtrail:
state: absent
name: default
-'''
+"""
-RETURN = '''
+RETURN = r"""
exists:
description: whether the resource exists
returned: always
@@ -244,16 +241,17 @@ trail:
returned: success
type: dict
sample: {'environment': 'dev', 'Name': 'default'}
-'''
+"""
try:
- from botocore.exceptions import ClientError, BotoCoreError
+ from botocore.exceptions import BotoCoreError
+ from botocore.exceptions import ClientError
except ImportError:
pass # Handled by AnsibleAWSModule
from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict
-from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
+from ansible_collections.amazon.aws.plugins.module_utils.modules import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.tagging import ansible_dict_to_boto3_tag_list
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_list_to_ansible_dict
from ansible_collections.amazon.aws.plugins.module_utils.tagging import compare_aws_tags
@@ -274,7 +272,7 @@ def get_kms_key_aliases(module, client, keyId):
# in case user doesn't have kms:ListAliases permissions
return []
- return key_resp['Aliases']
+ return key_resp["Aliases"]
def create_trail(module, client, ct_params):
@@ -344,7 +342,7 @@ def get_tag_list(keys, tags):
"""
tag_list = []
for k in keys:
- tag_list.append({'Key': k, 'Value': tags[k]})
+ tag_list.append({"Key": k, "Value": tags[k]})
return tag_list
@@ -358,13 +356,13 @@ def set_logging(module, client, name, action):
name : The name or ARN of the CloudTrail to operate on
action : start or stop
"""
- if action == 'start':
+ if action == "start":
try:
client.start_logging(Name=name)
return client.get_trail_status(Name=name)
except (BotoCoreError, ClientError) as err:
module.fail_json_aws(err, msg="Failed to start logging")
- elif action == 'stop':
+ elif action == "stop":
try:
client.stop_logging(Name=name)
return client.get_trail_status(Name=name)
@@ -389,18 +387,27 @@ def get_trail_facts(module, client, name):
module.fail_json_aws(err, msg="Failed to describe Trail")
# Now check to see if our trail exists and get status and tags
- if len(trail_resp['trailList']):
- trail = trail_resp['trailList'][0]
+ if len(trail_resp["trailList"]):
+ trail = trail_resp["trailList"][0]
try:
- status_resp = client.get_trail_status(Name=trail['Name'])
- tags_list = client.list_tags(ResourceIdList=[trail['TrailARN']])
+ status_resp = client.get_trail_status(Name=trail["Name"])
+ tags_list = client.list_tags(ResourceIdList=[trail["TrailARN"]])
except (BotoCoreError, ClientError) as err:
module.fail_json_aws(err, msg="Failed to describe Trail")
- trail['IsLogging'] = status_resp['IsLogging']
- trail['tags'] = boto3_tag_list_to_ansible_dict(tags_list['ResourceTagList'][0]['TagsList'])
+ trail["IsLogging"] = status_resp["IsLogging"]
+ trail["tags"] = boto3_tag_list_to_ansible_dict(tags_list["ResourceTagList"][0]["TagsList"])
# Check for non-existent values and populate with None
- optional_vals = set(['S3KeyPrefix', 'SnsTopicName', 'SnsTopicARN', 'CloudWatchLogsLogGroupArn', 'CloudWatchLogsRoleArn', 'KmsKeyId'])
+ optional_vals = set(
+ [
+ "S3KeyPrefix",
+ "SnsTopicName",
+ "SnsTopicARN",
+ "CloudWatchLogsLogGroupArn",
+ "CloudWatchLogsRoleArn",
+ "KmsKeyId",
+ ]
+ )
for v in optional_vals - set(trail.keys()):
trail[v] = None
return trail
@@ -440,160 +447,163 @@ def update_trail(module, client, ct_params):
def main():
argument_spec = dict(
- state=dict(default='present', choices=['present', 'absent', 'enabled', 'disabled']),
- name=dict(default='default'),
- enable_logging=dict(default=True, type='bool'),
+ state=dict(default="present", choices=["present", "absent", "enabled", "disabled"]),
+ name=dict(default="default"),
+ enable_logging=dict(default=True, type="bool"),
s3_bucket_name=dict(),
s3_key_prefix=dict(no_log=False),
sns_topic_name=dict(),
- is_multi_region_trail=dict(default=False, type='bool'),
- enable_log_file_validation=dict(type='bool', aliases=['log_file_validation_enabled']),
- include_global_events=dict(default=True, type='bool', aliases=['include_global_service_events']),
+ is_multi_region_trail=dict(default=False, type="bool"),
+ enable_log_file_validation=dict(type="bool", aliases=["log_file_validation_enabled"]),
+ include_global_events=dict(default=True, type="bool", aliases=["include_global_service_events"]),
cloudwatch_logs_role_arn=dict(),
cloudwatch_logs_log_group_arn=dict(),
kms_key_id=dict(),
- tags=dict(type='dict', aliases=['resource_tags']),
- purge_tags=dict(default=True, type='bool')
+ tags=dict(type="dict", aliases=["resource_tags"]),
+ purge_tags=dict(default=True, type="bool"),
)
- required_if = [('state', 'present', ['s3_bucket_name']), ('state', 'enabled', ['s3_bucket_name'])]
- required_together = [('cloudwatch_logs_role_arn', 'cloudwatch_logs_log_group_arn')]
+ required_if = [("state", "present", ["s3_bucket_name"]), ("state", "enabled", ["s3_bucket_name"])]
+ required_together = [("cloudwatch_logs_role_arn", "cloudwatch_logs_log_group_arn")]
- module = AnsibleAWSModule(argument_spec=argument_spec, supports_check_mode=True, required_together=required_together, required_if=required_if)
+ module = AnsibleAWSModule(
+ argument_spec=argument_spec,
+ supports_check_mode=True,
+ required_together=required_together,
+ required_if=required_if,
+ )
# collect parameters
- if module.params['state'] in ('present', 'enabled'):
- state = 'present'
- elif module.params['state'] in ('absent', 'disabled'):
- state = 'absent'
- tags = module.params['tags']
- purge_tags = module.params['purge_tags']
- enable_logging = module.params['enable_logging']
+ if module.params["state"] in ("present", "enabled"):
+ state = "present"
+ elif module.params["state"] in ("absent", "disabled"):
+ state = "absent"
+ tags = module.params["tags"]
+ purge_tags = module.params["purge_tags"]
+ enable_logging = module.params["enable_logging"]
ct_params = dict(
- Name=module.params['name'],
- S3BucketName=module.params['s3_bucket_name'],
- IncludeGlobalServiceEvents=module.params['include_global_events'],
- IsMultiRegionTrail=module.params['is_multi_region_trail'],
+ Name=module.params["name"],
+ S3BucketName=module.params["s3_bucket_name"],
+ IncludeGlobalServiceEvents=module.params["include_global_events"],
+ IsMultiRegionTrail=module.params["is_multi_region_trail"],
)
- if module.params['s3_key_prefix']:
- ct_params['S3KeyPrefix'] = module.params['s3_key_prefix'].rstrip('/')
+ if module.params["s3_key_prefix"]:
+ ct_params["S3KeyPrefix"] = module.params["s3_key_prefix"].rstrip("/")
- if module.params['sns_topic_name']:
- ct_params['SnsTopicName'] = module.params['sns_topic_name']
+ if module.params["sns_topic_name"]:
+ ct_params["SnsTopicName"] = module.params["sns_topic_name"]
- if module.params['cloudwatch_logs_role_arn']:
- ct_params['CloudWatchLogsRoleArn'] = module.params['cloudwatch_logs_role_arn']
+ if module.params["cloudwatch_logs_role_arn"]:
+ ct_params["CloudWatchLogsRoleArn"] = module.params["cloudwatch_logs_role_arn"]
- if module.params['cloudwatch_logs_log_group_arn']:
- ct_params['CloudWatchLogsLogGroupArn'] = module.params['cloudwatch_logs_log_group_arn']
+ if module.params["cloudwatch_logs_log_group_arn"]:
+ ct_params["CloudWatchLogsLogGroupArn"] = module.params["cloudwatch_logs_log_group_arn"]
- if module.params['enable_log_file_validation'] is not None:
- ct_params['EnableLogFileValidation'] = module.params['enable_log_file_validation']
+ if module.params["enable_log_file_validation"] is not None:
+ ct_params["EnableLogFileValidation"] = module.params["enable_log_file_validation"]
if module.params["kms_key_id"] is not None:
ct_params["KmsKeyId"] = module.params["kms_key_id"]
- client = module.client('cloudtrail')
+ client = module.client("cloudtrail")
region = module.region
- results = dict(
- changed=False,
- exists=False
- )
+ results = dict(changed=False, exists=False)
# Get existing trail facts
- trail = get_trail_facts(module, client, ct_params['Name'])
+ trail = get_trail_facts(module, client, ct_params["Name"])
# If the trail exists set the result exists variable
if trail is not None:
- results['exists'] = True
- initial_kms_key_id = trail.get('KmsKeyId')
+ results["exists"] = True
+ initial_kms_key_id = trail.get("KmsKeyId")
- if state == 'absent' and results['exists']:
+ if state == "absent" and results["exists"]:
# If Trail exists go ahead and delete
- results['changed'] = True
- results['exists'] = False
- results['trail'] = dict()
+ results["changed"] = True
+ results["exists"] = False
+ results["trail"] = dict()
if not module.check_mode:
- delete_trail(module, client, trail['TrailARN'])
+ delete_trail(module, client, trail["TrailARN"])
- elif state == 'present' and results['exists']:
+ elif state == "present" and results["exists"]:
# If Trail exists see if we need to update it
do_update = False
for key in ct_params:
tkey = str(key)
# boto3 has inconsistent parameter naming so we handle it here
- if key == 'EnableLogFileValidation':
- tkey = 'LogFileValidationEnabled'
+ if key == "EnableLogFileValidation":
+ tkey = "LogFileValidationEnabled"
# We need to make an empty string equal None
- if ct_params.get(key) == '':
+ if ct_params.get(key) == "":
val = None
else:
val = ct_params.get(key)
if val != trail.get(tkey):
do_update = True
- if tkey != 'KmsKeyId':
+ if tkey != "KmsKeyId":
# We'll check if the KmsKeyId casues changes later since
# user could've provided a key alias, alias arn, or key id
# and trail['KmsKeyId'] is always a key arn
- results['changed'] = True
+ results["changed"] = True
# If we are in check mode copy the changed values to the trail facts in result output to show what would change.
if module.check_mode:
trail.update({tkey: ct_params.get(key)})
if not module.check_mode and do_update:
update_trail(module, client, ct_params)
- trail = get_trail_facts(module, client, ct_params['Name'])
+ trail = get_trail_facts(module, client, ct_params["Name"])
# Determine if KmsKeyId changed
if not module.check_mode:
- if initial_kms_key_id != trail.get('KmsKeyId'):
- results['changed'] = True
+ if initial_kms_key_id != trail.get("KmsKeyId"):
+ results["changed"] = True
else:
- new_key = ct_params.get('KmsKeyId')
+ new_key = ct_params.get("KmsKeyId")
if initial_kms_key_id != new_key:
# Assume changed for a moment
- results['changed'] = True
+ results["changed"] = True
# However, new_key could be a key id, alias arn, or alias name
# that maps back to the key arn in initial_kms_key_id. So check
# all aliases for a match.
- initial_aliases = get_kms_key_aliases(module, module.client('kms'), initial_kms_key_id)
+ initial_aliases = get_kms_key_aliases(module, module.client("kms"), initial_kms_key_id)
for a in initial_aliases:
- if a['AliasName'] == new_key or a['AliasArn'] == new_key or a['TargetKeyId'] == new_key:
- results['changed'] = False
+ if a["AliasName"] == new_key or a["AliasArn"] == new_key or a["TargetKeyId"] == new_key:
+ results["changed"] = False
# Check if we need to start/stop logging
- if enable_logging and not trail['IsLogging']:
- results['changed'] = True
- trail['IsLogging'] = True
+ if enable_logging and not trail["IsLogging"]:
+ results["changed"] = True
+ trail["IsLogging"] = True
if not module.check_mode:
- set_logging(module, client, name=ct_params['Name'], action='start')
- if not enable_logging and trail['IsLogging']:
- results['changed'] = True
- trail['IsLogging'] = False
+ set_logging(module, client, name=ct_params["Name"], action="start")
+ if not enable_logging and trail["IsLogging"]:
+ results["changed"] = True
+ trail["IsLogging"] = False
if not module.check_mode:
- set_logging(module, client, name=ct_params['Name'], action='stop')
+ set_logging(module, client, name=ct_params["Name"], action="stop")
# Check if we need to update tags on resource
- tags_changed = tag_trail(module, client, tags=tags, trail_arn=trail['TrailARN'], curr_tags=trail['tags'],
- purge_tags=purge_tags)
+ tags_changed = tag_trail(
+ module, client, tags=tags, trail_arn=trail["TrailARN"], curr_tags=trail["tags"], purge_tags=purge_tags
+ )
if tags_changed:
updated_tags = dict()
if not purge_tags:
- updated_tags = trail['tags']
+ updated_tags = trail["tags"]
updated_tags.update(tags)
- results['changed'] = True
- trail['tags'] = updated_tags
+ results["changed"] = True
+ trail["tags"] = updated_tags
# Populate trail facts in output
- results['trail'] = camel_dict_to_snake_dict(trail, ignore_list=['tags'])
+ results["trail"] = camel_dict_to_snake_dict(trail, ignore_list=["tags"])
- elif state == 'present' and not results['exists']:
+ elif state == "present" and not results["exists"]:
# Trail doesn't exist just go create it
- results['changed'] = True
- results['exists'] = True
+ results["changed"] = True
+ results["exists"] = True
if not module.check_mode:
if tags:
ct_params["TagsList"] = ansible_dict_to_boto3_tag_list(tags)
@@ -601,42 +611,42 @@ def main():
created_trail = create_trail(module, client, ct_params)
# Get the trail status
try:
- status_resp = client.get_trail_status(Name=created_trail['Name'])
+ status_resp = client.get_trail_status(Name=created_trail["Name"])
except (BotoCoreError, ClientError) as err:
module.fail_json_aws(err, msg="Failed to fetch Trail statuc")
# Set the logging state for the trail to desired value
- if enable_logging and not status_resp['IsLogging']:
- set_logging(module, client, name=ct_params['Name'], action='start')
- if not enable_logging and status_resp['IsLogging']:
- set_logging(module, client, name=ct_params['Name'], action='stop')
+ if enable_logging and not status_resp["IsLogging"]:
+ set_logging(module, client, name=ct_params["Name"], action="start")
+ if not enable_logging and status_resp["IsLogging"]:
+ set_logging(module, client, name=ct_params["Name"], action="stop")
# Get facts for newly created Trail
- trail = get_trail_facts(module, client, ct_params['Name'])
+ trail = get_trail_facts(module, client, ct_params["Name"])
# If we are in check mode create a fake return structure for the newly minted trail
if module.check_mode:
- acct_id = '123456789012'
+ acct_id = "123456789012"
try:
- sts_client = module.client('sts')
- acct_id = sts_client.get_caller_identity()['Account']
+ sts_client = module.client("sts")
+ acct_id = sts_client.get_caller_identity()["Account"]
except (BotoCoreError, ClientError):
pass
trail = dict()
trail.update(ct_params)
- if 'EnableLogFileValidation' not in ct_params:
- ct_params['EnableLogFileValidation'] = False
- trail['EnableLogFileValidation'] = ct_params['EnableLogFileValidation']
- trail.pop('EnableLogFileValidation')
- fake_arn = 'arn:aws:cloudtrail:' + region + ':' + acct_id + ':trail/' + ct_params['Name']
- trail['HasCustomEventSelectors'] = False
- trail['HomeRegion'] = region
- trail['TrailARN'] = fake_arn
- trail['IsLogging'] = enable_logging
- trail['tags'] = tags
+ if "EnableLogFileValidation" not in ct_params:
+ ct_params["EnableLogFileValidation"] = False
+ trail["EnableLogFileValidation"] = ct_params["EnableLogFileValidation"]
+ trail.pop("EnableLogFileValidation")
+ fake_arn = "arn:aws:cloudtrail:" + region + ":" + acct_id + ":trail/" + ct_params["Name"]
+ trail["HasCustomEventSelectors"] = False
+ trail["HomeRegion"] = region
+ trail["TrailARN"] = fake_arn
+ trail["IsLogging"] = enable_logging
+ trail["tags"] = tags
# Populate trail facts in output
- results['trail'] = camel_dict_to_snake_dict(trail, ignore_list=['tags'])
+ results["trail"] = camel_dict_to_snake_dict(trail, ignore_list=["tags"])
module.exit_json(**results)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()