author     Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-18 05:52:35 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-18 05:52:35 +0000
commit     7fec0b69a082aaeec72fee0612766aa42f6b1b4d (patch)
tree       efb569b86ca4da888717f5433e757145fa322e08 /ansible_collections/amazon/aws/tests/unit/plugins
parent     Releasing progress-linux version 7.7.0+dfsg-3~progress7.99u1. (diff)
Merging upstream version 9.4.0+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/amazon/aws/tests/unit/plugins')
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/inventory/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py | 815
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_rds.py | 674
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/lookup/test_secretsmanager_secret.py | 348
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py | 21
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/test_check_is_instance.py | 65
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py | 140
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py | 65
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_expand_rules.py | 240
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_formatting.py | 239
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py | 99
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py | 85
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_rule.py | 100
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/certs/__init__.py | 0
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_backup_restore_job_info.py | 146
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py | 142
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py | 364
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami_info.py | 224
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_eni_info.py | 108
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_import_image.py | 224
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py | 353
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_metadata_facts.py | 101
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py | 86
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_snapshot_info.py | 128
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py | 83
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py | 16
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py | 340
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py | 314
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_rds_instance_info.py | 121
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py | 169
-rw-r--r--  ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py | 26
37 files changed, 4623 insertions, 1213 deletions
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py
index 5386fe6c7..8cced1662 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py
@@ -17,96 +17,25 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
import pytest
-import datetime
-from unittest.mock import Mock, MagicMock
+
+try:
+ import botocore
+except ImportError:
+ # Handled by HAS_BOTO3
+ pass
from ansible.errors import AnsibleError
-from ansible.parsing.dataloader import DataLoader
-from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import InventoryModule, instance_data_filter_to_boto_attr
-
-
-instances = {
- 'Instances': [
- {'Monitoring': {'State': 'disabled'},
- 'PublicDnsName': 'ec2-12-345-67-890.compute-1.amazonaws.com',
- 'State': {'Code': 16, 'Name': 'running'},
- 'EbsOptimized': False,
- 'LaunchTime': datetime.datetime(2017, 10, 31, 12, 59, 25),
- 'PublicIpAddress': '12.345.67.890',
- 'PrivateIpAddress': '098.76.54.321',
- 'ProductCodes': [],
- 'VpcId': 'vpc-12345678',
- 'StateTransitionReason': '',
- 'InstanceId': 'i-00000000000000000',
- 'EnaSupport': True,
- 'ImageId': 'ami-12345678',
- 'PrivateDnsName': 'ip-098-76-54-321.ec2.internal',
- 'KeyName': 'testkey',
- 'SecurityGroups': [{'GroupName': 'default', 'GroupId': 'sg-12345678'}],
- 'ClientToken': '',
- 'SubnetId': 'subnet-12345678',
- 'InstanceType': 't2.micro',
- 'NetworkInterfaces': [
- {'Status': 'in-use',
- 'MacAddress': '12:a0:50:42:3d:a4',
- 'SourceDestCheck': True,
- 'VpcId': 'vpc-12345678',
- 'Description': '',
- 'NetworkInterfaceId': 'eni-12345678',
- 'PrivateIpAddresses': [
- {'PrivateDnsName': 'ip-098-76-54-321.ec2.internal',
- 'PrivateIpAddress': '098.76.54.321',
- 'Primary': True,
- 'Association':
- {'PublicIp': '12.345.67.890',
- 'PublicDnsName': 'ec2-12-345-67-890.compute-1.amazonaws.com',
- 'IpOwnerId': 'amazon'}}],
- 'PrivateDnsName': 'ip-098-76-54-321.ec2.internal',
- 'Attachment':
- {'Status': 'attached',
- 'DeviceIndex': 0,
- 'DeleteOnTermination': True,
- 'AttachmentId': 'eni-attach-12345678',
- 'AttachTime': datetime.datetime(2017, 10, 31, 12, 59, 25)},
- 'Groups': [
- {'GroupName': 'default',
- 'GroupId': 'sg-12345678'}],
- 'Ipv6Addresses': [],
- 'OwnerId': '123456789012',
- 'PrivateIpAddress': '098.76.54.321',
- 'SubnetId': 'subnet-12345678',
- 'Association':
- {'PublicIp': '12.345.67.890',
- 'PublicDnsName': 'ec2-12-345-67-890.compute-1.amazonaws.com',
- 'IpOwnerId': 'amazon'}}],
- 'SourceDestCheck': True,
- 'Placement':
- {'Tenancy': 'default',
- 'GroupName': '',
- 'AvailabilityZone': 'us-east-1c'},
- 'Hypervisor': 'xen',
- 'BlockDeviceMappings': [
- {'DeviceName': '/dev/xvda',
- 'Ebs':
- {'Status': 'attached',
- 'DeleteOnTermination': True,
- 'VolumeId': 'vol-01234567890000000',
- 'AttachTime': datetime.datetime(2017, 10, 31, 12, 59, 26)}}],
- 'Architecture': 'x86_64',
- 'RootDeviceType': 'ebs',
- 'RootDeviceName': '/dev/xvda',
- 'VirtualizationType': 'hvm',
- 'Tags': [{'Value': 'test', 'Key': 'ansible'}, {'Value': 'aws_ec2', 'Key': 'Name'}],
- 'AmiLaunchIndex': 0}],
- 'ReservationId': 'r-01234567890000000',
- 'Groups': [],
- 'OwnerId': '123456789012'
-}
+
+from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import InventoryModule
+from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _compile_values
+from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _get_boto_attr_chain
+from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _get_tag_hostname
+from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _prepare_host_vars
@pytest.fixture()
@@ -140,236 +69,187 @@ def inventory():
return inventory
-def test_compile_values(inventory):
- found_value = instances['Instances'][0]
- chain_of_keys = instance_data_filter_to_boto_attr['instance.group-id']
- for attr in chain_of_keys:
- found_value = inventory._compile_values(found_value, attr)
- assert found_value == "sg-12345678"
-
-
-def test_get_boto_attr_chain(inventory):
- instance = instances['Instances'][0]
- assert inventory._get_boto_attr_chain('network-interface.addresses.private-ip-address', instance) == "098.76.54.321"
-
-
-def test_boto3_conn(inventory):
- inventory._options = {"aws_profile": "first_precedence",
- "aws_access_key": "test_access_key",
- "aws_secret_key": "test_secret_key",
- "aws_security_token": "test_security_token",
- "iam_role_arn": None}
- loader = DataLoader()
- inventory._set_credentials(loader)
- with pytest.raises(AnsibleError) as error_message:
- for _connection, _region in inventory._boto3_conn(regions=['us-east-1']):
- assert "Insufficient credentials found" in error_message
-
-
-def test_get_all_hostnames_default(inventory):
- instance = instances['Instances'][0]
- assert inventory.get_all_hostnames(instance, hostnames=None) == ["ec2-12-345-67-890.compute-1.amazonaws.com", "ip-098-76-54-321.ec2.internal"]
-
-
-def test_get_all_hostnames(inventory):
- hostnames = ['ip-address', 'dns-name']
- instance = instances['Instances'][0]
- assert inventory.get_all_hostnames(instance, hostnames) == ["12.345.67.890", "ec2-12-345-67-890.compute-1.amazonaws.com"]
-
-
-def test_get_all_hostnames_dict(inventory):
- hostnames = [{'name': 'private-ip-address', 'separator': '_', 'prefix': 'tag:Name'}]
- instance = instances['Instances'][0]
- assert inventory.get_all_hostnames(instance, hostnames) == ["aws_ec2_098.76.54.321"]
-
-
-def test_get_all_hostnames_with_2_tags(inventory):
- hostnames = ['tag:ansible', 'tag:Name']
- instance = instances['Instances'][0]
- assert inventory.get_all_hostnames(instance, hostnames) == ["test", "aws_ec2"]
-
-
-def test_get_preferred_hostname_default(inventory):
- instance = instances['Instances'][0]
- assert inventory._get_preferred_hostname(instance, hostnames=None) == "ec2-12-345-67-890.compute-1.amazonaws.com"
-
-
-def test_get_preferred_hostname(inventory):
- hostnames = ['ip-address', 'dns-name']
- instance = instances['Instances'][0]
- assert inventory._get_preferred_hostname(instance, hostnames) == "12.345.67.890"
-
-
-def test_get_preferred_hostname_dict(inventory):
- hostnames = [{'name': 'private-ip-address', 'separator': '_', 'prefix': 'tag:Name'}]
- instance = instances['Instances'][0]
- assert inventory._get_preferred_hostname(instance, hostnames) == "aws_ec2_098.76.54.321"
-
-
-def test_get_preferred_hostname_with_2_tags(inventory):
- hostnames = ['tag:ansible', 'tag:Name']
- instance = instances['Instances'][0]
- assert inventory._get_preferred_hostname(instance, hostnames) == "test"
-
-
-def test_set_credentials(inventory):
- inventory._options = {'aws_access_key': 'test_access_key',
- 'aws_secret_key': 'test_secret_key',
- 'aws_security_token': 'test_security_token',
- 'aws_profile': 'test_profile',
- 'iam_role_arn': 'arn:aws:iam::123456789012:role/test-role'}
- loader = DataLoader()
- inventory._set_credentials(loader)
-
- assert inventory.boto_profile == "test_profile"
- assert inventory.aws_access_key_id == "test_access_key"
- assert inventory.aws_secret_access_key == "test_secret_key"
- assert inventory.aws_security_token == "test_security_token"
- assert inventory.iam_role_arn == "arn:aws:iam::123456789012:role/test-role"
-
-
-def test_insufficient_credentials(inventory):
- inventory._options = {
- 'aws_access_key': None,
- 'aws_secret_key': None,
- 'aws_security_token': None,
- 'aws_profile': None,
- 'iam_role_arn': None
- }
- with pytest.raises(AnsibleError) as error_message:
- loader = DataLoader()
- inventory._set_credentials(loader)
- assert "Insufficient credentials found" in error_message
-
-
-def test_verify_file_bad_config(inventory):
- assert inventory.verify_file('not_aws_config.yml') is False
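+# The cases below pin down _compile_values' flattening rules: an empty
+# container yields None, a single-element match collapses to a scalar, and
+# multiple matches come back as a list.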
+@pytest.mark.parametrize(
+ "obj,expected",
+ [
+ (None, None),
+ ({}, None),
+ ({"GroupId": "test01"}, "test01"),
+ ({"GroupId": ["test01"]}, "test01"),
+ ({"GroupId": "test01"}, "test01"),
+ ({"GroupId": ["test01", "test02"]}, ["test01", "test02"]),
+ ([{"GroupId": ["test01", "test02"]}], ["test01", "test02"]),
+ ([{"GroupId": ["test01"]}], "test01"),
+ (
+ [{"GroupId": ["test01", "test02"]}, {"GroupId": ["test03", "test04"]}],
+ [["test01", "test02"], ["test03", "test04"]],
+ ),
+ (
+ ({"GroupId": ["test01", "test02"]}, {"GroupId": ["test03", "test04"]}),
+ [["test01", "test02"], ["test03", "test04"]],
+ ),
+ (({"GroupId": ["test01", "test02"]}, {}), ["test01", "test02"]),
+ ],
+)
+def test_compile_values(obj, expected):
+ assert _compile_values(obj, "GroupId") == expected
-def test_include_filters_with_no_filter(inventory):
- inventory._options = {
- 'filters': {},
- 'include_filters': [],
- }
- print(inventory.build_include_filters())
- assert inventory.build_include_filters() == [{}]
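+# _get_boto_attr_chain maps inventory filter names such as
+# "instance.group-id" onto chains of boto attributes; a name it does not
+# recognise is returned unchanged (first case below).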
+@pytest.mark.parametrize(
+ "filter_name,expected",
+ [
+ ("ansible.aws.unexpected.file", "ansible.aws.unexpected.file"),
+ ("instance.group-id", "sg-0123456789"),
+ ("instance.group-name", "default"),
+ ("owner-id", "id-012345678L"),
+ ],
+)
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._compile_values")
+def test_get_boto_attr_chain(m_compile_values, filter_name, expected):
+ m_compile_values.side_effect = lambda obj, attr: obj.get(attr)
+ instance = {"SecurityGroups": {"GroupName": "default", "GroupId": "sg-0123456789"}, "OwnerId": "id-012345678L"}
-def test_include_filters_with_include_filters_only(inventory):
- inventory._options = {
- 'filters': {},
- 'include_filters': [{"foo": "bar"}],
- }
- assert inventory.build_include_filters() == [{"foo": "bar"}]
+ assert _get_boto_attr_chain(filter_name, instance) == expected
-def test_include_filters_with_filter_and_include_filters(inventory):
- inventory._options = {
- 'filters': {"from_filter": 1},
- 'include_filters': [{"from_include_filter": "bar"}],
+@pytest.mark.parametrize(
+ "hostnames,expected",
+ [
+ ([], "test-instance.ansible.com"),
+ (["private-dns-name"], "test-instance.localhost"),
+ (["tag:os_version"], "RHEL"),
+ (["tag:os_version", "dns-name"], "RHEL"),
+ ([{"name": "Name", "prefix": "Phase"}], "dev_test-instance-01"),
+ ([{"name": "Name", "prefix": "Phase", "separator": "-"}], "dev-test-instance-01"),
+ ([{"name": "Name", "prefix": "OSVersion", "separator": "-"}], "test-instance-01"),
+ ([{"name": "Name", "separator": "-"}], "test-instance-01"),
+ ([{"name": "Name", "prefix": "Phase"}, "private-dns-name"], "dev_test-instance-01"),
+ ([{"name": "Name", "prefix": "Phase"}, "tag:os_version"], "dev_test-instance-01"),
+ (["private-dns-name", "dns-name"], "test-instance.localhost"),
+ (["private-dns-name", {"name": "Name", "separator": "-"}], "test-instance.localhost"),
+ (["private-dns-name", "tag:os_version"], "test-instance.localhost"),
+ (["OSRelease"], None),
+ ],
+)
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_tag_hostname")
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_boto_attr_chain")
+def test_inventory_get_preferred_hostname(m_get_boto_attr_chain, m_get_tag_hostname, inventory, hostnames, expected):
+ instance = {
+ "Name": "test-instance-01",
+ "Phase": "dev",
+ "tag:os_version": ["RHEL", "CoreOS"],
+ "another_key": "another_value",
+ "dns-name": "test-instance.ansible.com",
+ "private-dns-name": "test-instance.localhost",
}
- print(inventory.build_include_filters())
- assert inventory.build_include_filters() == [
- {"from_filter": 1},
- {"from_include_filter": "bar"}]
+ inventory._sanitize_hostname = MagicMock()
+ inventory._sanitize_hostname.side_effect = lambda x: x
-def test_add_host_empty_hostnames(inventory):
- hosts = [
- {
- "Placement": {
- "AvailabilityZone": "us-east-1a",
- },
- "PublicDnsName": "ip-10-85-0-4.ec2.internal"
- },
- ]
- inventory._add_hosts(hosts, "aws_ec2", [])
- inventory.inventory.add_host.assert_called_with("ip-10-85-0-4.ec2.internal", group="aws_ec2")
+ m_get_boto_attr_chain.side_effect = lambda pref, instance: instance.get(pref)
+ m_get_tag_hostname.side_effect = lambda pref, instance: instance.get(pref)
+ assert expected == inventory._get_preferred_hostname(instance, hostnames)
-def test_add_host_with_hostnames_no_criteria(inventory):
- hosts = [{}]
- inventory._add_hosts(
- hosts, "aws_ec2", hostnames=["tag:Name", "private-dns-name", "dns-name"]
- )
- assert inventory.inventory.add_host.call_count == 0
+def test_inventory_get_preferred_hostname_failure(inventory):
+ instance = {}
+ hostnames = [{"value": "saome_value"}]
+ inventory._sanitize_hostname = MagicMock()
+ inventory._sanitize_hostname.side_effect = lambda x: x
-def test_add_host_with_hostnames_and_one_criteria(inventory):
- hosts = [
- {
- "Placement": {
- "AvailabilityZone": "us-east-1a",
- },
- "PublicDnsName": "sample-host",
- }
- ]
+ with pytest.raises(AnsibleError) as err:
+ inventory._get_preferred_hostname(instance, hostnames)
+ assert "A 'name' key must be defined in a hostnames dictionary." in err
- inventory._add_hosts(
- hosts, "aws_ec2", hostnames=["tag:Name", "private-dns-name", "dns-name"]
- )
- assert inventory.inventory.add_host.call_count == 1
- inventory.inventory.add_host.assert_called_with("sample-host", group="aws_ec2")
+@pytest.mark.parametrize("base_verify_file_return", [True, False])
+@pytest.mark.parametrize(
+ "filename,result",
+ [
+ ("inventory_aws_ec2.yml", True),
+ ("inventory_aws_ec2.yaml", True),
+ ("inventory_aws_EC2.yaml", False),
+ ("inventory_Aws_ec2.yaml", False),
+ ("aws_ec2_inventory.yml", False),
+ ("aws_ec2.yml_inventory", False),
+ ("aws_ec2.yml", True),
+ ("aws_ec2.yaml", True),
+ ],
+)
+@patch("ansible.plugins.inventory.BaseInventoryPlugin.verify_file")
+def test_inventory_verify_file(m_base_verify_file, inventory, base_verify_file_return, filename, result):
+ m_base_verify_file.return_value = base_verify_file_return
+ if not base_verify_file_return:
+ assert not inventory.verify_file(filename)
+ else:
+ assert result == inventory.verify_file(filename)
-def test_add_host_with_hostnames_and_two_matching_criteria(inventory):
- hosts = [
- {
- "Placement": {
- "AvailabilityZone": "us-east-1a",
- },
- "PublicDnsName": "name-from-PublicDnsName",
- "Tags": [{"Value": "name-from-tag-Name", "Key": "Name"}],
- }
- ]
- inventory._add_hosts(
- hosts, "aws_ec2", hostnames=["tag:Name", "private-dns-name", "dns-name"]
- )
- assert inventory.inventory.add_host.call_count == 1
- inventory.inventory.add_host.assert_called_with(
- "name-from-tag-Name", group="aws_ec2"
- )
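+# _get_tag_hostname syntax: "tag:key" expands to the tag's value,
+# "tag:key=value" to "key_value" when it matches, and comma-separated
+# specs accumulate; key lookups are case sensitive.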
+@pytest.mark.parametrize(
+ "preference,instance,expected",
+ [
+ ("tag:os_provider", {"Tags": []}, []),
+ ("tag:os_provider", {}, []),
+ ("tag:os_provider", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, ["RedHat"]),
+ ("tag:OS_Provider", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, []),
+ ("tag:tag:os_provider", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, []),
+ ("tag:os_provider=RedHat", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, ["os_provider_RedHat"]),
+ ("tag:os_provider=CoreOS", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, []),
+ (
+ "tag:os_provider=RedHat,os_release=7",
+ {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "8"}]},
+ ["os_provider_RedHat"],
+ ),
+ (
+ "tag:os_provider=RedHat,os_release=7",
+ {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "7"}]},
+ ["os_provider_RedHat", "os_release_7"],
+ ),
+ (
+ "tag:os_provider,os_release",
+ {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "7"}]},
+ ["RedHat", "7"],
+ ),
+ (
+ "tag:os_provider=RedHat,os_release",
+ {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "7"}]},
+ ["os_provider_RedHat", "7"],
+ ),
+ ],
+)
+def test_get_tag_hostname(preference, instance, expected):
+ assert expected == _get_tag_hostname(preference, instance)
-def test_add_host_with_hostnames_and_two_matching_criteria_and_allow_duplicated_hosts(
- inventory,
-):
- hosts = [
- {
- "Placement": {
- "AvailabilityZone": "us-east-1a",
+@pytest.mark.parametrize(
+ "_options, expected",
+ [
+ ({"filters": {}, "include_filters": []}, [{}]),
+ ({"filters": {}, "include_filters": [{"foo": "bar"}]}, [{"foo": "bar"}]),
+ (
+ {
+ "filters": {"from_filter": 1},
+ "include_filters": [{"from_include_filter": "bar"}],
},
- "PublicDnsName": "name-from-PublicDnsName",
- "Tags": [{"Value": "name-from-tag-Name", "Key": "Name"}],
- }
- ]
-
- inventory._add_hosts(
- hosts,
- "aws_ec2",
- hostnames=["tag:Name", "private-dns-name", "dns-name"],
- allow_duplicated_hosts=True,
- )
- assert inventory.inventory.add_host.call_count == 2
- inventory.inventory.add_host.assert_any_call(
- "name-from-PublicDnsName", group="aws_ec2"
- )
- inventory.inventory.add_host.assert_any_call("name-from-tag-Name", group="aws_ec2")
+ [{"from_filter": 1}, {"from_include_filter": "bar"}],
+ ),
+ ],
+)
+def test_inventory_build_include_filters(inventory, _options, expected):
+ inventory._options = _options
+ assert inventory.build_include_filters() == expected
-def test_sanitize_hostname(inventory):
- assert inventory._sanitize_hostname(1) == "1"
- assert inventory._sanitize_hostname("a:b") == "a_b"
- assert inventory._sanitize_hostname("a:/b") == "a__b"
- assert inventory._sanitize_hostname("example") == "example"
+@pytest.mark.parametrize("hostname,expected", [(1, "1"), ("a:b", "a_b"), ("a:/b", "a__b"), ("example", "example")])
+def test_sanitize_hostname(inventory, hostname, expected):
+ assert inventory._sanitize_hostname(hostname) == expected
def test_sanitize_hostname_legacy(inventory):
- inventory._sanitize_group_name = (
- inventory._legacy_script_compatible_group_sanitization
- )
+ inventory._sanitize_group_name = inventory._legacy_script_compatible_group_sanitization
assert inventory._sanitize_hostname("a:/b") == "a__b"
@@ -413,7 +293,6 @@ def test_sanitize_hostname_legacy(inventory):
],
)
def test_prepare_host_vars(
- inventory,
hostvars_prefix,
hostvars_suffix,
use_contrib_script_compatible_ec2_tag_keys,
@@ -425,7 +304,7 @@ def test_prepare_host_vars(
"Tags": [{"Key": "Name", "Value": "my-name"}],
}
assert (
- inventory.prepare_host_vars(
+ _prepare_host_vars(
original_host_vars,
hostvars_prefix,
hostvars_suffix,
@@ -472,43 +351,339 @@ def test_iter_entry(inventory):
assert entries[1][1]["a_tags_b"]["Name"] == "my-name"
-def test_query_empty(inventory):
- result = inventory._query("us-east-1", [], [], strict_permissions=True)
- assert result == {"aws_ec2": []}
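+# _query unions the instances found for each include filter, de-duplicates
+# them, orders by InstanceId, and drops anything matched by an exclude
+# filter.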
+@pytest.mark.parametrize(
+ "include_filters,exclude_filters,instances_by_region,instances",
+ [
+ ([], [], [], []),
+ (
+ [4, 1, 2],
+ [],
+ [
+ [{"InstanceId": 4, "name": "instance-4"}],
+ [{"InstanceId": 1, "name": "instance-1"}],
+ [{"InstanceId": 2, "name": "instance-2"}],
+ ],
+ [
+ {"InstanceId": 1, "name": "instance-1"},
+ {"InstanceId": 2, "name": "instance-2"},
+ {"InstanceId": 4, "name": "instance-4"},
+ ],
+ ),
+ (
+ [],
+ [4, 1, 2],
+ [
+ [{"InstanceId": 4, "name": "instance-4"}],
+ [{"InstanceId": 1, "name": "instance-1"}],
+ [{"InstanceId": 2, "name": "instance-2"}],
+ ],
+ [],
+ ),
+ (
+ [1, 2],
+ [4],
+ [
+ [{"InstanceId": 4, "name": "instance-4"}],
+ [{"InstanceId": 1, "name": "instance-1"}],
+ [{"InstanceId": 2, "name": "instance-2"}],
+ ],
+ [{"InstanceId": 1, "name": "instance-1"}, {"InstanceId": 2, "name": "instance-2"}],
+ ),
+ (
+ [1, 2],
+ [1],
+ [
+ [{"InstanceId": 1, "name": "instance-1"}],
+ [{"InstanceId": 1, "name": "instance-1"}],
+ [{"InstanceId": 2, "name": "instance-2"}],
+ ],
+ [{"InstanceId": 2, "name": "instance-2"}],
+ ),
+ ],
+)
+def test_inventory_query(inventory, include_filters, exclude_filters, instances_by_region, instances):
+ inventory._get_instances_by_region = MagicMock()
+ inventory._get_instances_by_region.side_effect = instances_by_region
+
+ regions = ["us-east-1", "us-east-2"]
+ strict = False
+
+ params = {
+ "regions": regions,
+ "strict_permissions": strict,
+ "include_filters": [],
+ "exclude_filters": [],
+ "use_ssm_inventory": False,
+ }
+
+ for u in include_filters:
+ params["include_filters"].append({"Name": f"in_filters_{int(u)}", "Values": [u]})
+
+ for u in exclude_filters:
+ params["exclude_filters"].append({"Name": f"ex_filters_{int(u)}", "Values": [u]})
+
+ assert inventory._query(**params) == {"aws_ec2": instances}
+ if not instances_by_region:
+ inventory._get_instances_by_region.assert_not_called()
+
+
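+# Reservation-level metadata (OwnerId, ReservationId, RequesterId -- the
+# latter defaulting to "") is expected to be folded into every instance
+# dict returned by _get_instances_by_region.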
+@pytest.mark.parametrize(
+ "filters",
+ [
+ [],
+ [{"Name": "provider", "Values": "sample"}, {"Name": "instance-state-name", "Values": ["active"]}],
+ [
+ {"Name": "tags", "Values": "one_tag"},
+ ],
+ ],
+)
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._describe_ec2_instances")
+def test_inventory_get_instances_by_region(m_describe_ec2_instances, inventory, filters):
+ boto3_conn = [(MagicMock(), "us-east-1"), (MagicMock(), "us-east-2")]
+
+ inventory.all_clients = MagicMock()
+ inventory.all_clients.return_value = boto3_conn
+
+ m_describe_ec2_instances.side_effect = [
+ {
+ "Reservations": [
+ {
+ "OwnerId": "owner01",
+ "RequesterId": "requester01",
+ "ReservationId": "id-0123",
+ "Instances": [
+ {"name": "id-1-0", "os": "RedHat"},
+ {"name": "id-1-1", "os": "CoreOS"},
+ {"name": "id-1-2", "os": "Fedora"},
+ ],
+ },
+ {
+ "OwnerId": "owner01",
+ "ReservationId": "id-0456",
+ "Instances": [{"name": "id-2-0", "phase": "uat"}, {"name": "id-2-1", "phase": "prod"}],
+ },
+ ]
+ },
+ {
+ "Reservations": [
+ {
+ "OwnerId": "owner02",
+ "ReservationId": "id-0789",
+ "Instances": [
+ {"name": "id012345789", "tags": {"phase": "units"}},
+ ],
+ }
+ ],
+ "Metadata": {"Status": "active"},
+ },
+ ]
+
+ expected = [
+ {
+ "name": "id-1-0",
+ "os": "RedHat",
+ "OwnerId": "owner01",
+ "RequesterId": "requester01",
+ "ReservationId": "id-0123",
+ },
+ {
+ "name": "id-1-1",
+ "os": "CoreOS",
+ "OwnerId": "owner01",
+ "RequesterId": "requester01",
+ "ReservationId": "id-0123",
+ },
+ {
+ "name": "id-1-2",
+ "os": "Fedora",
+ "OwnerId": "owner01",
+ "RequesterId": "requester01",
+ "ReservationId": "id-0123",
+ },
+ {"name": "id-2-0", "phase": "uat", "OwnerId": "owner01", "ReservationId": "id-0456", "RequesterId": ""},
+ {"name": "id-2-1", "phase": "prod", "OwnerId": "owner01", "ReservationId": "id-0456", "RequesterId": ""},
+ {
+ "name": "id012345789",
+ "tags": {"phase": "units"},
+ "OwnerId": "owner02",
+ "ReservationId": "id-0789",
+ "RequesterId": "",
+ },
+ ]
+
+ default_filter = {"Name": "instance-state-name", "Values": ["running", "pending", "stopping", "stopped"]}
+ regions = ["us-east-2", "us-east-4"]
+
+ assert inventory._get_instances_by_region(regions, filters, False) == expected
+ inventory.all_clients.assert_called_with("ec2")
+
+ if any((f["Name"] == "instance-state-name" for f in filters)):
+ filters.append(default_filter)
+
+ m_describe_ec2_instances.assert_has_calls([call(conn, filters) for conn, region in boto3_conn], any_order=True)
+
+
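+# Only a 403/UnauthorizedOperation ClientError with strict_permissions
+# disabled is tolerated (empty result); any other ClientError or a
+# PaginationError must be fatal via fail_aws.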
+@pytest.mark.parametrize("strict", [True, False])
+@pytest.mark.parametrize(
+ "error",
+ [
+ botocore.exceptions.ClientError(
+ {"Error": {"Code": 1, "Message": "Something went wrong"}, "ResponseMetadata": {"HTTPStatusCode": 404}},
+ "some_botocore_client_error",
+ ),
+ botocore.exceptions.ClientError(
+ {
+ "Error": {"Code": "UnauthorizedOperation", "Message": "Something went wrong"},
+ "ResponseMetadata": {"HTTPStatusCode": 403},
+ },
+ "some_botocore_client_error",
+ ),
+ botocore.exceptions.PaginationError(message="some pagination error"),
+ ],
+)
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._describe_ec2_instances")
+def test_inventory_get_instances_by_region_failures(m_describe_ec2_instances, inventory, strict, error):
+ inventory.all_clients = MagicMock()
+ inventory.all_clients.return_value = [(MagicMock(), "us-west-2")]
+ inventory.fail_aws = MagicMock()
+ inventory.fail_aws.side_effect = SystemExit(1)
+
+ m_describe_ec2_instances.side_effect = error
+ regions = ["us-east-2", "us-east-4"]
+
+ if (
+ isinstance(error, botocore.exceptions.ClientError)
+ and error.response["ResponseMetadata"]["HTTPStatusCode"] == 403
+ and not strict
+ ):
+ assert inventory._get_instances_by_region(regions, [], strict) == []
+ else:
+ with pytest.raises(SystemExit):
+ inventory._get_instances_by_region(regions, [], strict)
+
+
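+# Unlike _get_preferred_hostname above, _get_all_hostnames keeps going and
+# returns every hostname that resolves, in preference order.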
+@pytest.mark.parametrize(
+ "hostnames,expected",
+ [
+ ([], ["test-instance.ansible.com", "test-instance.localhost"]),
+ (["private-dns-name"], ["test-instance.localhost"]),
+ (["tag:os_version"], ["RHEL", "CoreOS"]),
+ (["tag:os_version", "dns-name"], ["RHEL", "CoreOS", "test-instance.ansible.com"]),
+ ([{"name": "Name", "prefix": "Phase"}], ["dev_test-instance-01"]),
+ ([{"name": "Name", "prefix": "Phase", "separator": "-"}], ["dev-test-instance-01"]),
+ ([{"name": "Name", "prefix": "OSVersion", "separator": "-"}], ["test-instance-01"]),
+ ([{"name": "Name", "separator": "-"}], ["test-instance-01"]),
+ (
+ [{"name": "Name", "prefix": "Phase"}, "private-dns-name"],
+ ["dev_test-instance-01", "test-instance.localhost"],
+ ),
+ ([{"name": "Name", "prefix": "Phase"}, "tag:os_version"], ["dev_test-instance-01", "RHEL", "CoreOS"]),
+ (["private-dns-name", {"name": "Name", "separator": "-"}], ["test-instance.localhost", "test-instance-01"]),
+ (["OSRelease"], []),
+ ],
+)
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_tag_hostname")
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_boto_attr_chain")
+def test_inventory_get_all_hostnames(m_get_boto_attr_chain, m_get_tag_hostname, inventory, hostnames, expected):
+ instance = {
+ "Name": "test-instance-01",
+ "Phase": "dev",
+ "tag:os_version": ["RHEL", "CoreOS"],
+ "another_key": "another_value",
+ "dns-name": "test-instance.ansible.com",
+ "private-dns-name": "test-instance.localhost",
+ }
+
+ inventory._sanitize_hostname = MagicMock()
+ inventory._sanitize_hostname.side_effect = lambda x: x
+
+ m_get_boto_attr_chain.side_effect = lambda pref, instance: instance.get(pref)
+ m_get_tag_hostname.side_effect = lambda pref, instance: instance.get(pref)
+ assert expected == inventory._get_all_hostnames(instance, hostnames)
-instance_foobar = {"InstanceId": "foobar"}
-instance_barfoo = {"InstanceId": "barfoo"}
+def test_inventory_get_all_hostnames_failure(inventory):
+ instance = {}
+ hostnames = [{"value": "some_value"}]
-def test_query_empty_include_only(inventory):
- inventory._get_instances_by_region = Mock(side_effect=[[instance_foobar]])
- result = inventory._query("us-east-1", [{"tag:Name": ["foobar"]}], [], strict_permissions=True)
- assert result == {"aws_ec2": [instance_foobar]}
+ with pytest.raises(AnsibleError) as err:
+ inventory._get_all_hostnames(instance, hostnames)
+ assert "A 'name' key must be defined in a hostnames dictionary." in err
-def test_query_empty_include_ordered(inventory):
- inventory._get_instances_by_region = Mock(side_effect=[[instance_foobar], [instance_barfoo]])
- result = inventory._query("us-east-1", [{"tag:Name": ["foobar"]}, {"tag:Name": ["barfoo"]}], [], strict_permissions=True)
- assert result == {"aws_ec2": [instance_barfoo, instance_foobar]}
- inventory._get_instances_by_region.assert_called_with('us-east-1', [{'Name': 'tag:Name', 'Values': ['barfoo']}], True)
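+# SSM inventory content is merged into the matching instance under the
+# "SsmInventory" key; entities whose Data is empty leave their instance
+# untouched.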
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_ssm_information")
+def test_inventory__add_ssm_information(m_get_ssm_information, inventory):
+ instances = [
+ {"InstanceId": "i-001", "Name": "first-instance"},
+ {"InstanceId": "i-002", "Name": "another-instance"},
+ ]
+ result = {
+ "StatusCode": 200,
+ "Entities": [
+ {"Id": "i-001", "Data": {}},
+ {
+ "Id": "i-002",
+ "Data": {
+ "AWS:InstanceInformation": {
+ "Content": [{"os_type": "Linux", "os_name": "Fedora", "os_version": 37}]
+ }
+ },
+ },
+ ],
+ }
+ m_get_ssm_information.return_value = result
-def test_query_empty_include_exclude(inventory):
- inventory._get_instances_by_region = Mock(side_effect=[[instance_foobar], [instance_foobar]])
- result = inventory._query("us-east-1", [{"tag:Name": ["foobar"]}], [{"tag:Name": ["foobar"]}], strict_permissions=True)
- assert result == {"aws_ec2": []}
+ connection = MagicMock()
+ expected = [
+ {"InstanceId": "i-001", "Name": "first-instance"},
+ {
+ "InstanceId": "i-002",
+ "Name": "another-instance",
+ "SsmInventory": {"os_type": "Linux", "os_name": "Fedora", "os_version": 37},
+ },
+ ]
+
+ inventory._add_ssm_information(connection, instances)
+ assert expected == instances
+
+ filters = [{"Key": "AWS:InstanceInformation.InstanceId", "Values": [x["InstanceId"] for x in instances]}]
+ m_get_ssm_information.assert_called_once_with(connection, filters)
+
+
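+# 41 instances should trigger exactly two _get_ssm_information calls,
+# consistent with the plugin batching SSM queries (presumably 40 instance
+# IDs per call).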
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_ssm_information")
+def test_inventory__get_multiple_ssm_inventories(m_get_ssm_information, inventory):
+ instances = [{"InstanceId": f"i-00{i}", "Name": f"instance {i}"} for i in range(41)]
+ result = {
+ "StatusCode": 200,
+ "Entities": [
+ {
+ "Id": f"i-00{i}",
+ "Data": {
+ "AWS:InstanceInformation": {
+ "Content": [{"os_type": "Linux", "os_name": "Fedora", "os_version": 37}]
+ }
+ },
+ }
+ for i in range(41)
+ ],
+ }
+ m_get_ssm_information.return_value = result
+
+ connection = MagicMock()
-def test_include_extra_api_calls_deprecated(inventory):
- inventory.display.deprecate = Mock()
- inventory._read_config_data = Mock()
- inventory._set_credentials = Mock()
- inventory._query = Mock(return_value=[])
+ expected = [
+ {
+ "InstanceId": f"i-00{i}",
+ "Name": f"instance {i}",
+ "SsmInventory": {"os_type": "Linux", "os_name": "Fedora", "os_version": 37},
+ }
+ for i in range(41)
+ ]
- inventory.parse(inventory=[], loader=None, path=None)
- assert inventory.display.deprecate.call_count == 0
+ inventory._add_ssm_information(connection, instances)
+ assert expected == instances
- inventory._options["include_extra_api_calls"] = True
- inventory.parse(inventory=[], loader=None, path=None)
- assert inventory.display.deprecate.call_count == 1
+ assert 2 == m_get_ssm_information.call_count
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_rds.py b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_rds.py
new file mode 100644
index 000000000..53be24a48
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_rds.py
@@ -0,0 +1,674 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2022 Aubin Bikouo <@abikouo>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+import copy
+import random
+import string
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
+
+import pytest
+
+try:
+ import botocore
+except ImportError:
+ # Handled by HAS_BOTO3
+ pass
+
+from ansible.errors import AnsibleError
+
+from ansible_collections.amazon.aws.plugins.inventory.aws_rds import InventoryModule
+from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _add_tags_for_rds_hosts
+from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _describe_db_clusters
+from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _describe_db_instances
+from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _find_hosts_with_valid_statuses
+from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _get_rds_hostname
+from ansible_collections.amazon.aws.plugins.inventory.aws_rds import ansible_dict_to_boto3_filter_list
+from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
+
+if not HAS_BOTO3:
+ pytestmark = pytest.mark.skip("test_aws_rds.py requires the python modules 'boto3' and 'botocore'")
+
+
+def make_clienterror_exception(code="AccessDenied"):
+ return botocore.exceptions.ClientError(
+ {
+ "Error": {"Code": code, "Message": "User is not authorized to perform: xxx on resource: user yyyy"},
+ "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"},
+ },
+ "getXXX",
+ )
+
+
+@pytest.fixture()
+def inventory():
+ inventory = InventoryModule()
+ inventory.inventory = MagicMock()
+ inventory._populate_host_vars = MagicMock()
+
+ inventory.all_clients = MagicMock()
+ inventory.get_option = MagicMock()
+
+ inventory._set_composite_vars = MagicMock()
+ inventory._add_host_to_composed_groups = MagicMock()
+ inventory._add_host_to_keyed_groups = MagicMock()
+ inventory._read_config_data = MagicMock()
+ inventory._set_credentials = MagicMock()
+
+ inventory.get_cache_key = MagicMock()
+
+ inventory._cache = {}
+ return inventory
+
+
+@pytest.fixture()
+def connection():
+ conn = MagicMock()
+ return conn
+
+
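+# Unlike the aws_ec2 suffix test, a real file is written under tmp_path
+# here because verify_file also requires the path to exist (see the
+# missing-file test below).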
+@pytest.mark.parametrize(
+ "suffix,result",
+ [
+ ("aws_rds.yml", True),
+ ("aws_rds.yaml", True),
+ ("aws_RDS.yml", False),
+ ("AWS_rds.yaml", False),
+ ],
+)
+def test_inventory_verify_file_suffix(inventory, suffix, result, tmp_path):
+ test_dir = tmp_path / "test_aws_rds"
+ test_dir.mkdir()
+ inventory_file = "inventory" + suffix
+ inventory_file = test_dir / inventory_file
+ inventory_file.write_text("my inventory")
+ assert result == inventory.verify_file(str(inventory_file))
+
+
+def test_inventory_verify_file_with_missing_file(inventory):
+ inventory_file = "this_file_does_not_exist_aws_rds.yml"
+ assert not inventory.verify_file(inventory_file)
+
+
+def generate_random_string(with_digits=True, with_punctuation=True, length=16):
+ data = string.ascii_letters
+ if with_digits:
+ data += string.digits
+ if with_punctuation:
+ data += string.punctuation
+ return "".join([random.choice(data) for i in range(length)])
+
+
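+# A host is kept when either its DBInstanceStatus or its Status appears in
+# the requested statuses; the special value "all" keeps every host.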
+@pytest.mark.parametrize(
+ "hosts,statuses,expected",
+ [
+ (
+ [
+ {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"},
+ {"host": "host2", "DBInstanceStatus": "Creating", "Status": "active"},
+ {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"},
+ {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"},
+ ],
+ ["Available"],
+ [{"host": "host1", "DBInstanceStatus": "Available", "Status": "active"}],
+ ),
+ (
+ [
+ {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"},
+ {"host": "host2", "DBInstanceStatus": "Creating", "Status": "active"},
+ {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"},
+ {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"},
+ ],
+ ["all"],
+ [
+ {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"},
+ {"host": "host2", "DBInstanceStatus": "Creating", "Status": "active"},
+ {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"},
+ {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"},
+ ],
+ ),
+ (
+ [
+ {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"},
+ {"host": "host2", "DBInstanceStatus": "Creating", "Status": "Available"},
+ {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"},
+ {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"},
+ ],
+ ["Available"],
+ [
+ {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"},
+ {"host": "host2", "DBInstanceStatus": "Creating", "Status": "Available"},
+ ],
+ ),
+ ],
+)
+def test_find_hosts_with_valid_statuses(hosts, statuses, expected):
+ assert expected == _find_hosts_with_valid_statuses(hosts, statuses)
+
+
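+# DBInstanceIdentifier wins when both identifiers are present;
+# _get_rds_hostname only falls back to DBClusterIdentifier for
+# cluster-only hosts.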
+@pytest.mark.parametrize(
+ "host,expected",
+ [
+ ({"DBClusterIdentifier": "my_cluster_id"}, "my_cluster_id"),
+ ({"DBClusterIdentifier": "my_cluster_id", "DBInstanceIdentifier": "my_instance_id"}, "my_instance_id"),
+ ],
+)
+def test_get_rds_hostname(host, expected):
+ assert expected == _get_rds_hostname(host)
+
+
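+# _format_inventory reproduces the legacy inventory-script payload: a
+# "_meta" hostvars map plus an "aws_rds" group listing the host names.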
+@pytest.mark.parametrize("hosts", ["", "host1", "host2,host3", "host2,host3,host1"])
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_rds._get_rds_hostname")
+def test_inventory_format_inventory(m_get_rds_hostname, inventory, hosts):
+ hosts_vars = {
+ "host1": {"var10": "value10"},
+ "host2": {"var20": "value20", "var21": "value21"},
+ "host3": {"var30": "value30", "var31": "value31", "var32": "value32"},
+ }
+
+ m_get_rds_hostname.side_effect = lambda h: h["name"]
+
+ class _inventory_host(object):
+ def __init__(self, name, host_vars):
+ self.name = name
+ self.vars = host_vars
+
+ inventory.inventory = MagicMock()
+ inventory.inventory.get_host.side_effect = lambda x: _inventory_host(name=x, host_vars=hosts_vars.get(x))
+
+ hosts = [{"name": x} for x in hosts.split(",") if x]
+ expected = {
+ "_meta": {"hostvars": {x["name"]: hosts_vars.get(x["name"]) for x in hosts}},
+ "aws_rds": {"hosts": [x["name"] for x in hosts]},
+ }
+
+ assert expected == inventory._format_inventory(hosts)
+ if hosts == []:
+ m_get_rds_hostname.assert_not_called()
+
+
+@pytest.mark.parametrize("length", range(0, 10, 2))
+def test_inventory_populate(inventory, length):
+ group = "aws_rds"
+ hosts = [f"host_{int(i)}" for i in range(length)]
+
+ inventory._add_hosts = MagicMock()
+ inventory._populate(hosts=hosts)
+
+ inventory.inventory.add_group.assert_called_with("aws_rds")
+
+ if len(hosts) == 0:
+ inventory.inventory._add_hosts.assert_not_called()
+ inventory.inventory.add_child.assert_not_called()
+ else:
+ inventory._add_hosts.assert_called_with(hosts=hosts, group=group)
+ inventory.inventory.add_child.assert_called_with("all", group)
+
+
+def test_inventory_populate_from_source(inventory):
+ source_data = {
+ "_meta": {
+ "hostvars": {
+ "host_1_0": {"var10": "value10"},
+ "host_2": {"var2": "value2"},
+ "host_3": {"var3": ["value30", "value31", "value32"]},
+ }
+ },
+ "all": {"hosts": ["host_1_0", "host_1_1", "host_2", "host_3"]},
+ "aws_host_1": {"hosts": ["host_1_0", "host_1_1"]},
+ "aws_host_2": {"hosts": ["host_2"]},
+ "aws_host_3": {"hosts": ["host_3"]},
+ }
+
+ inventory._populate_from_source(source_data)
+ inventory.inventory.add_group.assert_has_calls(
+ [
+ call("aws_host_1"),
+ call("aws_host_2"),
+ call("aws_host_3"),
+ ],
+ any_order=True,
+ )
+ inventory.inventory.add_child.assert_has_calls(
+ [
+ call("all", "aws_host_1"),
+ call("all", "aws_host_2"),
+ call("all", "aws_host_3"),
+ ],
+ any_order=True,
+ )
+
+ inventory._populate_host_vars.assert_has_calls(
+ [
+ call(["host_1_0"], {"var10": "value10"}, "aws_host_1"),
+ call(["host_1_1"], {}, "aws_host_1"),
+ call(["host_2"], {"var2": "value2"}, "aws_host_2"),
+ call(["host_3"], {"var3": ["value30", "value31", "value32"]}, "aws_host_3"),
+ ],
+ any_order=True,
+ )
+
+
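+# Tags are fetched per host via list_tags_for_resource on the instance or
+# cluster ARN; an AccessDenied error should degrade to an empty tag list
+# unless strict is set (see the failure tests below).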
+@pytest.mark.parametrize("strict", [True, False])
+def test_add_tags_for_rds_hosts_with_no_hosts(connection, strict):
+ hosts = []
+
+ _add_tags_for_rds_hosts(connection, hosts, strict)
+ connection.list_tags_for_resource.assert_not_called()
+
+
+def test_add_tags_for_rds_hosts_with_hosts(connection):
+ hosts = [
+ {"DBInstanceArn": "dbarn1"},
+ {"DBInstanceArn": "dbarn2"},
+ {"DBClusterArn": "clusterarn1"},
+ ]
+
+ rds_hosts_tags = {
+ "dbarn1": {"TagList": ["tag1=dbarn1", "phase=units"]},
+ "dbarn2": {"TagList": ["tag2=dbarn2", "collection=amazon.aws"]},
+ "clusterarn1": {"TagList": ["tag1=clusterarn1", "tool=ansible-test"]},
+ }
+ connection.list_tags_for_resource.side_effect = lambda **kwargs: rds_hosts_tags.get(kwargs.get("ResourceName"))
+
+ _add_tags_for_rds_hosts(connection, hosts, strict=False)
+
+ assert hosts == [
+ {"DBInstanceArn": "dbarn1", "Tags": ["tag1=dbarn1", "phase=units"]},
+ {"DBInstanceArn": "dbarn2", "Tags": ["tag2=dbarn2", "collection=amazon.aws"]},
+ {"DBClusterArn": "clusterarn1", "Tags": ["tag1=clusterarn1", "tool=ansible-test"]},
+ ]
+
+
+def test_add_tags_for_rds_hosts_with_failure_not_strict(connection):
+ hosts = [{"DBInstanceArn": "dbarn1"}]
+
+ connection.list_tags_for_resource.side_effect = make_clienterror_exception()
+
+ _add_tags_for_rds_hosts(connection, hosts, strict=False)
+
+ assert hosts == [
+ {"DBInstanceArn": "dbarn1", "Tags": []},
+ ]
+
+
+def test_add_tags_for_rds_hosts_with_failure_strict(connection):
+ hosts = [{"DBInstanceArn": "dbarn1"}]
+
+ connection.list_tags_for_resource.side_effect = make_clienterror_exception()
+
+ with pytest.raises(botocore.exceptions.ClientError):
+ _add_tags_for_rds_hosts(connection, hosts, strict=True)
+
+
+ADD_TAGS_FOR_RDS_HOSTS = "ansible_collections.amazon.aws.plugins.inventory.aws_rds._add_tags_for_rds_hosts"
+
+
+@patch(ADD_TAGS_FOR_RDS_HOSTS)
+def test_describe_db_clusters(m_add_tags_for_rds_hosts, connection):
+ db_cluster = {
+ "DatabaseName": "my_sample_db",
+ "DBClusterIdentifier": "db_id_01",
+ "Status": "Stopped",
+ "DbClusterResourceId": "cluster_resource_id",
+ "DBClusterArn": "arn:xxx:xxxx",
+ "DeletionProtection": True,
+ }
+
+ connection.describe_db_clusters.return_value = {"DBClusters": [db_cluster]}
+
+ filters = generate_random_string(with_punctuation=False)
+ strict = False
+
+ result = _describe_db_clusters(connection=connection, filters=filters, strict=strict)
+
+ assert result == [db_cluster]
+
+ m_add_tags_for_rds_hosts.assert_called_with(connection, result, strict)
+
+
+@pytest.mark.parametrize("strict", [True, False])
+@patch(ADD_TAGS_FOR_RDS_HOSTS)
+def test_describe_db_clusters_with_access_denied(m_add_tags_for_rds_hosts, connection, strict):
+ connection.describe_db_clusters.side_effect = make_clienterror_exception()
+
+ filters = generate_random_string(with_punctuation=False)
+
+ if strict:
+ with pytest.raises(AnsibleError):
+ _describe_db_clusters(connection=connection, filters=filters, strict=strict)
+ else:
+ assert _describe_db_clusters(connection=connection, filters=filters, strict=strict) == []
+
+ m_add_tags_for_rds_hosts.assert_not_called()
+
+
+@patch(ADD_TAGS_FOR_RDS_HOSTS)
+def test_describe_db_clusters_with_client_error(m_add_tags_for_rds_hosts, connection):
+ connection.describe_db_clusters.side_effect = make_clienterror_exception(code="Unknown")
+
+ filters = generate_random_string(with_punctuation=False)
+ with pytest.raises(AnsibleError):
+ _describe_db_clusters(connection=connection, filters=filters, strict=False)
+
+ m_add_tags_for_rds_hosts.assert_not_called()
+
+
+@patch(ADD_TAGS_FOR_RDS_HOSTS)
+def test_describe_db_instances(m_add_tags_for_rds_hosts, connection):
+ db_instance = {
+ "DBInstanceIdentifier": "db_id_01",
+ "Status": "Stopped",
+ "DBName": "my_sample_db_01",
+ "DBClusterIdentifier": "db_cluster_001",
+ "DBInstanceArn": "arn:db:xxxx:xxxx:xxxx",
+ "Engine": "mysql",
+ }
+
+ conn_paginator = MagicMock()
+ paginate = MagicMock()
+
+ connection.get_paginator.return_value = conn_paginator
+ conn_paginator.paginate.return_value = paginate
+
+ paginate.build_full_result.return_value = {"DBInstances": [db_instance]}
+
+ filters = generate_random_string(with_punctuation=False)
+ strict = False
+
+ result = _describe_db_instances(connection=connection, filters=filters, strict=strict)
+
+ assert result == [db_instance]
+
+ m_add_tags_for_rds_hosts.assert_called_with(connection, result, strict)
+ connection.get_paginator.assert_called_with("describe_db_instances")
+ conn_paginator.paginate.assert_called_with(Filters=filters)
+
+
+DESCRIBE_DB_INSTANCES = "ansible_collections.amazon.aws.plugins.inventory.aws_rds._describe_db_instances"
+DESCRIBE_DB_CLUSTERS = "ansible_collections.amazon.aws.plugins.inventory.aws_rds._describe_db_clusters"
+FIND_HOSTS_WITH_VALID_STATUSES = (
+ "ansible_collections.amazon.aws.plugins.inventory.aws_rds._find_hosts_with_valid_statuses"
+)
+
+
+@pytest.mark.parametrize("gather_clusters", [True, False])
+@pytest.mark.parametrize("regions", range(1, 5))
+@patch(DESCRIBE_DB_INSTANCES)
+@patch(DESCRIBE_DB_CLUSTERS)
+@patch(FIND_HOSTS_WITH_VALID_STATUSES)
+def test_inventory_get_all_db_hosts(
+ m_find_hosts, m_describe_db_clusters, m_describe_db_instances, inventory, gather_clusters, regions
+):
+ params = {
+ "gather_clusters": gather_clusters,
+ "regions": [f"us-east-{int(i)}" for i in range(regions)],
+ "instance_filters": generate_random_string(),
+ "cluster_filters": generate_random_string(),
+ "strict": random.choice((True, False)),
+ "statuses": [random.choice(["Available", "Stopped", "Running", "Creating"]) for i in range(3)],
+ }
+
+ connections = [MagicMock() for i in range(regions)]
+
+ inventory.all_clients.return_value = [(connections[i], f"us-east-{int(i)}") for i in range(regions)]
+
+ ids = list(reversed(range(regions)))
+ db_instances = [{"DBInstanceIdentifier": f"db_00{int(i)}"} for i in ids]
+ db_clusters = [{"DBClusterIdentifier": f"cluster_00{int(i)}"} for i in ids]
+
+ m_describe_db_instances.side_effect = [[i] for i in db_instances]
+ m_describe_db_clusters.side_effect = [[i] for i in db_clusters]
+
+ result = list(sorted(db_instances, key=lambda x: x["DBInstanceIdentifier"]))
+ if gather_clusters:
+ result += list(sorted(db_clusters, key=lambda x: x["DBClusterIdentifier"]))
+
+ m_find_hosts.return_value = result
+
+ assert result == inventory._get_all_db_hosts(**params)
+ inventory.all_clients.assert_called_with("rds")
+ m_describe_db_instances.assert_has_calls(
+ [call(connections[i], params["instance_filters"], strict=params["strict"]) for i in range(regions)]
+ )
+
+ if gather_clusters:
+ m_describe_db_clusters.assert_has_calls(
+ [call(connections[i], params["cluster_filters"], strict=params["strict"]) for i in range(regions)]
+ )
+
+ m_find_hosts.assert_called_with(result, params["statuses"])
+
+
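+# Host variables are the snake_case conversion of the raw host dicts (see
+# camel_hosts), "region" is derived from the availability zone(s), and the
+# hostvars_prefix/hostvars_suffix options rename every variable.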
+@pytest.mark.parametrize("hostvars_prefix", [True])
+@pytest.mark.parametrize("hostvars_suffix", [True])
+@patch("ansible_collections.amazon.aws.plugins.inventory.aws_rds._get_rds_hostname")
+def test_inventory_add_hosts(m_get_rds_hostname, inventory, hostvars_prefix, hostvars_suffix):
+ _options = {
+ "strict": random.choice((False, True)),
+ "compose": random.choice((False, True)),
+ "keyed_groups": "keyed_group_test_inventory_add_hosts",
+ "groups": ["all", "test_inventory_add_hosts"],
+ }
+
+ if hostvars_prefix:
+ _options["hostvars_prefix"] = f"prefix_{generate_random_string(length=8, with_punctuation=False)}"
+ if hostvars_suffix:
+ _options["hostvars_suffix"] = f"suffix_{generate_random_string(length=8, with_punctuation=False)}"
+
+ def _get_option_side_effect(x):
+ return _options.get(x)
+
+ inventory.get_option.side_effect = _get_option_side_effect
+
+ m_get_rds_hostname.side_effect = lambda h: (
+ h["DBInstanceIdentifier"] if "DBInstanceIdentifier" in h else h["DBClusterIdentifier"]
+ )
+
+ hosts = [
+ {
+ "DBInstanceIdentifier": "db_i_001",
+ "Tags": [{"Key": "Name", "Value": "db_001"}, {"Key": "RunningEngine", "Value": "mysql"}],
+ "availability_zone": "us-east-1a",
+ },
+ {
+ "DBInstanceIdentifier": "db_i_002",
+ "Tags": [{"Key": "ClusterName", "Value": "test_cluster"}, {"Key": "RunningOS", "Value": "CoreOS"}],
+ },
+ {
+ "DBClusterIdentifier": "test_cluster",
+ "Tags": [{"Key": "CluserVersionOrigin", "Value": "2.0"}, {"Key": "Provider", "Value": "RedHat"}],
+ },
+ {
+ "DBClusterIdentifier": "another_cluster",
+ "Tags": [{"Key": "TestingPurpose", "Value": "Ansible"}],
+ "availability_zones": ["us-west-1a", "us-east-1b"],
+ },
+ ]
+
+ group = f"test_add_hosts_group_{generate_random_string(length=10, with_punctuation=False)}"
+ inventory._add_hosts(hosts, group)
+
+ m_get_rds_hostname.assert_has_calls([call(h) for h in hosts], any_order=True)
+
+ hosts_names = ["db_i_001", "db_i_002", "test_cluster", "another_cluster"]
+ inventory.inventory.add_host.assert_has_calls([call(name, group=group) for name in hosts_names], any_order=True)
+
+ camel_hosts = [
+ {
+ "db_instance_identifier": "db_i_001",
+ "tags": {"Name": "db_001", "RunningEngine": "mysql"},
+ "availability_zone": "us-east-1a",
+ "region": "us-east-1",
+ },
+ {"db_instance_identifier": "db_i_002", "tags": {"ClusterName": "test_cluster", "RunningOS": "CoreOS"}},
+ {"db_cluster_identifier": "test_cluster", "tags": {"CluserVersionOrigin": "2.0", "Provider": "RedHat"}},
+ {
+ "db_cluster_identifier": "another_cluster",
+ "tags": {"TestingPurpose": "Ansible"},
+ "availability_zones": ["us-west-1a", "us-east-1b"],
+ "region": "us-west-1",
+ },
+ ]
+
+ set_variable_calls = []
+ for i in range(len(camel_hosts)):
+ for var, value in camel_hosts[i].items():
+ if hostvars_prefix:
+ var = _options["hostvars_prefix"] + var
+ if hostvars_suffix:
+ var += _options["hostvars_suffix"]
+ set_variable_calls.append(call(hosts_names[i], var, value))
+
+ inventory.inventory.set_variable.assert_has_calls(set_variable_calls, any_order=True)
+
+ if hostvars_prefix or hostvars_suffix:
+ tmp = []
+ for host in camel_hosts:
+ new_host = copy.deepcopy(host)
+ for key in host:
+ new_key = key
+ if hostvars_prefix:
+ new_key = _options["hostvars_prefix"] + new_key
+ if hostvars_suffix:
+ new_key += _options["hostvars_suffix"]
+ new_host[new_key] = host[key]
+ tmp.append(new_host)
+ camel_hosts = tmp
+
+ inventory._set_composite_vars.assert_has_calls(
+ [
+ call(_options["compose"], camel_hosts[i], hosts_names[i], strict=_options["strict"])
+ for i in range(len(camel_hosts))
+ ],
+ any_order=True,
+ )
+ inventory._add_host_to_composed_groups.assert_has_calls(
+ [
+ call(_options["groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"])
+ for i in range(len(camel_hosts))
+ ],
+ any_order=True,
+ )
+ inventory._add_host_to_keyed_groups.assert_has_calls(
+ [
+ call(_options["keyed_groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"])
+ for i in range(len(camel_hosts))
+ ],
+ any_order=True,
+ )
+
+
+BASE_INVENTORY_PARSE = "ansible_collections.amazon.aws.plugins.inventory.aws_rds.AWSInventoryBase.parse"
+
+
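+# The parameter matrix pins down the caching contract: a fresh query runs
+# unless caching is enabled, requested and hit (in which case
+# _populate_from_source is used instead), and the cache entry is rewritten
+# whenever a fresh inventory is formatted.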
+@pytest.mark.parametrize("include_clusters", [True, False])
+@pytest.mark.parametrize("filter_db_cluster_id", [True, False])
+@pytest.mark.parametrize("user_cache_directive", [True, False])
+@pytest.mark.parametrize("cache", [True, False])
+@pytest.mark.parametrize("cache_hit", [True, False])
+@patch(BASE_INVENTORY_PARSE)
+def test_inventory_parse(
+ m_parse, inventory, include_clusters, filter_db_cluster_id, user_cache_directive, cache, cache_hit
+):
+ inventory_data = MagicMock()
+ loader = MagicMock()
+ path = generate_random_string(with_punctuation=False, with_digits=False)
+
+ options = {}
+ options["regions"] = [f"us-east-{d}" for d in range(random.randint(1, 5))]
+ options["strict_permissions"] = random.choice((True, False))
+ options["statuses"] = generate_random_string(with_punctuation=False)
+ options["include_clusters"] = include_clusters
+ options["filters"] = {
+ "db-instance-id": [
+ f"arn:db:{generate_random_string(with_punctuation=False)}" for i in range(random.randint(1, 10))
+ ],
+ "dbi-resource-id": generate_random_string(with_punctuation=False),
+ "domain": generate_random_string(with_digits=False, with_punctuation=False),
+ "engine": generate_random_string(with_digits=False, with_punctuation=False),
+ }
+ if filter_db_cluster_id:
+ options["filters"]["db-cluster-id"] = [
+ f"arn:cluster:{generate_random_string(with_punctuation=False)}" for i in range(random.randint(1, 10))
+ ]
+
+ options["cache"] = user_cache_directive
+
+ def get_option_side_effect(v):
+ return options.get(v)
+
+ inventory.get_option.side_effect = get_option_side_effect
+
+ cache_key = path + generate_random_string()
+ inventory.get_cache_key.return_value = cache_key
+
+ cache_key_value = generate_random_string()
+ if cache_hit:
+ inventory._cache[cache_key] = cache_key_value
+
+ inventory._populate = MagicMock()
+ inventory._populate_from_source = MagicMock()
+ inventory._get_all_db_hosts = MagicMock()
+ all_db_hosts = [
+ {"host": f"host_{int(random.randint(1, 1000))}"},
+ {"host": f"host_{int(random.randint(1, 1000))}"},
+ {"host": f"host_{int(random.randint(1, 1000))}"},
+ {"host": f"host_{int(random.randint(1, 1000))}"},
+ ]
+ inventory._get_all_db_hosts.return_value = all_db_hosts
+
+ format_cache_key_value = f"format_inventory_{all_db_hosts}"
+ inventory._format_inventory = MagicMock()
+ inventory._format_inventory.return_value = format_cache_key_value
+
+ inventory.parse(inventory_data, loader, path, cache)
+
+ m_parse.assert_called_with(inventory_data, loader, path, cache=cache)
+
+ boto3_instance_filters = ansible_dict_to_boto3_filter_list(options["filters"])
+ boto3_cluster_filters = []
+ if filter_db_cluster_id and include_clusters:
+ boto3_cluster_filters = ansible_dict_to_boto3_filter_list(
+ {"db-cluster-id": options["filters"]["db-cluster-id"]}
+ )
+
+ if not cache or not user_cache_directive or (cache and user_cache_directive and not cache_hit):
+ inventory._get_all_db_hosts.assert_called_with(
+ options["regions"],
+ boto3_instance_filters,
+ boto3_cluster_filters,
+ options["strict_permissions"],
+ options["statuses"],
+ include_clusters,
+ )
+ inventory._populate.assert_called_with(all_db_hosts)
+ inventory._format_inventory.assert_called_with(all_db_hosts)
+ else:
+ inventory._get_all_db_hosts.assert_not_called()
+ inventory._populate.assert_not_called()
+ inventory._format_inventory.assert_not_called()
+
+ if cache and user_cache_directive and cache_hit:
+ inventory._populate_from_source.assert_called_with(cache_key_value)
+
+ if (cache and user_cache_directive and not cache_hit) or (not cache and user_cache_directive):
+ # validate that cache was populated
+ assert inventory._cache[cache_key] == format_cache_key_value
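
The five parametrize axes above reduce to a small decision matrix. Below is a minimal, self-contained model of the branching the test asserts on; the function names are illustrative and are not part of the plugin.

    # "user_cache" is the inventory source's cache option, "cache" is the flag
    # ansible passes into parse(), "cache_hit" is whether the key is stored.
    def should_fetch_from_aws(user_cache: bool, cache: bool, cache_hit: bool) -> bool:
        attempt_read = user_cache and cache
        return not attempt_read or not cache_hit

    def should_write_cache(user_cache: bool, cache: bool, cache_hit: bool) -> bool:
        # The cache is (re)written whenever caching is enabled and we had to fetch.
        return user_cache and should_fetch_from_aws(user_cache, cache, cache_hit)

    assert should_fetch_from_aws(user_cache=True, cache=True, cache_hit=True) is False
    assert should_fetch_from_aws(user_cache=True, cache=True, cache_hit=False) is True
    assert should_write_cache(user_cache=True, cache=False, cache_hit=False) is True
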
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/lookup/test_secretsmanager_secret.py b/ansible_collections/amazon/aws/tests/unit/plugins/lookup/test_secretsmanager_secret.py
new file mode 100644
index 000000000..2c8260b61
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/lookup/test_secretsmanager_secret.py
@@ -0,0 +1,348 @@
+#
+# (c) 2024 Red Hat Inc.
+#
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+import random
+from unittest.mock import ANY
+from unittest.mock import MagicMock
+from unittest.mock import call
+
+import pytest
+from botocore.exceptions import ClientError
+
+from ansible.errors import AnsibleLookupError
+
+from ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret import LookupModule
+
+
+@pytest.fixture
+def lookup_plugin():
+ lookup = LookupModule()
+ lookup.params = {}
+
+ lookup.get_option = MagicMock()
+
+ def _get_option(x):
+ return lookup.params.get(x)
+
+ lookup.get_option.side_effect = _get_option
+ lookup.client = MagicMock()
+
+ return lookup
+
+
+def pick_from_list(elements=None):
+ if elements is None:
+ elements = ["error", "warn", "skip"]
+ return random.choice(elements)
+
+
+def _raise_boto_clienterror(code, msg):
+ params = {
+ "Error": {"Code": code, "Message": msg},
+ "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"},
+ }
+ return ClientError(params, "get_secret_value")
+
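
The assertions below match on str(ClientError); botocore renders that as "An error occurred (<Code>) when calling the <operation> operation: <Message>", so the helper above fully determines those strings. A quick self-contained check of that assumption:

    from botocore.exceptions import ClientError

    err = ClientError(
        {"Error": {"Code": "ResourceNotFoundException", "Message": "secret not found"}},
        "get_secret_value",
    )
    assert str(err) == (
        "An error occurred (ResourceNotFoundException) when calling the "
        "get_secret_value operation: secret not found"
    )
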
+
+class TestLookupModuleRun:
+ @pytest.mark.parametrize(
+ "params,err",
+ [
+ ({"on_missing": "test"}, '"on_missing" must be a string and one of "error", "warn" or "skip", not test'),
+ ({"on_denied": "return"}, '"on_denied" must be a string and one of "error", "warn" or "skip", not return'),
+ (
+ {"on_deleted": "delete"},
+ '"on_deleted" must be a string and one of "error", "warn" or "skip", not delete',
+ ),
+ (
+ {"on_missing": ["warn"]},
+ '"on_missing" must be a string and one of "error", "warn" or "skip", not [\'warn\']',
+ ),
+ ({"on_denied": True}, '"on_denied" must be a string and one of "error", "warn" or "skip", not True'),
+ (
+ {"on_deleted": {"error": True}},
+ '"on_deleted" must be a string and one of "error", "warn" or "skip", not {\'error\': True}',
+ ),
+ ],
+ )
+ def test_run_invalid_parameters(self, lookup_plugin, mocker, params, err):
+ aws_lookup_base_run = mocker.patch(
+ "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret.AWSLookupBase.run"
+ )
+ aws_lookup_base_run.return_value = True
+ m_list_secrets = mocker.patch(
+ "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret._list_secrets"
+ )
+ m_list_secrets.return_value = {"SecretList": []}
+
+ lookup_plugin.params = params
+ with pytest.raises(AnsibleLookupError) as exc_info:
+ lookup_plugin.run(terms=["testing_secret"], variables=[])
+ assert err == str(exc_info.value)
+
+ def test_run_by_path(self, lookup_plugin, mocker):
+ aws_lookup_base_run = mocker.patch(
+ "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret.AWSLookupBase.run"
+ )
+ aws_lookup_base_run.return_value = True
+ m_list_secrets = mocker.patch(
+ "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret._list_secrets"
+ )
+ secrets_lists = [{"Name": "secret-0"}, {"Name": "secret-1"}, {"Name": "secret-2"}]
+ m_list_secrets.return_value = [{"SecretList": secrets_lists}]
+
+ params = {
+ "on_missing": pick_from_list(),
+ "on_denied": pick_from_list(),
+ "on_deleted": pick_from_list(),
+ "bypath": True,
+ }
+ lookup_plugin.params = params
+
+ lookup_plugin.get_secret_value = MagicMock()
+ secrets_values = {
+ "secret-0": "value-0",
+ "secret-1": "value-1",
+ "secret-2": "value-2",
+ }
+ lookup_plugin.get_secret_value.side_effect = lambda x, client, **kwargs: secrets_values.get(x)
+
+ secretsmanager_client = MagicMock()
+ lookup_plugin.client.return_value = secretsmanager_client
+
+ term = "term0"
+ assert [secrets_values] == lookup_plugin.run(terms=[term], variables=[])
+
+ m_list_secrets.assert_called_once_with(secretsmanager_client, term)
+ lookup_plugin.client.assert_called_once_with("secretsmanager", ANY)
+ lookup_plugin.get_secret_value.assert_has_calls(
+ [
+ call(
+ "secret-0",
+ secretsmanager_client,
+ on_missing=params.get("on_missing"),
+ on_denied=params.get("on_denied"),
+ ),
+ call(
+ "secret-1",
+ secretsmanager_client,
+ on_missing=params.get("on_missing"),
+ on_denied=params.get("on_denied"),
+ ),
+ call(
+ "secret-2",
+ secretsmanager_client,
+ on_missing=params.get("on_missing"),
+ on_denied=params.get("on_denied"),
+ ),
+ ]
+ )
+
+ @pytest.mark.parametrize("join_secrets", [True, False])
+ @pytest.mark.parametrize(
+ "terms", [["secret-0"], ["secret-0", "secret-1"], ["secret-0", "secret-1", "secret-0", "secret-2"]]
+ )
+ def test_run(self, lookup_plugin, mocker, join_secrets, terms):
+ aws_lookup_base_run = mocker.patch(
+ "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret.AWSLookupBase.run"
+ )
+ aws_lookup_base_run.return_value = True
+
+ params = {
+ "on_missing": pick_from_list(),
+ "on_denied": pick_from_list(),
+ "on_deleted": pick_from_list(),
+ "bypath": False,
+ "version_stage": MagicMock(),
+ "version_id": MagicMock(),
+ "nested": pick_from_list([True, False]),
+ "join": join_secrets,
+ }
+ lookup_plugin.params = params
+
+ lookup_plugin.get_secret_value = MagicMock()
+ secrets_values = {
+ "secret-0": "value-0",
+ "secret-1": "value-1",
+ }
+ lookup_plugin.get_secret_value.side_effect = lambda x, client, **kwargs: secrets_values.get(x)
+
+ secretsmanager_client = MagicMock()
+ lookup_plugin.client.return_value = secretsmanager_client
+
+ expected_secrets = [secrets_values.get(x) for x in terms if secrets_values.get(x) is not None]
+ if join_secrets:
+ expected_secrets = ["".join(expected_secrets)]
+
+ assert expected_secrets == lookup_plugin.run(terms=terms, variables=[])
+
+ lookup_plugin.client.assert_called_once_with("secretsmanager", ANY)
+ lookup_plugin.get_secret_value.assert_has_calls(
+ [
+ call(
+ x,
+ secretsmanager_client,
+ version_stage=params.get("version_stage"),
+ version_id=params.get("version_id"),
+ on_missing=params.get("on_missing"),
+ on_denied=params.get("on_denied"),
+ on_deleted=params.get("on_deleted"),
+ nested=params.get("nested"),
+ )
+ for x in terms
+ ]
+ )
+
+
+class TestLookupModuleGetSecretValue:
+ def test_get_secret__invalid_nested_value(self, lookup_plugin):
+ params = {
+ "version_stage": MagicMock(),
+ "version_id": MagicMock(),
+ "on_missing": None,
+ "on_denied": None,
+ "on_deleted": None,
+ }
+ with pytest.raises(AnsibleLookupError) as exc_info:
+ client = MagicMock()
+ lookup_plugin.get_secret_value("aws_invalid_nested_secret", client, nested=True, **params)
+ assert "Nested query must use the following syntax: `aws_secret_name.<key_name>.<key_name>" == str(
+ exc_info.value
+ )
+
+ @pytest.mark.parametrize("versionId", [None, MagicMock()])
+ @pytest.mark.parametrize("versionStage", [None, MagicMock()])
+ @pytest.mark.parametrize(
+ "term,nested,secretId",
+ [
+ ("secret0", False, "secret0"),
+ ("secret0.child", False, "secret0.child"),
+ ("secret0.child", True, "secret0"),
+ ("secret0.root.child", False, "secret0.root.child"),
+ ("secret0.root.child", True, "secret0"),
+ ],
+ )
+ def test_get_secret__binary_secret(self, lookup_plugin, versionId, versionStage, term, nested, secretId):
+ params = {
+ "version_stage": versionStage,
+ "version_id": versionId,
+ "on_missing": None,
+ "on_denied": None,
+ "on_deleted": None,
+ }
+
+ client = MagicMock()
+ client.get_secret_value = MagicMock()
+ bin_secret_value = b"binary_value"
+ client.get_secret_value.return_value = {"SecretBinary": bin_secret_value}
+
+ assert bin_secret_value == lookup_plugin.get_secret_value(term, client, nested=nested, **params)
+ api_params = {"SecretId": secretId}
+ if versionId is not None:
+ api_params["VersionId"] = versionId
+ if versionStage:
+ api_params["VersionStage"] = versionStage
+ client.get_secret_value.assert_called_once_with(aws_retry=True, **api_params)
+
+ @pytest.mark.parametrize("on_missing", ["warn", "error"])
+ @pytest.mark.parametrize(
+ "term,missing_key",
+ [
+ ("secret_name.root.child1", "root.child1"),
+ ("secret_name.root.child1.nested", "root.child1"),
+ ("secret_name.root.child.nested1", "root.child.nested1"),
+ ("secret_name.root.child.nested.value", "root.child.nested.value"),
+ ],
+ )
+ def test_get_secret__missing_nested_secret(self, lookup_plugin, on_missing, term, missing_key):
+ client = MagicMock()
+ client.get_secret_value = MagicMock()
+ json_secret = '{"root": {"child": {"nested": "ansible-test-secret-0"}}}'
+ client.get_secret_value.return_value = {"SecretString": json_secret}
+
+ if on_missing == "error":
+ with pytest.raises(AnsibleLookupError) as exc_info:
+ lookup_plugin.get_secret_value(term, client, nested=True, on_missing=on_missing)
+ assert f"Successfully retrieved secret but there exists no key {missing_key} in the secret" == str(
+ exc_info.value
+ )
+ else:
+ lookup_plugin._display = MagicMock()
+ lookup_plugin._display.warning = MagicMock()
+ assert lookup_plugin.get_secret_value(term, client, nested=True, on_missing=on_missing) is None
+ lookup_plugin._display.warning.assert_called_once_with(
+ f"Skipping, Successfully retrieved secret but there exists no key {missing_key} in the secret"
+ )
+
+ def test_get_secret__generic_client_error(self, lookup_plugin):
+ client = MagicMock()
+ client.get_secret_value = MagicMock()
+ client.get_secret_value.side_effect = _raise_boto_clienterror("UnexpecteError", "unable to retrieve Secret")
+
+ with pytest.raises(AnsibleLookupError) as exc_info:
+ lookup_plugin.get_secret_value(MagicMock(), client)
+ assert (
+ "Failed to retrieve secret: An error occurred (UnexpecteError) when calling the get_secret_value operation: unable to retrieve Secret"
+ == str(exc_info.value)
+ )
+
+ @pytest.mark.parametrize("on_denied", ["warn", "error"])
+ def test_get_secret__on_denied(self, lookup_plugin, on_denied):
+ client = MagicMock()
+ client.get_secret_value = MagicMock()
+ client.get_secret_value.side_effect = _raise_boto_clienterror(
+ "AccessDeniedException", "Access denied to Secret"
+ )
+ term = "ansible-test-secret-0123"
+
+ if on_denied == "error":
+ with pytest.raises(AnsibleLookupError) as exc_info:
+ lookup_plugin.get_secret_value(term, client, on_denied=on_denied)
+ assert f"Failed to access secret {term} (AccessDenied)" == str(exc_info.value)
+ else:
+ lookup_plugin._display = MagicMock()
+ lookup_plugin._display.warning = MagicMock()
+ assert lookup_plugin.get_secret_value(term, client, on_denied=on_denied) is None
+ lookup_plugin._display.warning.assert_called_once_with(f"Skipping, access denied for secret {term}")
+
+ @pytest.mark.parametrize("on_missing", ["warn", "error"])
+ def test_get_secret__on_missing(self, lookup_plugin, on_missing):
+ client = MagicMock()
+ client.get_secret_value = MagicMock()
+ client.get_secret_value.side_effect = _raise_boto_clienterror("ResourceNotFoundException", "secret not found")
+ term = "ansible-test-secret-4561"
+
+ if on_missing == "error":
+ with pytest.raises(AnsibleLookupError) as exc_info:
+ lookup_plugin.get_secret_value(term, client, on_missing=on_missing)
+ assert f"Failed to find secret {term} (ResourceNotFound)" == str(exc_info.value)
+ else:
+ lookup_plugin._display = MagicMock()
+ lookup_plugin._display.warning = MagicMock()
+ assert lookup_plugin.get_secret_value(term, client, on_missing=on_missing) is None
+ lookup_plugin._display.warning.assert_called_once_with(f"Skipping, did not find secret {term}")
+
+ @pytest.mark.parametrize("on_deleted", ["warn", "error"])
+ def test_get_secret__on_deleted(self, lookup_plugin, on_deleted):
+ client = MagicMock()
+ client.get_secret_value = MagicMock()
+ client.get_secret_value.side_effect = _raise_boto_clienterror(
+ "ResourceMarkedForDeletion", "marked for deletion"
+ )
+ term = "ansible-test-secret-8790"
+
+ if on_deleted == "error":
+ with pytest.raises(AnsibleLookupError) as exc_info:
+ lookup_plugin.get_secret_value(term, client, on_deleted=on_deleted)
+ assert f"Failed to find secret {term} (marked for deletion)" == str(exc_info.value)
+ else:
+ lookup_plugin._display = MagicMock()
+ lookup_plugin._display.warning = MagicMock()
+ assert lookup_plugin.get_secret_value(term, client, on_deleted=on_deleted) is None
+ lookup_plugin._display.warning.assert_called_once_with(
+ f"Skipping, did not find secret (marked for deletion) {term}"
+ )
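
The nested=True cases above pin down a term grammar of secret_name.<key>.<key>..., where the first segment selects the secret and the remaining segments walk the JSON payload, reporting the path up to the first unresolvable key. The sketch below is reconstructed from the assertions alone; resolve_nested is a hypothetical name, not the plugin's API.

    import json

    def resolve_nested(term: str, secret_string: str):
        secret_id, _, key_path = term.partition(".")  # secret_id is what would become SecretId
        if not key_path:
            raise ValueError(
                "Nested query must use the following syntax: "
                "`aws_secret_name.<key_name>.<key_name>`"
            )
        value = json.loads(secret_string)
        seen = []
        for key in key_path.split("."):
            seen.append(key)
            if not isinstance(value, dict) or key not in value:
                # surfaced above as "... there exists no key <path> in the secret"
                raise KeyError(".".join(seen))
            value = value[key]
        return value

    assert resolve_nested("name.root.child", '{"root": {"child": "x"}}') == "x"
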
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py
index a7d1e0475..7a870163c 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py
@@ -1,16 +1,13 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
import json
import pytest
-from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_bytes
from ansible.module_utils.common._collections_compat import MutableMapping
+from ansible.module_utils.six import string_types
@pytest.fixture
@@ -18,14 +15,14 @@ def patch_ansible_module(request, mocker):
if isinstance(request.param, string_types):
args = request.param
elif isinstance(request.param, MutableMapping):
- if 'ANSIBLE_MODULE_ARGS' not in request.param:
- request.param = {'ANSIBLE_MODULE_ARGS': request.param}
- if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']:
- request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp'
- if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']:
- request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False
+ if "ANSIBLE_MODULE_ARGS" not in request.param:
+ request.param = {"ANSIBLE_MODULE_ARGS": request.param}
+ if "_ansible_remote_tmp" not in request.param["ANSIBLE_MODULE_ARGS"]:
+ request.param["ANSIBLE_MODULE_ARGS"]["_ansible_remote_tmp"] = "/tmp"
+ if "_ansible_keep_remote_files" not in request.param["ANSIBLE_MODULE_ARGS"]:
+ request.param["ANSIBLE_MODULE_ARGS"]["_ansible_keep_remote_files"] = False
args = json.dumps(request.param)
else:
- raise Exception('Malformed data to the patch_ansible_module pytest fixture')
+ raise Exception("Malformed data to the patch_ansible_module pytest fixture")
- mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args))
+ mocker.patch("ansible.module_utils.basic._ANSIBLE_ARGS", to_bytes(args))
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/test_check_is_instance.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/test_check_is_instance.py
new file mode 100644
index 000000000..0afeab56a
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/test_check_is_instance.py
@@ -0,0 +1,65 @@
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+import pytest
+
+from ansible_collections.amazon.aws.plugins.modules import ec2_eip
+
+EXAMPLE_DATA = [
+ (
+ None,
+ True,
+ False,
+ ),
+ (
+ None,
+ False,
+ False,
+ ),
+ (
+ "",
+ True,
+ False,
+ ),
+ (
+ "",
+ False,
+ False,
+ ),
+ (
+ "i-123456789",
+ True,
+ True,
+ ),
+ (
+ "i-123456789",
+ False,
+ True,
+ ),
+ (
+ "eni-123456789",
+ True,
+ False,
+ ),
+ (
+ "junk",
+ True,
+ False,
+ ),
+ (
+ "junk",
+ False,
+ False,
+ ),
+]
+
+
+def test_check_is_instance_needs_in_vpc():
+ with pytest.raises(ec2_eip.EipError):
+ ec2_eip.check_is_instance("eni-123456789", False)
+
+
+@pytest.mark.parametrize("device,in_vpc,expected", EXAMPLE_DATA)
+def test_check_is_instance(device, in_vpc, expected):
+ result = ec2_eip.check_is_instance(device, in_vpc)
+ assert result is expected
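
EXAMPLE_DATA plus the EipError case fully determine the expected behaviour; a plausible reconstruction follows, with ValueError standing in for the module's EipError and the helper name being purely illustrative.

    def check_is_instance_sketch(device, in_vpc):
        # Hypothetical reconstruction from EXAMPLE_DATA, not the module source.
        if not device:
            return False
        if device.startswith("i-"):
            return True
        if device.startswith("eni-") and not in_vpc:
            # the module raises EipError here; ValueError stands in for it
            raise ValueError("ENI attachments are only valid inside a VPC")
        return False

    assert check_is_instance_sketch("i-123456789", False) is True
    assert check_is_instance_sketch("eni-123456789", True) is False
    assert check_is_instance_sketch("junk", True) is False
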
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py
index e889b676a..a64c16961 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py
@@ -3,23 +3,21 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from unittest.mock import sentinel
import pytest
-from ansible_collections.amazon.aws.tests.unit.compat.mock import sentinel
import ansible_collections.amazon.aws.plugins.modules.ec2_instance as ec2_instance_module
@pytest.fixture
def params_object():
params = {
- 'iam_instance_profile': None,
- 'exact_count': None,
- 'count': None,
- 'launch_template': None,
- 'instance_type': None,
+ "iam_instance_profile": None,
+ "exact_count": None,
+ "count": None,
+ "launch_template": None,
+ "instance_type": sentinel.INSTANCE_TYPE,
}
return params
@@ -29,11 +27,13 @@ def ec2_instance(monkeypatch):
# monkey patches various ec2_instance module functions, we'll separately test the operation of
# these functions, we just care that it's passing the results into the right place in the
# instance spec.
- monkeypatch.setattr(ec2_instance_module, 'build_top_level_options', lambda params: {'TOP_LEVEL_OPTIONS': sentinel.TOP_LEVEL})
- monkeypatch.setattr(ec2_instance_module, 'build_network_spec', lambda params: sentinel.NETWORK_SPEC)
- monkeypatch.setattr(ec2_instance_module, 'build_volume_spec', lambda params: sentinel.VOlUME_SPEC)
- monkeypatch.setattr(ec2_instance_module, 'build_instance_tags', lambda params: sentinel.TAG_SPEC)
- monkeypatch.setattr(ec2_instance_module, 'determine_iam_role', lambda params: sentinel.IAM_PROFILE_ARN)
+ monkeypatch.setattr(
+ ec2_instance_module, "build_top_level_options", lambda params: {"TOP_LEVEL_OPTIONS": sentinel.TOP_LEVEL}
+ )
+ monkeypatch.setattr(ec2_instance_module, "build_network_spec", lambda params: sentinel.NETWORK_SPEC)
+ monkeypatch.setattr(ec2_instance_module, "build_volume_spec", lambda params: sentinel.VOlUME_SPEC)
+ monkeypatch.setattr(ec2_instance_module, "build_instance_tags", lambda params: sentinel.TAG_SPEC)
+ monkeypatch.setattr(ec2_instance_module, "determine_iam_role", lambda params: sentinel.IAM_PROFILE_ARN)
return ec2_instance_module
@@ -43,33 +43,37 @@ def _assert_defaults(instance_spec, to_skip=None):
assert isinstance(instance_spec, dict)
- if 'TagSpecifications' not in to_skip:
- assert 'TagSpecifications' in instance_spec
- assert instance_spec['TagSpecifications'] is sentinel.TAG_SPEC
+ if "TagSpecifications" not in to_skip:
+ assert "TagSpecifications" in instance_spec
+ assert instance_spec["TagSpecifications"] is sentinel.TAG_SPEC
- if 'NetworkInterfaces' not in to_skip:
- assert 'NetworkInterfaces' in instance_spec
- assert instance_spec['NetworkInterfaces'] is sentinel.NETWORK_SPEC
+ if "NetworkInterfaces" not in to_skip:
+ assert "NetworkInterfaces" in instance_spec
+ assert instance_spec["NetworkInterfaces"] is sentinel.NETWORK_SPEC
- if 'BlockDeviceMappings' not in to_skip:
- assert 'BlockDeviceMappings' in instance_spec
- assert instance_spec['BlockDeviceMappings'] is sentinel.VOlUME_SPEC
+ if "BlockDeviceMappings" not in to_skip:
+ assert "BlockDeviceMappings" in instance_spec
+ assert instance_spec["BlockDeviceMappings"] is sentinel.VOlUME_SPEC
- if 'IamInstanceProfile' not in to_skip:
+ if "IamInstanceProfile" not in to_skip:
# By default, this shouldn't be returned
- assert 'IamInstanceProfile' not in instance_spec
+ assert "IamInstanceProfile" not in instance_spec
- if 'MinCount' not in to_skip:
- assert 'MinCount' in instance_spec
- instance_spec['MinCount'] == 1
+ if "MinCount" not in to_skip:
+ assert "MinCount" in instance_spec
+ instance_spec["MinCount"] == 1
- if 'MaxCount' not in to_skip:
- assert 'MaxCount' in instance_spec
- instance_spec['MaxCount'] == 1
+ if "MaxCount" not in to_skip:
+ assert "MaxCount" in instance_spec
+ instance_spec["MaxCount"] == 1
- if 'TOP_LEVEL_OPTIONS' not in to_skip:
- assert 'TOP_LEVEL_OPTIONS' in instance_spec
- assert instance_spec['TOP_LEVEL_OPTIONS'] is sentinel.TOP_LEVEL
+ if "TOP_LEVEL_OPTIONS" not in to_skip:
+ assert "TOP_LEVEL_OPTIONS" in instance_spec
+ assert instance_spec["TOP_LEVEL_OPTIONS"] is sentinel.TOP_LEVEL
+
+ if "InstanceType" not in to_skip:
+ assert "InstanceType" in instance_spec
+ instance_spec["InstanceType"] == sentinel.INSTANCE_TYPE
def test_build_run_instance_spec_defaults(params_object, ec2_instance):
@@ -77,50 +81,72 @@ def test_build_run_instance_spec_defaults(params_object, ec2_instance):
_assert_defaults(instance_spec)
+def test_build_run_instance_spec_type_required(params_object, ec2_instance):
+ params_object["instance_type"] = None
+ params_object["launch_template"] = None
+ # Test that we raise an Ec2InstanceAWSError when neither is passed
+ with pytest.raises(ec2_instance.Ec2InstanceAWSError):
+ ec2_instance.build_run_instance_spec(params_object)
+
+ # Test that instance_type can be None if launch_template is set
+ params_object["launch_template"] = sentinel.LAUNCH_TEMPLATE
+ instance_spec = ec2_instance.build_run_instance_spec(params_object)
+ _assert_defaults(instance_spec, ["InstanceType"])
+ assert "InstanceType" not in instance_spec
+
+
def test_build_run_instance_spec_tagging(params_object, ec2_instance, monkeypatch):
# build_instance_tags can return None, RunInstance doesn't like this
- monkeypatch.setattr(ec2_instance_module, 'build_instance_tags', lambda params: None)
+ monkeypatch.setattr(ec2_instance_module, "build_instance_tags", lambda params: None)
instance_spec = ec2_instance.build_run_instance_spec(params_object)
- _assert_defaults(instance_spec, ['TagSpecifications'])
- assert 'TagSpecifications' not in instance_spec
+ _assert_defaults(instance_spec, ["TagSpecifications"])
+ assert "TagSpecifications" not in instance_spec
# if someone *explicitly* passes {} (rather than not setting it), then [] can be returned
- monkeypatch.setattr(ec2_instance_module, 'build_instance_tags', lambda params: [])
+ monkeypatch.setattr(ec2_instance_module, "build_instance_tags", lambda params: [])
instance_spec = ec2_instance.build_run_instance_spec(params_object)
- _assert_defaults(instance_spec, ['TagSpecifications'])
- assert 'TagSpecifications' in instance_spec
- assert instance_spec['TagSpecifications'] == []
+ _assert_defaults(instance_spec, ["TagSpecifications"])
+ assert "TagSpecifications" in instance_spec
+ assert instance_spec["TagSpecifications"] == []
def test_build_run_instance_spec_instance_profile(params_object, ec2_instance):
- params_object['iam_instance_profile'] = sentinel.INSTANCE_PROFILE_NAME
+ params_object["iam_instance_profile"] = sentinel.INSTANCE_PROFILE_NAME
instance_spec = ec2_instance.build_run_instance_spec(params_object)
- _assert_defaults(instance_spec, ['IamInstanceProfile'])
- assert 'IamInstanceProfile' in instance_spec
- assert instance_spec['IamInstanceProfile'] == {'Arn': sentinel.IAM_PROFILE_ARN}
+ _assert_defaults(instance_spec, ["IamInstanceProfile"])
+ assert "IamInstanceProfile" in instance_spec
+ assert instance_spec["IamInstanceProfile"] == {"Arn": sentinel.IAM_PROFILE_ARN}
def test_build_run_instance_spec_count(params_object, ec2_instance):
# When someone passes 'count', that number of instances will be *launched*
- params_object['count'] = sentinel.COUNT
+ params_object["count"] = sentinel.COUNT
instance_spec = ec2_instance.build_run_instance_spec(params_object)
- _assert_defaults(instance_spec, ['MaxCount', 'MinCount'])
- assert 'MaxCount' in instance_spec
- assert 'MinCount' in instance_spec
- assert instance_spec['MaxCount'] == sentinel.COUNT
- assert instance_spec['MinCount'] == sentinel.COUNT
+ _assert_defaults(instance_spec, ["MaxCount", "MinCount"])
+ assert "MaxCount" in instance_spec
+ assert "MinCount" in instance_spec
+ assert instance_spec["MaxCount"] == sentinel.COUNT
+ assert instance_spec["MinCount"] == sentinel.COUNT
def test_build_run_instance_spec_exact_count(params_object, ec2_instance):
# The "exact_count" logic relies on enforce_count doing the math to figure out how many
# instances to start/stop. The enforce_count call is responsible for ensuring that 'to_launch'
# is set and is a positive integer.
- params_object['exact_count'] = sentinel.EXACT_COUNT
- params_object['to_launch'] = sentinel.TO_LAUNCH
+ params_object["exact_count"] = 42
+ params_object["to_launch"] = sentinel.TO_LAUNCH
instance_spec = ec2_instance.build_run_instance_spec(params_object)
- _assert_defaults(instance_spec, ['MaxCount', 'MinCount'])
- assert 'MaxCount' in instance_spec
- assert 'MinCount' in instance_spec
- assert instance_spec['MaxCount'] == sentinel.TO_LAUNCH
- assert instance_spec['MinCount'] == sentinel.TO_LAUNCH
+ _assert_defaults(instance_spec, ["MaxCount", "MinCount"])
+ assert "MaxCount" in instance_spec
+ assert "MinCount" in instance_spec
+ assert instance_spec["MaxCount"] == 42
+ assert instance_spec["MinCount"] == 42
+
+ instance_spec = ec2_instance.build_run_instance_spec(params_object, 7)
+
+ _assert_defaults(instance_spec, ["MaxCount", "MinCount"])
+ assert "MaxCount" in instance_spec
+ assert "MinCount" in instance_spec
+ assert instance_spec["MaxCount"] == 35
+ assert instance_spec["MinCount"] == 35
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py
index cdde74c97..7645d5559 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py
@@ -3,16 +3,14 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+import sys
+from unittest.mock import MagicMock
+from unittest.mock import sentinel
import pytest
-import sys
-from ansible_collections.amazon.aws.tests.unit.compat.mock import MagicMock
-from ansible_collections.amazon.aws.tests.unit.compat.mock import sentinel
-import ansible_collections.amazon.aws.plugins.modules.ec2_instance as ec2_instance_module
import ansible_collections.amazon.aws.plugins.module_utils.arn as utils_arn
+import ansible_collections.amazon.aws.plugins.modules.ec2_instance as ec2_instance_module
from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
try:
@@ -20,24 +18,29 @@ try:
except ImportError:
pass
-pytest.mark.skipif(not HAS_BOTO3, reason="test_determine_iam_role.py requires the python modules 'boto3' and 'botocore'")
+pytestmark = pytest.mark.skipif(
+ not HAS_BOTO3, reason="test_determine_iam_role.py requires the python modules 'boto3' and 'botocore'"
+)
-def _client_error(code='GenericError'):
+def _client_error(code="GenericError"):
return botocore.exceptions.ClientError(
- {'Error': {'Code': code, 'Message': 'Something went wrong'},
- 'ResponseMetadata': {'RequestId': '01234567-89ab-cdef-0123-456789abcdef'}},
- 'some_called_method')
+ {
+ "Error": {"Code": code, "Message": "Something went wrong"},
+ "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"},
+ },
+ "some_called_method",
+ )
@pytest.fixture
def params_object():
params = {
- 'instance_role': None,
- 'exact_count': None,
- 'count': None,
- 'launch_template': None,
- 'instance_type': None,
+ "instance_role": None,
+ "exact_count": None,
+ "count": None,
+ "launch_template": None,
+ "instance_type": None,
}
return params
@@ -49,8 +52,8 @@ class FailJsonException(Exception):
@pytest.fixture
def ec2_instance(monkeypatch):
- monkeypatch.setattr(ec2_instance_module, 'parse_aws_arn', lambda arn: None)
- monkeypatch.setattr(ec2_instance_module, 'module', MagicMock())
+ monkeypatch.setattr(ec2_instance_module, "validate_aws_arn", lambda arn, service, resource_type: None)
+ monkeypatch.setattr(ec2_instance_module, "module", MagicMock())
ec2_instance_module.module.fail_json.side_effect = FailJsonException()
ec2_instance_module.module.fail_json_aws.side_effect = FailJsonException()
return ec2_instance_module
@@ -58,15 +61,15 @@ def ec2_instance(monkeypatch):
def test_determine_iam_role_arn(params_object, ec2_instance, monkeypatch):
# Revert the default monkey patch to make it simple to try passing a valid ARNs
- monkeypatch.setattr(ec2_instance, 'parse_aws_arn', utils_arn.parse_aws_arn)
+ monkeypatch.setattr(ec2_instance, "validate_aws_arn", utils_arn.validate_aws_arn)
# Simplest example, someone passes a valid instance profile ARN
- arn = ec2_instance.determine_iam_role('arn:aws:iam::123456789012:instance-profile/myprofile')
- assert arn == 'arn:aws:iam::123456789012:instance-profile/myprofile'
+ arn = ec2_instance.determine_iam_role("arn:aws:iam::123456789012:instance-profile/myprofile")
+ assert arn == "arn:aws:iam::123456789012:instance-profile/myprofile"
def test_determine_iam_role_name(params_object, ec2_instance):
- profile_description = {'InstanceProfile': {'Arn': sentinel.IAM_PROFILE_ARN}}
+ profile_description = {"InstanceProfile": {"Arn": sentinel.IAM_PROFILE_ARN}}
iam_client = MagicMock(**{"get_instance_profile.return_value": profile_description})
ec2_instance_module.module.client.return_value = iam_client
@@ -75,28 +78,28 @@ def test_determine_iam_role_name(params_object, ec2_instance):
def test_determine_iam_role_missing(params_object, ec2_instance):
- missing_exception = _client_error('NoSuchEntity')
+ missing_exception = _client_error("NoSuchEntity")
iam_client = MagicMock(**{"get_instance_profile.side_effect": missing_exception})
ec2_instance_module.module.client.return_value = iam_client
- with pytest.raises(FailJsonException) as exception:
- arn = ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME)
+ with pytest.raises(FailJsonException):
+ ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME)
assert ec2_instance_module.module.fail_json_aws.call_count == 1
assert ec2_instance_module.module.fail_json_aws.call_args.args[0] is missing_exception
- assert 'Could not find' in ec2_instance_module.module.fail_json_aws.call_args.kwargs['msg']
+ assert "Could not find" in ec2_instance_module.module.fail_json_aws.call_args.kwargs["msg"]
-@pytest.mark.skipif(sys.version_info < (3, 8), reason='call_args behaviour changed in Python 3.8')
+@pytest.mark.skipif(sys.version_info < (3, 8), reason="call_args behaviour changed in Python 3.8")
def test_determine_iam_role_missing(params_object, ec2_instance):
missing_exception = _client_error()
iam_client = MagicMock(**{"get_instance_profile.side_effect": missing_exception})
ec2_instance_module.module.client.return_value = iam_client
- with pytest.raises(FailJsonException) as exception:
- arn = ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME)
+ with pytest.raises(FailJsonException):
+ ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME)
assert ec2_instance_module.module.fail_json_aws.call_count == 1
assert ec2_instance_module.module.fail_json_aws.call_args.args[0] is missing_exception
- assert 'An error occurred while searching' in ec2_instance_module.module.fail_json_aws.call_args.kwargs['msg']
- assert 'Please try supplying the full ARN' in ec2_instance_module.module.fail_json_aws.call_args.kwargs['msg']
+ assert "An error occurred while searching" in ec2_instance_module.module.fail_json_aws.call_args.kwargs["msg"]
+ assert "Please try supplying the full ARN" in ec2_instance_module.module.fail_json_aws.call_args.kwargs["msg"]
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_expand_rules.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_expand_rules.py
new file mode 100644
index 000000000..1abfd526c
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_expand_rules.py
@@ -0,0 +1,240 @@
+# (c) 2022 Red Hat Inc.
+#
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+import sys
+from unittest.mock import sentinel
+
+import pytest
+
+import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module
+
+PORT_EXPANSION = [
+ ({"from_port": 83}, ({"from_port": 83, "to_port": None},)),
+ ({"to_port": 36}, ({"from_port": None, "to_port": 36},)),
+ ({"icmp_type": 90}, ({"from_port": 90, "to_port": None},)),
+ ({"icmp_type": 74, "icmp_code": 66}, ({"from_port": 74, "to_port": 66},)),
+ # Note: ports is explicitly a list of strings because we support "<port a>-<port b>"
+ ({"ports": ["1"]}, ({"from_port": 1, "to_port": 1},)),
+ ({"ports": ["41-85"]}, ({"from_port": 41, "to_port": 85},)),
+ (
+ {"ports": ["63", "74"]},
+ (
+ {"from_port": 63, "to_port": 63},
+ {"from_port": 74, "to_port": 74},
+ ),
+ ),
+ (
+ {"ports": ["97-30", "41-80"]},
+ (
+ {"from_port": 30, "to_port": 97},
+ {"from_port": 41, "to_port": 80},
+ ),
+ ),
+ (
+ {"ports": ["95", "67-79"]},
+ (
+ {"from_port": 95, "to_port": 95},
+ {"from_port": 67, "to_port": 79},
+ ),
+ ),
+ # There are legitimate cases with no port info
+ ({}, ({},)),
+]
+PORTS_EXPANSION = [
+ (["28"], [(28, 28)]),
+ (["80-83"], [(80, 83)]),
+ # We tolerate the order being backwards
+ (["83-80"], [(80, 83)]),
+ (["41", "1"], [(41, 41), (1, 1)]),
+ (["70", "39-0"], [(70, 70), (0, 39)]),
+ (["57-6", "31"], [(6, 57), (31, 31)]),
+ # https://github.com/ansible-collections/amazon.aws/pull/1241
+ (["-1"], [(-1, -1)]),
+]
+SOURCE_EXPANSION = [
+ (
+ {"cidr_ip": ["192.0.2.0/24"]},
+ ({"cidr_ip": "192.0.2.0/24"},),
+ ),
+ (
+ {"cidr_ipv6": ["2001:db8::/32"]},
+ ({"cidr_ipv6": "2001:db8::/32"},),
+ ),
+ (
+ {"group_id": ["sg-123456789"]},
+ ({"group_id": "sg-123456789"},),
+ ),
+ (
+ {"group_name": ["MyExampleGroupName"]},
+ ({"group_name": "MyExampleGroupName"},),
+ ),
+ (
+ {"ip_prefix": ["pl-123456abcde123456"]},
+ ({"ip_prefix": "pl-123456abcde123456"},),
+ ),
+ (
+ {"cidr_ip": ["192.0.2.0/24", "198.51.100.0/24"]},
+ (
+ {"cidr_ip": "192.0.2.0/24"},
+ {"cidr_ip": "198.51.100.0/24"},
+ ),
+ ),
+ (
+ {"cidr_ipv6": ["2001:db8::/32", "100::/64"]},
+ (
+ {"cidr_ipv6": "2001:db8::/32"},
+ {"cidr_ipv6": "100::/64"},
+ ),
+ ),
+ (
+ {"group_id": ["sg-123456789", "sg-abcdef1234"]},
+ (
+ {"group_id": "sg-123456789"},
+ {"group_id": "sg-abcdef1234"},
+ ),
+ ),
+ (
+ {"group_name": ["MyExampleGroupName", "AnotherExample"]},
+ (
+ {"group_name": "MyExampleGroupName"},
+ {"group_name": "AnotherExample"},
+ ),
+ ),
+ (
+ {"ip_prefix": ["pl-123456abcde123456", "pl-abcdef12345abcdef"]},
+ ({"ip_prefix": "pl-123456abcde123456"}, {"ip_prefix": "pl-abcdef12345abcdef"}),
+ ),
+ (
+ {
+ "cidr_ip": ["192.0.2.0/24"],
+ "cidr_ipv6": ["2001:db8::/32"],
+ "group_id": ["sg-123456789"],
+ "group_name": ["MyExampleGroupName"],
+ "ip_prefix": ["pl-123456abcde123456"],
+ },
+ (
+ {"cidr_ip": "192.0.2.0/24"},
+ {"cidr_ipv6": "2001:db8::/32"},
+ {"group_id": "sg-123456789"},
+ {"group_name": "MyExampleGroupName"},
+ {"ip_prefix": "pl-123456abcde123456"},
+ ),
+ ),
+ (
+ {
+ "cidr_ip": ["192.0.2.0/24", "198.51.100.0/24"],
+ "cidr_ipv6": ["2001:db8::/32", "100::/64"],
+ "group_id": ["sg-123456789", "sg-abcdef1234"],
+ "group_name": ["MyExampleGroupName", "AnotherExample"],
+ "ip_prefix": ["pl-123456abcde123456", "pl-abcdef12345abcdef"],
+ },
+ (
+ {"cidr_ip": "192.0.2.0/24"},
+ {"cidr_ip": "198.51.100.0/24"},
+ {"cidr_ipv6": "2001:db8::/32"},
+ {"cidr_ipv6": "100::/64"},
+ {"group_id": "sg-123456789"},
+ {"group_id": "sg-abcdef1234"},
+ {"group_name": "MyExampleGroupName"},
+ {"group_name": "AnotherExample"},
+ {"ip_prefix": "pl-123456abcde123456"},
+ {"ip_prefix": "pl-abcdef12345abcdef"},
+ ),
+ ),
+]
+
+RULE_EXPANSION = [
+ (
+ {"ports": ["24"], "cidr_ip": ["192.0.2.0/24"], "sentinel": sentinel.RULE_VALUE},
+ [
+ {"from_port": 24, "to_port": 24, "cidr_ip": "192.0.2.0/24", "sentinel": sentinel.RULE_VALUE},
+ ],
+ ),
+ (
+ {"ports": ["24", "50"], "cidr_ip": ["192.0.2.0/24", "198.51.100.0/24"], "sentinel": sentinel.RULE_VALUE},
+ [
+ {"from_port": 24, "to_port": 24, "cidr_ip": "192.0.2.0/24", "sentinel": sentinel.RULE_VALUE},
+ {"from_port": 24, "to_port": 24, "cidr_ip": "198.51.100.0/24", "sentinel": sentinel.RULE_VALUE},
+ {"from_port": 50, "to_port": 50, "cidr_ip": "192.0.2.0/24", "sentinel": sentinel.RULE_VALUE},
+ {"from_port": 50, "to_port": 50, "cidr_ip": "198.51.100.0/24", "sentinel": sentinel.RULE_VALUE},
+ ],
+ ),
+]
+
+
+@pytest.mark.parametrize("rule, expected", PORT_EXPANSION)
+def test_expand_ports_from_rule(rule, expected):
+ assert ec2_security_group_module.expand_ports_from_rule(rule) == expected
+
+ # We shouldn't care about extra values lurking in the rule definition
+ rule["junk"] = sentinel.EXTRA_JUNK
+ assert ec2_security_group_module.expand_ports_from_rule(rule) == expected
+
+
+@pytest.mark.parametrize("rule, expected", SOURCE_EXPANSION)
+def test_expand_sources_from_rule(rule, expected):
+ assert ec2_security_group_module.expand_sources_from_rule(rule) == expected
+
+ # We shouldn't care about extra values lurking in the rule definition
+ rule["junk"] = sentinel.EXTRA_JUNK
+ assert ec2_security_group_module.expand_sources_from_rule(rule) == expected
+
+
+@pytest.mark.parametrize("rule, expected", PORTS_EXPANSION)
+def test_expand_ports_list(rule, expected):
+ assert ec2_security_group_module.expand_ports_list(rule) == expected
+
+
+@pytest.mark.skipif(
+ sys.version_info < (3, 7),
+ reason="requires Python 3.7 or higher - sentinel doesn't behave well with deepcopy in Python 3.6",
+)
+@pytest.mark.parametrize("source_type", sorted(ec2_security_group_module.SOURCE_TYPES_ALL))
+def test_strip_rule_source(source_type):
+ rule = {source_type: sentinel.SOURCE_VALUE}
+ assert ec2_security_group_module._strip_rule(rule) == {}
+ assert rule == {source_type: sentinel.SOURCE_VALUE}
+
+ rule = {source_type: sentinel.SOURCE_VALUE, "sentinel": sentinel.SENTINEL_VALUE}
+ assert ec2_security_group_module._strip_rule(rule) == {"sentinel": sentinel.SENTINEL_VALUE}
+ assert rule == {source_type: sentinel.SOURCE_VALUE, "sentinel": sentinel.SENTINEL_VALUE}
+
+
+@pytest.mark.skipif(
+ sys.version_info < (3, 7),
+ reason="requires Python 3.7 or higher - sentinel doesn't behave well with deepcopy in Python 3.6",
+)
+@pytest.mark.parametrize("port_type", sorted(ec2_security_group_module.PORT_TYPES_ALL))
+def test_strip_rule_port(port_type):
+ rule = {port_type: sentinel.PORT_VALUE}
+ assert ec2_security_group_module._strip_rule(rule) == {}
+ assert rule == {port_type: sentinel.PORT_VALUE}
+
+ rule = {port_type: sentinel.PORT_VALUE, "sentinel": sentinel.SENTINEL_VALUE}
+ assert ec2_security_group_module._strip_rule(rule) == {"sentinel": sentinel.SENTINEL_VALUE}
+ assert rule == {port_type: sentinel.PORT_VALUE, "sentinel": sentinel.SENTINEL_VALUE}
+
+
+@pytest.mark.skipif(
+ sys.version_info < (3, 7),
+ reason="requires Python 3.7 or higher - sentinel doesn't behave well with deepcopy in Python 3.6",
+)
+@pytest.mark.parametrize("rule, expected", RULE_EXPANSION)
+def test_rule_expand(rule, expected):
+ assert ec2_security_group_module.expand_rule(rule) == expected
+
+
+##########################################################
+# Examples where we explicitly expect to raise an exception
+
+
+def test_expand_ports_list_bad():
+ with pytest.raises(ec2_security_group_module.SecurityGroupError):
+ ec2_security_group_module.expand_ports_list(["junk"])
+
+
+def test_expand_sources_from_rule_bad():
+ with pytest.raises(ec2_security_group_module.SecurityGroupError):
+ ec2_security_group_module.expand_sources_from_rule(dict())
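
PORTS_EXPANSION implies a parser that accepts a bare port or a low-high range, tolerates reversed bounds, and, per the linked pull request, treats "-1" as the literal port -1 rather than an open-ended range. A rough self-contained equivalent, with illustrative names that are not the module's code:

    def expand_ports_sketch(ports):
        result = []
        for entry in ports:
            try:
                port = int(entry)  # handles "28" and, crucially, "-1"
                result.append((port, port))
                continue
            except ValueError:
                pass
            first, _, last = entry.partition("-")
            try:
                low, high = sorted((int(first), int(last)))  # tolerate "83-80"
            except ValueError:
                raise ValueError(f"invalid port specification: {entry!r}")
            result.append((low, high))
        return result

    assert expand_ports_sketch(["-1"]) == [(-1, -1)]
    assert expand_ports_sketch(["83-80"]) == [(80, 83)]
    assert expand_ports_sketch(["70", "39-0"]) == [(70, 70), (0, 39)]
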
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_formatting.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_formatting.py
new file mode 100644
index 000000000..358512a00
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_formatting.py
@@ -0,0 +1,239 @@
+# (c) 2022 Red Hat Inc.
+#
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import sentinel
+
+import pytest
+
+import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module
+
+SORT_ORDER = [
+ (dict(), dict()),
+ (
+ dict(ip_permissions=[], ip_permissions_egress=[]),
+ dict(ip_permissions=[], ip_permissions_egress=[]),
+ ),
+ (
+ dict(
+ ip_permissions=[
+ dict(
+ ip_protocol="tcp",
+ ip_ranges=[],
+ ipv6_ranges=[
+ dict(cidr_ipv6="2001:DB8:8000::/34"),
+ dict(cidr_ipv6="2001:DB8:4000::/34"),
+ ],
+ prefix_list_ids=[],
+ user_id_group_pairs=[],
+ ),
+ dict(
+ ip_protocol="-1",
+ ip_ranges=[
+ dict(cidr_ip="198.51.100.0/24"),
+ dict(cidr_ip="192.0.2.0/24"),
+ ],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ user_id_group_pairs=[],
+ ),
+ dict(
+ from_port="22",
+ ip_ranges=[],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ to_port="22",
+ user_id_group_pairs=[
+ dict(group_id="sg-3950599b", user_id="123456789012"),
+ dict(group_id="sg-fbfd1e3a", user_id="012345678901"),
+ dict(group_id="sg-00ec640f", user_id="012345678901"),
+ ],
+ ),
+ dict(
+ from_port=38,
+ ip_protocol="tcp",
+ ip_ranges=[],
+ ipv6_ranges=[],
+ prefix_list_ids=[
+ dict(prefix_list_id="pl-2263adef"),
+ dict(prefix_list_id="pl-0a5fccee"),
+ dict(prefix_list_id="pl-65911ba9"),
+ ],
+ to_port=38,
+ user_id_group_pairs=[],
+ ),
+ ],
+ ip_permissions_egress=[
+ dict(
+ ip_protocol="-1",
+ ip_ranges=[
+ dict(cidr_ip="198.51.100.0/24"),
+ dict(cidr_ip="192.0.2.0/24"),
+ ],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ user_id_group_pairs=[],
+ ),
+ dict(
+ from_port=443,
+ ip_protocol="tcp",
+ ip_ranges=[],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ to_port=443,
+ user_id_group_pairs=[
+ dict(group_id="sg-fbfd1e3a", user_id="012345678901"),
+ dict(group_id="sg-00ec640f", user_id="012345678901"),
+ ],
+ ),
+ ],
+ ),
+ dict(
+ ip_permissions=[
+ dict(
+ ip_protocol="-1",
+ ip_ranges=[
+ dict(cidr_ip="192.0.2.0/24"),
+ dict(cidr_ip="198.51.100.0/24"),
+ ],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ user_id_group_pairs=[],
+ ),
+ dict(
+ ip_protocol="tcp",
+ ip_ranges=[],
+ ipv6_ranges=[
+ dict(cidr_ipv6="2001:DB8:4000::/34"),
+ dict(cidr_ipv6="2001:DB8:8000::/34"),
+ ],
+ prefix_list_ids=[],
+ user_id_group_pairs=[],
+ ),
+ dict(
+ from_port=38,
+ ip_protocol="tcp",
+ ip_ranges=[],
+ ipv6_ranges=[],
+ prefix_list_ids=[
+ dict(prefix_list_id="pl-0a5fccee"),
+ dict(prefix_list_id="pl-2263adef"),
+ dict(prefix_list_id="pl-65911ba9"),
+ ],
+ to_port=38,
+ user_id_group_pairs=[],
+ ),
+ dict(
+ from_port="22",
+ ip_ranges=[],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ to_port="22",
+ user_id_group_pairs=[
+ dict(group_id="sg-00ec640f", user_id="012345678901"),
+ dict(group_id="sg-3950599b", user_id="123456789012"),
+ dict(group_id="sg-fbfd1e3a", user_id="012345678901"),
+ ],
+ ),
+ ],
+ ip_permissions_egress=[
+ dict(
+ ip_protocol="-1",
+ ip_ranges=[
+ dict(cidr_ip="192.0.2.0/24"),
+ dict(cidr_ip="198.51.100.0/24"),
+ ],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ user_id_group_pairs=[],
+ ),
+ dict(
+ from_port=443,
+ ip_protocol="tcp",
+ ip_ranges=[],
+ ipv6_ranges=[],
+ prefix_list_ids=[],
+ to_port=443,
+ user_id_group_pairs=[
+ dict(group_id="sg-00ec640f", user_id="012345678901"),
+ dict(group_id="sg-fbfd1e3a", user_id="012345678901"),
+ ],
+ ),
+ ],
+ ),
+ ),
+]
+
+
+@pytest.mark.parametrize("group, expected", SORT_ORDER)
+def test_sort_security_group(group, expected):
+ assert ec2_security_group_module.sort_security_group(group) == expected
+
+ # We shouldn't care about extra values lurking in the security group definition
+ group["junk"] = sentinel.EXTRA_JUNK
+ expected["junk"] = sentinel.EXTRA_JUNK
+ assert ec2_security_group_module.sort_security_group(group) == expected
+
+
+def test_get_rule_sort_key():
+ # Random text, to try and ensure the content of the string doesn't affect the key returned
+ dict_to_sort = dict(
+ cidr_ip="MtY0d3Ps6ePsMM0zB18g",
+ cidr_ipv6="ffbCwK2xhCsy8cyXqHuz",
+ prefix_list_id="VXKCoW296XxIRiBrTUw8",
+ group_id="RZpolpZ5wYPPpbqVo1Db",
+ sentinel=sentinel.EXTRA_RULE_KEY,
+ )
+
+ # Walk through the keys we use and check that they have the priority we expect
+ for key_name in ["cidr_ip", "cidr_ipv6", "prefix_list_id", "group_id"]:
+ assert ec2_security_group_module.get_rule_sort_key(dict_to_sort) == dict_to_sort[key_name]
+ # Remove the current key so that the next time round another key will have priority
+ dict_to_sort.pop(key_name)
+
+ assert dict_to_sort == {"sentinel": sentinel.EXTRA_RULE_KEY}
+ assert ec2_security_group_module.get_rule_sort_key(dict_to_sort) is None
+
+
+def test_get_ip_permissions_sort_key():
+ dict_to_sort = dict(
+ ip_ranges=[
+ dict(cidr_ip="198.51.100.0/24", original_index=0),
+ dict(cidr_ip="192.0.2.0/24", original_index=1),
+ dict(cidr_ip="203.0.113.0/24", original_index=2),
+ ],
+ ipv6_ranges=[
+ dict(cidr_ipv6="2001:DB8:4000::/34", original_index=0),
+ dict(cidr_ipv6="2001:DB8:0000::/34", original_index=1),
+ dict(cidr_ipv6="2001:DB8:8000::/34", original_index=2),
+ ],
+ prefix_list_ids=[
+ dict(prefix_list_id="pl-2263adef", original_index=0),
+ dict(prefix_list_id="pl-0a5fccee", original_index=1),
+ dict(prefix_list_id="pl-65911ba9", original_index=2),
+ ],
+ user_id_group_pairs=[
+ dict(group_id="sg-3950599b", original_index=0),
+ dict(group_id="sg-fbfd1e3a", original_index=1),
+ dict(group_id="sg-00ec640f", original_index=2),
+ ],
+ sentinel=sentinel.EXTRA_RULE_KEY,
+ )
+
+ expected_keys = dict(
+ ip_ranges="ipv4:192.0.2.0/24",
+ ipv6_ranges="ipv6:2001:DB8:0000::/34",
+ prefix_list_ids="pl:pl-0a5fccee",
+ user_id_group_pairs="ugid:sg-00ec640f",
+ )
+
+ # Walk through the keys we use and check that they have the priority we expect
+ for key_name in ["ip_ranges", "ipv6_ranges", "prefix_list_ids", "user_id_group_pairs"]:
+ sort_key = ec2_security_group_module.get_ip_permissions_sort_key(dict_to_sort)
+ assert sort_key == expected_keys[key_name]
+ # Remove the current key so that the next time round another key will have priority
+ dict_to_sort.pop(key_name)
+
+ assert dict_to_sort == {"sentinel": sentinel.EXTRA_RULE_KEY}
+ assert ec2_security_group_module.get_ip_permissions_sort_key(dict_to_sort) is None
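
Both key-function tests establish the same pattern: a fixed priority list where the first populated key wins and an exhausted rule sorts as None; for ip_permissions the winning value is additionally prefixed (ipv4:, ipv6:, pl:, ugid:) so different source types never interleave. A condensed model of the simpler rule key, using a hypothetical helper:

    def rule_sort_key_sketch(rule: dict):
        # First populated key wins, in this fixed priority order.
        for key in ("cidr_ip", "cidr_ipv6", "prefix_list_id", "group_id"):
            if key in rule:
                return rule[key]
        return None

    assert rule_sort_key_sketch({"group_id": "sg-1", "cidr_ip": "192.0.2.0/24"}) == "192.0.2.0/24"
    assert rule_sort_key_sketch({"sentinel": object()}) is None
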
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py
new file mode 100644
index 000000000..34fa8de1a
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py
@@ -0,0 +1,99 @@
+# (c) 2022 Red Hat Inc.
+#
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from copy import deepcopy
+from unittest.mock import sentinel
+
+import pytest
+
+import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module
+
+
+@pytest.fixture
+def ec2_security_group(monkeypatch):
+ # monkeypatch the module-level current_account_id so the tests don't need to
+ # resolve the real AWS account; the parsing logic under test only needs a
+ # value to be present.
+ monkeypatch.setattr(ec2_security_group_module, "current_account_id", sentinel.CURRENT_ACCOUNT_ID)
+ return ec2_security_group_module
+
+
+def test_target_from_rule_with_group_id_local_group(ec2_security_group):
+ groups = dict()
+ original_groups = deepcopy(groups)
+ rule_type, target, created = ec2_security_group._target_from_rule_with_group_id(
+ dict(group_id="sg-123456789abcdef01"),
+ groups,
+ )
+ assert groups == original_groups
+ assert rule_type == "group"
+ assert created is False
+ assert target[0] is sentinel.CURRENT_ACCOUNT_ID
+ assert target[1] == "sg-123456789abcdef01"
+ assert target[2] is None
+
+
+def test_target_from_rule_with_group_id_peer_group(ec2_security_group):
+ groups = dict()
+ rule_type, target, created = ec2_security_group._target_from_rule_with_group_id(
+ dict(group_id="123456789012/sg-123456789abcdef02/example-group-name"),
+ groups,
+ )
+ assert rule_type == "group"
+ assert created is False
+ assert target[0] == "123456789012"
+ assert target[1] == "sg-123456789abcdef02"
+ assert target[2] is None
+
+ assert sorted(groups.keys()) == ["example-group-name", "sg-123456789abcdef02"]
+ rule_by_id = groups["sg-123456789abcdef02"]
+ rule_by_name = groups["example-group-name"]
+
+ assert rule_by_id is rule_by_name
+ assert rule_by_id["UserId"] == "123456789012"
+ assert rule_by_id["GroupId"] == "sg-123456789abcdef02"
+ assert rule_by_id["GroupName"] == "example-group-name"
+
+
+def test_target_from_rule_with_group_id_elb(ec2_security_group):
+ groups = dict()
+ rule_type, target, created = ec2_security_group._target_from_rule_with_group_id(
+ dict(group_id="amazon-elb/amazon-elb-sg"),
+ groups,
+ )
+ assert rule_type == "group"
+ assert created is False
+ assert target[0] == "amazon-elb"
+ assert target[1] is None
+ assert target[2] == "amazon-elb-sg"
+
+ assert "amazon-elb-sg" in groups.keys()
+ rule_by_name = groups["amazon-elb-sg"]
+
+ assert rule_by_name["UserId"] == "amazon-elb"
+ assert rule_by_name["GroupId"] is None
+ assert rule_by_name["GroupName"] == "amazon-elb-sg"
+
+
+def test_target_from_rule_with_group_id_elb_with_sg(ec2_security_group):
+ groups = dict()
+ rule_type, target, created = ec2_security_group._target_from_rule_with_group_id(
+ dict(group_id="amazon-elb/sg-5a9c116a/amazon-elb-sg"),
+ groups,
+ )
+ assert rule_type == "group"
+ assert created is False
+ assert target[0] == "amazon-elb"
+ assert target[1] is None
+ assert target[2] == "amazon-elb-sg"
+
+ assert sorted(groups.keys()) == ["amazon-elb-sg", "sg-5a9c116a"]
+ rule_by_id = groups["sg-5a9c116a"]
+ rule_by_name = groups["amazon-elb-sg"]
+
+ assert rule_by_id is rule_by_name
+ assert rule_by_id["UserId"] == "amazon-elb"
+ assert rule_by_id["GroupId"] == "sg-5a9c116a"
+ assert rule_by_id["GroupName"] == "amazon-elb-sg"
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py
new file mode 100644
index 000000000..eb2de7596
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py
@@ -0,0 +1,85 @@
+# (c) 2022 Red Hat Inc.
+#
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+import warnings
+from unittest.mock import MagicMock
+from unittest.mock import sentinel
+
+import pytest
+
+import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module
+
+
+@pytest.fixture
+def aws_module():
+ aws_module = MagicMock()
+ aws_module.warn = warnings.warn
+ return aws_module
+
+
+@pytest.fixture
+def ec2_security_group(monkeypatch):
+ # monkeypatch the module-level current_account_id so the tests don't need to
+ # resolve the real AWS account; the validation logic under test only needs a
+ # value to be present.
+ monkeypatch.setattr(ec2_security_group_module, "current_account_id", sentinel.CURRENT_ACCOUNT_ID)
+ return ec2_security_group_module
+
+
+IPS_GOOD = [
+ (
+ "192.0.2.2",
+ "192.0.2.2",
+ ),
+ (
+ "192.0.2.1/32",
+ "192.0.2.1/32",
+ ),
+ (
+ "192.0.2.1/255.255.255.255",
+ "192.0.2.1/32",
+ ),
+ (
+ "192.0.2.0/24",
+ "192.0.2.0/24",
+ ),
+ (
+ "192.0.2.0/255.255.255.255",
+ "192.0.2.0/32",
+ ),
+ (
+ "2001:db8::1/128",
+ "2001:db8::1/128",
+ ),
+ (
+ "2001:db8::/32",
+ "2001:db8::/32",
+ ),
+ ("2001:db8:fe80:b897:8990:8a7c:99bf:323d/128", "2001:db8:fe80:b897:8990:8a7c:99bf:323d/128"),
+]
+
+IPS_WARN = [
+ ("192.0.2.1/24", "192.0.2.0/24", "One of your CIDR addresses"),
+ ("2001:DB8::1/32", "2001:DB8::/32", "One of your IPv6 CIDR addresses"),
+ ("2001:db8:fe80:b897:8990:8a7c:99bf:323d/64", "2001:db8:fe80:b897::/64", "One of your IPv6 CIDR addresses"),
+]
+
+
+@pytest.mark.parametrize("ip,expected", IPS_GOOD)
+def test_validate_ip_no_warn(ec2_security_group, aws_module, ip, expected):
+ with warnings.catch_warnings():
+ warnings.simplefilter("error")
+ result = ec2_security_group.validate_ip(aws_module, ip)
+
+ assert result == expected
+
+
+@pytest.mark.parametrize("ip,expected,warn_msg", IPS_WARN)
+def test_validate_ip_warn(ec2_security_group, aws_module, ip, warn_msg, expected):
+ with pytest.warns(UserWarning, match=warn_msg) as recorded:
+ result = ec2_security_group.validate_ip(aws_module, ip)
+
+ assert len(recorded) == 1
+ assert result == expected
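
The warn cases are exactly the inputs whose host bits are non-zero for the given prefix length; Python's ipaddress module expresses the same normalization directly. One way to obtain the tested behaviour (a sketch, not the module's implementation; bare addresses such as "192.0.2.2" would need separate handling):

    import ipaddress

    def normalize_cidr(cidr: str):
        # strict=False masks host bits ("192.0.2.1/24" -> "192.0.2.0/24");
        # a strict parse succeeds only when the host bits are already clear.
        network = ipaddress.ip_network(cidr, strict=False)
        try:
            ipaddress.ip_network(cidr)
            changed = False
        except ValueError:
            changed = True
        return str(network), changed

    assert normalize_cidr("192.0.2.1/24") == ("192.0.2.0/24", True)
    assert normalize_cidr("2001:db8::/32") == ("2001:db8::/32", False)
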
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_rule.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_rule.py
new file mode 100644
index 000000000..9949c1b5c
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_rule.py
@@ -0,0 +1,100 @@
+# (c) 2022 Red Hat Inc.
+#
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from copy import deepcopy
+
+import pytest
+
+import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module
+
+VALID_RULES = [
+ dict(
+ proto="all",
+ ),
+ dict(
+ proto="tcp",
+ from_port="1",
+ to_port="65535",
+ ),
+ dict(
+ proto="icmpv6",
+ from_port="-1",
+ to_port="-1",
+ ),
+ dict(
+ proto="icmp",
+ from_port="-1",
+ to_port="-1",
+ ),
+ dict(proto="icmpv6", icmp_type="8", icmp_code="1"),
+ dict(proto="icmpv6", icmp_code="1"),
+ dict(proto="icmpv6", icmp_type="8"),
+ dict(proto="icmp", icmp_type="8", icmp_code="1"),
+ dict(proto="icmp", icmp_code="1"),
+ dict(proto="icmp", icmp_type="8"),
+]
+
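+# (rule, expected error pattern): icmp_type/icmp_code only make sense for the
+# icmp and icmpv6 protocols, so any other proto must be rejected.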
+INVALID_RULES = [
+ (
+ dict(
+ proto="tcp",
+ icmp_code="1",
+ ),
+ r"Specify proto: icmp or icmpv6",
+ ),
+ (
+ dict(
+ proto="tcp",
+ icmp_type="8",
+ ),
+ r"Specify proto: icmp or icmpv6",
+ ),
+ (
+ dict(
+ proto="tcp",
+ icmp_type="8",
+ icmp_code="1",
+ ),
+ r"Specify proto: icmp or icmpv6",
+ ),
+ (
+ dict(
+ proto="all",
+ icmp_code="1",
+ ),
+ r"Specify proto: icmp or icmpv6",
+ ),
+ (
+ dict(
+ proto="all",
+ icmp_type="8",
+ ),
+ r"Specify proto: icmp or icmpv6",
+ ),
+ (
+ dict(
+ proto="all",
+ icmp_type="8",
+ icmp_code="1",
+ ),
+ r"Specify proto: icmp or icmpv6",
+ ),
+]
+
+
+@pytest.mark.parametrize("rule,error_msg", INVALID_RULES)
+def test_validate_rule_invalid(rule, error_msg):
+ original_rule = deepcopy(rule)
+ with pytest.raises(ec2_security_group_module.SecurityGroupError, match=error_msg):
+ ec2_security_group_module.validate_rule(rule)
+ assert original_rule == rule
+
+
+@pytest.mark.parametrize("rule", VALID_RULES)
+def test_validate_rule_valid(rule):
+ original_rule = deepcopy(rule)
+ ec2_security_group_module.validate_rule(rule)
+ # validate_rule shouldn't change the rule
+ assert original_rule == rule
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/certs/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/certs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/certs/__init__.py
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_backup_restore_job_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_backup_restore_job_info.py
new file mode 100644
index 000000000..51c495e30
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_backup_restore_job_info.py
@@ -0,0 +1,146 @@
+# (c) 2022 Red Hat Inc.
+
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+
+from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict
+
+from ansible_collections.amazon.aws.plugins.modules import backup_restore_job_info
+
+module_name = "ansible_collections.amazon.aws.plugins.modules.backup_restore_job_info"
+
+
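+# build_request_args should drop empty arguments and map the remainder onto
+# the boto3 list_restore_jobs keys (ByAccountId, ByStatus, ...).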
+@pytest.mark.parametrize(
+ "account_id, status, created_before, created_after, completed_before, completed_after,expected",
+ [
+ ("", "", "", "", "", "", {}),
+ ("123456789012", "", "", "", "", "", {"ByAccountId": "123456789012"}),
+ (
+ "123456789012",
+ "COMPLETED",
+ "",
+ "",
+ "",
+ "",
+ {"ByAccountId": "123456789012", "ByStatus": "COMPLETED"},
+ ),
+ ],
+)
+def test_build_request_args(
+ account_id, status, created_before, created_after, completed_before, completed_after, expected
+):
+ assert (
+ backup_restore_job_info.build_request_args(
+ account_id, status, created_before, created_after, completed_before, completed_after
+ )
+ == expected
+ )
+
+
+def test__describe_restore_job():
+ connection = MagicMock()
+ module = MagicMock()
+
+ restore_job_id = "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD"
+ restore_job_info = {
+ "AccountId": "123456789012",
+ "BackupSizeInBytes": "8589934592",
+ "CompletionDate": "2023-03-13T15:53:07.172000-07:00",
+ "CreatedResourceArn": "arn:aws:ec2:us-east-2:123456789012:instance/i-01234567ec51af3f",
+ "CreationDate": "2023-03-13T15:53:07.172000-07:00",
+ "IamRoleArn": "arn:aws:iam::123456789012:role/service-role/AWSBackupDefaultServiceRole",
+ "PercentDone": "0.00%",
+ "RecoveryPointArn": "arn:aws:ec2:us-east-2::image/ami-01234567ec51af3f",
+ "ResourceType": "EC2",
+ "RestoreJobId": "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD",
+ "Status": "COMPLETED",
+ }
+
+ connection.describe_restore_job.return_value = restore_job_info
+
+ result = backup_restore_job_info._describe_restore_job(connection, module, restore_job_id)
+
+ assert result == [camel_dict_to_snake_dict(restore_job_info)]
+ connection.describe_restore_job.assert_called_with(RestoreJobId=restore_job_id)
+ assert connection.describe_restore_job.call_count == 1
+
+
+def test__list_restore_jobs():
+ connection = MagicMock()
+ conn_paginator = MagicMock()
+ paginate = MagicMock()
+
+ request_args = {"ByAccountId": "123456789012"}
+
+ restore_job = {
+ "AccountId": "123456789012",
+ "BackupSizeInBytes": "8589934592",
+ "CompletionDate": "2023-03-13T15:53:07.172000-07:00",
+ "CreatedResourceArn": "arn:aws:ec2:us-east-2:123456789012:instance/i-01234567ec51af3f",
+ "CreationDate": "2023-03-13T15:53:07.172000-07:00",
+ "IamRoleArn": "arn:aws:iam::123456789012:role/service-role/AWSBackupDefaultServiceRole",
+ "PercentDone": "0.00%",
+ "RecoveryPointArn": "arn:aws:ec2:us-east-2::image/ami-01234567ec51af3f",
+ "ResourceType": "EC2",
+ "RestoreJobId": "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD",
+ "Status": "COMPLETED",
+ }
+
+ connection.get_paginator.return_value = conn_paginator
+ conn_paginator.paginate.return_value = paginate
+
+ paginate.build_full_result.return_value = {"RestoreJobs": [restore_job]}
+
+ result = backup_restore_job_info._list_restore_jobs(connection=connection, **request_args)
+
+ assert result == paginate.build_full_result.return_value
+ connection.get_paginator.assert_called_with("list_restore_jobs")
+ conn_paginator.paginate.assert_called_with(**request_args)
+
+
+@patch(module_name + "._list_restore_jobs")
+def test_list_restore_jobs(m__list_restore_jobs):
+ connection = MagicMock()
+ module = MagicMock()
+
+ request_args = {"ByAccountId": "123456789012"}
+
+ m__list_restore_jobs.return_value = {
+ "RestoreJobs": [
+ {
+ "AccountId": "123456789012",
+ "BackupSizeInBytes": "8589934592",
+ "CompletionDate": "2023-03-13T15:53:07.172000-07:00",
+ "CreatedResourceArn": "arn:aws:ec2:us-east-2:123456789012:instance/i-01234567ec51af3f",
+ "CreationDate": "2023-03-13T15:53:07.172000-07:00",
+ "IamRoleArn": "arn:aws:iam::123456789012:role/service-role/AWSBackupDefaultServiceRole",
+ "PercentDone": "0.00%",
+ "RecoveryPointArn": "arn:aws:ec2:us-east-2::image/ami-01234567ec51af3f",
+ "ResourceType": "EC2",
+ "RestoreJobId": "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD",
+ "Status": "COMPLETED",
+ }
+ ]
+ }
+
+ list_restore_jobs_result = backup_restore_job_info.list_restore_jobs(connection, module, request_args)
+
+ assert m__list_restore_jobs.call_count == 1
+ m__list_restore_jobs.assert_called_with(connection, **request_args)
+ assert len(list_restore_jobs_result) == 1
+
+
+@patch(module_name + ".AnsibleAWSModule")
+def test_main_success(m_AnsibleAWSModule):
+ m_module = MagicMock()
+ m_AnsibleAWSModule.return_value = m_module
+
+ backup_restore_job_info.main()
+
+ m_module.client.assert_called_with("backup")
+ m_module.exit_json.assert_called_with(changed=False, restore_jobs=[{}])
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py
index f46bc1113..fd0b7ca75 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py
@@ -3,21 +3,23 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
import pytest
-# Magic...
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep, placeboify # pylint: disable=unused-import
-
from ansible_collections.amazon.aws.plugins.module_utils.botocore import boto_exception
-from ansible_collections.amazon.aws.plugins.module_utils.modules import _RetryingBotoClientWrapper
from ansible_collections.amazon.aws.plugins.module_utils.retries import AWSRetry
-
+from ansible_collections.amazon.aws.plugins.module_utils.retries import RetryingBotoClientWrapper
from ansible_collections.amazon.aws.plugins.modules import cloudformation as cfn_module
+# isort: off
+# Magic...
+# pylint: disable-next=unused-import
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep
+
+# pylint: disable-next=unused-import
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify
+
+# isort: on
+
basic_yaml_tpl = """
---
AWSTemplateFormatVersion: '2010-09-09'
@@ -61,167 +63,153 @@ Resources:
default_events_limit = 10
-class FakeModule(object):
+class FakeModule:
def __init__(self, **kwargs):
self.params = kwargs
def fail_json(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise Exception('FAIL')
+ raise Exception("FAIL")
def fail_json_aws(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise Exception('FAIL')
+ raise Exception("FAIL")
def exit_json(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise Exception('EXIT')
+ raise Exception("EXIT")
def _create_wrapped_client(placeboify):
- connection = placeboify.client('cloudformation')
+ connection = placeboify.client("cloudformation")
retry_decorator = AWSRetry.jittered_backoff()
- wrapped_conn = _RetryingBotoClientWrapper(connection, retry_decorator)
+ wrapped_conn = RetryingBotoClientWrapper(connection, retry_decorator)
return wrapped_conn
def test_invalid_template_json(placeboify):
connection = _create_wrapped_client(placeboify)
params = {
- 'StackName': 'ansible-test-wrong-json',
- 'TemplateBody': bad_json_tpl,
+ "StackName": "ansible-test-wrong-json",
+ "TemplateBody": bad_json_tpl,
}
m = FakeModule(disable_rollback=False)
with pytest.raises(Exception) as exc_info:
cfn_module.create_stack(m, params, connection, default_events_limit)
- pytest.fail('Expected malformed JSON to have caused the call to fail')
+ pytest.fail("Expected malformed JSON to have caused the call to fail")
- assert exc_info.match('FAIL')
+ assert exc_info.match("FAIL")
assert "ValidationError" in boto_exception(m.exit_args[0])
def test_client_request_token_s3_stack(maybe_sleep, placeboify):
connection = _create_wrapped_client(placeboify)
params = {
- 'StackName': 'ansible-test-client-request-token-yaml',
- 'TemplateBody': basic_yaml_tpl,
- 'ClientRequestToken': '3faf3fb5-b289-41fc-b940-44151828f6cf',
+ "StackName": "ansible-test-client-request-token-yaml",
+ "TemplateBody": basic_yaml_tpl,
+ "ClientRequestToken": "3faf3fb5-b289-41fc-b940-44151828f6cf",
}
m = FakeModule(disable_rollback=False)
result = cfn_module.create_stack(m, params, connection, default_events_limit)
- assert result['changed']
- assert len(result['events']) > 1
+ assert result["changed"]
+ assert len(result["events"]) > 1
# require that the final recorded stack state was CREATE_COMPLETE
# events are retrieved newest-first, so 0 is the latest
- assert 'CREATE_COMPLETE' in result['events'][0]
- connection.delete_stack(StackName='ansible-test-client-request-token-yaml')
+ assert "CREATE_COMPLETE" in result["events"][0]
+ connection.delete_stack(StackName="ansible-test-client-request-token-yaml")
def test_basic_s3_stack(maybe_sleep, placeboify):
connection = _create_wrapped_client(placeboify)
- params = {
- 'StackName': 'ansible-test-basic-yaml',
- 'TemplateBody': basic_yaml_tpl
- }
+ params = {"StackName": "ansible-test-basic-yaml", "TemplateBody": basic_yaml_tpl}
m = FakeModule(disable_rollback=False)
result = cfn_module.create_stack(m, params, connection, default_events_limit)
- assert result['changed']
- assert len(result['events']) > 1
+ assert result["changed"]
+ assert len(result["events"]) > 1
# require that the final recorded stack state was CREATE_COMPLETE
# events are retrieved newest-first, so 0 is the latest
- assert 'CREATE_COMPLETE' in result['events'][0]
- connection.delete_stack(StackName='ansible-test-basic-yaml')
+ assert "CREATE_COMPLETE" in result["events"][0]
+ connection.delete_stack(StackName="ansible-test-basic-yaml")
def test_delete_nonexistent_stack(maybe_sleep, placeboify):
connection = _create_wrapped_client(placeboify)
# module is only used if we threw an unexpected error
module = None
- result = cfn_module.stack_operation(module, connection, 'ansible-test-nonexist', 'DELETE', default_events_limit)
- assert result['changed']
- assert 'Stack does not exist.' in result['log']
+ result = cfn_module.stack_operation(module, connection, "ansible-test-nonexist", "DELETE", default_events_limit)
+ assert result["changed"]
+ assert "Stack does not exist." in result["log"]
def test_get_nonexistent_stack(placeboify):
connection = _create_wrapped_client(placeboify)
# module is only used if we threw an unexpected error
module = None
- assert cfn_module.get_stack_facts(module, connection, 'ansible-test-nonexist') is None
+ assert cfn_module.get_stack_facts(module, connection, "ansible-test-nonexist") is None
def test_missing_template_body():
m = FakeModule()
with pytest.raises(Exception) as exc_info:
- cfn_module.create_stack(
- module=m,
- stack_params={},
- cfn=None,
- events_limit=default_events_limit
- )
- pytest.fail('Expected module to have failed with no template')
-
- assert exc_info.match('FAIL')
+ cfn_module.create_stack(module=m, stack_params={}, cfn=None, events_limit=default_events_limit)
+ pytest.fail("Expected module to have failed with no template")
+
+ assert exc_info.match("FAIL")
assert not m.exit_args
- assert "Either 'template', 'template_body' or 'template_url' is required when the stack does not exist." == m.exit_kwargs['msg']
+ assert (
+ "Either 'template', 'template_body' or 'template_url' is required when the stack does not exist."
+ == m.exit_kwargs["msg"]
+ )
def test_on_create_failure_delete(maybe_sleep, placeboify):
m = FakeModule(
- on_create_failure='DELETE',
+ on_create_failure="DELETE",
disable_rollback=False,
)
connection = _create_wrapped_client(placeboify)
- params = {
- 'StackName': 'ansible-test-on-create-failure-delete',
- 'TemplateBody': failing_yaml_tpl
- }
+ params = {"StackName": "ansible-test-on-create-failure-delete", "TemplateBody": failing_yaml_tpl}
result = cfn_module.create_stack(m, params, connection, default_events_limit)
- assert result['changed']
- assert result['failed']
- assert len(result['events']) > 1
+ assert result["changed"]
+ assert result["failed"]
+ assert len(result["events"]) > 1
# require that the final recorded stack state was DELETE_COMPLETE
# events are retrieved newest-first, so 0 is the latest
- assert 'DELETE_COMPLETE' in result['events'][0]
+ assert "DELETE_COMPLETE" in result["events"][0]
def test_on_create_failure_rollback(maybe_sleep, placeboify):
m = FakeModule(
- on_create_failure='ROLLBACK',
+ on_create_failure="ROLLBACK",
disable_rollback=False,
)
connection = _create_wrapped_client(placeboify)
- params = {
- 'StackName': 'ansible-test-on-create-failure-rollback',
- 'TemplateBody': failing_yaml_tpl
- }
+ params = {"StackName": "ansible-test-on-create-failure-rollback", "TemplateBody": failing_yaml_tpl}
result = cfn_module.create_stack(m, params, connection, default_events_limit)
- assert result['changed']
- assert result['failed']
- assert len(result['events']) > 1
+ assert result["changed"]
+ assert result["failed"]
+ assert len(result["events"]) > 1
# require that the final recorded stack state was ROLLBACK_COMPLETE
# events are retrieved newest-first, so 0 is the latest
- assert 'ROLLBACK_COMPLETE' in result['events'][0]
- connection.delete_stack(StackName=params['StackName'])
+ assert "ROLLBACK_COMPLETE" in result["events"][0]
+ connection.delete_stack(StackName=params["StackName"])
def test_on_create_failure_do_nothing(maybe_sleep, placeboify):
m = FakeModule(
- on_create_failure='DO_NOTHING',
+ on_create_failure="DO_NOTHING",
disable_rollback=False,
)
connection = _create_wrapped_client(placeboify)
- params = {
- 'StackName': 'ansible-test-on-create-failure-do-nothing',
- 'TemplateBody': failing_yaml_tpl
- }
+ params = {"StackName": "ansible-test-on-create-failure-do-nothing", "TemplateBody": failing_yaml_tpl}
result = cfn_module.create_stack(m, params, connection, default_events_limit)
- assert result['changed']
- assert result['failed']
- assert len(result['events']) > 1
+ assert result["changed"]
+ assert result["failed"]
+ assert len(result["events"]) > 1
# require that the final recorded stack state was CREATE_FAILED
# events are retrieved newest-first, so 0 is the latest
- assert 'CREATE_FAILED' in result['events'][0]
- connection.delete_stack(StackName=params['StackName'])
+ assert "CREATE_FAILED" in result["events"][0]
+ connection.delete_stack(StackName=params["StackName"])
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py
index 5e8140d4a..b1e23451b 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py
@@ -1,7 +1,9 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from unittest.mock import MagicMock, Mock, patch, call
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
import pytest
@@ -28,17 +30,371 @@ def test_create_image_uefi_data(m_get_image_by_id):
"uefi_data": "QU1aTlVFRkk9xcN0AAAAAHj5a7fZ9+3aT2gcVRgA8Ek3NipiPST0pCiCIlTJtj20FzENCcQa",
}
- ec2_ami.create_image(module, connection)
+ ec2_ami.CreateImage.do(module, connection, None)
assert connection.register_image.call_count == 1
connection.register_image.assert_has_calls(
[
call(
aws_retry=True,
- Description=None,
Name="my-image",
BootMode="uefi",
TpmSupport="v2.0",
- UefiData="QU1aTlVFRkk9xcN0AAAAAHj5a7fZ9+3aT2gcVRgA8Ek3NipiPST0pCiCIlTJtj20FzENCcQa"
+ UefiData="QU1aTlVFRkk9xcN0AAAAAHj5a7fZ9+3aT2gcVRgA8Ek3NipiPST0pCiCIlTJtj20FzENCcQa",
)
]
)
+
+
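+# Instance-store (ephemeral) mappings carry no Ebs block; the helper keys the
+# result by device name and keeps only the virtual_name.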
+def test_get_block_device_mapping_virtual_name():
+ image = {"block_device_mappings": [{"device_name": "/dev/sdc", "virtual_name": "ephemeral0"}]}
+ block_device = ec2_ami.get_block_device_mapping(image)
+ assert block_device == {"/dev/sdc": {"virtual_name": "ephemeral0"}}
+
+
+def test_get_image_by_id_found():
+ connection = MagicMock()
+
+ connection.describe_images.return_value = {"Images": [{"ImageId": "ami-0c7a795306730b288"}]}
+
+ image = ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288")
+ assert image["ImageId"] == "ami-0c7a795306730b288"
+ assert connection.describe_images.call_count == 1
+ assert connection.describe_image_attribute.call_count == 2
+ connection.describe_images.assert_has_calls(
+ [
+ call(
+ aws_retry=True,
+ ImageIds=["ami-0c7a795306730b288"],
+ )
+ ]
+ )
+
+
+def test_get_image_by_too_many():
+ connection = MagicMock()
+
+ connection.describe_images.return_value = {
+ "Images": [
+ {"ImageId": "ami-0c7a795306730b288"},
+ {"ImageId": "ami-0c7a795306730b288"},
+ ]
+ }
+
+ with pytest.raises(ec2_ami.Ec2AmiFailure):
+ ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288")
+
+
+def test_get_image_missing():
+ connection = MagicMock()
+
+ connection.describe_images.return_value = {"Images": []}
+
+ image = ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288")
+ assert image is None
+ assert connection.describe_images.call_count == 1
+ connection.describe_images.assert_has_calls(
+ [
+ call(
+ aws_retry=True,
+ ImageIds=["ami-0c7a795306730b288"],
+ )
+ ]
+ )
+
+
+@patch(
+ module_name + ".get_image_by_id",
+)
+def test_create_image_minimal(m_get_image_by_id):
+ module = MagicMock()
+ connection = MagicMock()
+
+ m_get_image_by_id.return_value = {"ImageId": "ami-0c7a795306730b288"}
+ module.params = {
+ "name": "my-image",
+ "instance_id": "i-123456789",
+ "image_id": "ami-0c7a795306730b288",
+ }
+ ec2_ami.CreateImage.do(module, connection, None)
+ assert connection.create_image.call_count == 1
+ connection.create_image.assert_has_calls(
+ [
+ call(
+ aws_retry=True,
+ InstanceId="i-123456789",
+ Name="my-image",
+ )
+ ]
+ )
+
+
+def test_validate_params():
+ module = MagicMock()
+
+ ec2_ami.validate_params(module)
+ module.fail_json.assert_any_call("one of the following is required: name, image_id")
+ assert module.require_botocore_at_least.call_count == 0
+
+ module = MagicMock()
+ ec2_ami.validate_params(module, tpm_support=True)
+ assert module.require_botocore_at_least.call_count == 0
+
+ module = MagicMock()
+ ec2_ami.validate_params(module, tpm_support=True, boot_mode="legacy-bios")
+ assert module.require_botocore_at_least.call_count == 0
+ module.fail_json.assert_any_call("To specify 'tpm_support', 'boot_mode' must be 'uefi'.")
+
+ module = MagicMock()
+ ec2_ami.validate_params(module, state="present", name="bobby")
+ assert module.require_botocore_at_least.call_count == 0
+ module.fail_json.assert_any_call(
+ "The parameters instance_id or device_mapping (register from EBS snapshot) are required for a new image."
+ )
+
+
+def test_rename_item_if_exists():
+ dict_object = {
+ "Paris": True,
+ "London": {"Heathrow Airport": False},
+ }
+ ec2_ami.rename_item_if_exists(dict_object, "Paris", "NewYork")
+ assert dict_object == {"London": {"Heathrow Airport": False}, "NewYork": True}
+
+ dict_object = {
+ "Cities": {},
+ "London": "bar",
+ }
+
+ ec2_ami.rename_item_if_exists(dict_object, "London", "Abidjan", "Cities")
+ ec2_ami.rename_item_if_exists(dict_object, "Doesnt-exist", "Nowhere", "Cities")
+ assert dict_object == {"Cities": {"Abidjan": "bar"}}
+
+
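+# defer_purge_snapshots builds a callable that deletes each Ebs-backed
+# snapshot and returns the deleted IDs; mappings without an Ebs entry are
+# skipped.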
+def test_DeregisterImage_defer_purge_snapshots():
+ image = {"BlockDeviceMappings": [{"Ebs": {"SnapshotId": "My_snapshot"}}, {}]}
+ func = ec2_ami.DeregisterImage.defer_purge_snapshots(image)
+
+ connection = MagicMock()
+ assert list(func(connection)) == ["My_snapshot"]
+ connection.delete_snapshot.assert_called_with(aws_retry=True, SnapshotId="My_snapshot")
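+
+
+# Editor's sketch (hedged addition, not part of the upstream change): a
+# mapping without an Ebs entry contributes nothing, so no snapshot is deleted.
+def test_DeregisterImage_defer_purge_snapshots_no_ebs():
+ func = ec2_ami.DeregisterImage.defer_purge_snapshots({"BlockDeviceMappings": [{}]})
+ connection = MagicMock()
+ assert list(func(connection)) == []
+ assert connection.delete_snapshot.call_count == 0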
+
+
+@patch(module_name + ".get_image_by_id")
+@patch(module_name + ".time.sleep")
+def test_DeregisterImage_timeout_success(m_sleep, m_get_image_by_id):
+ connection = MagicMock()
+ m_get_image_by_id.side_effect = [{"ImageId": "ami-0c7a795306730b288"}, None]
+
+ ec2_ami.DeregisterImage.timeout(connection, "ami-0c7a795306730b288", 10)
+ assert m_sleep.call_count == 1
+
+
+@patch(module_name + ".get_image_by_id")
+@patch(module_name + ".time.time")
+@patch(module_name + ".time.sleep")
+def test_DeregisterImage_timeout_failure(m_sleep, m_time, m_get_image_by_id):
+ connection = MagicMock()
+ m_time.side_effect = list(range(1, 30))
+ m_get_image_by_id.return_value = {"ImageId": "ami-0c7a795306730b288"}
+
+ with pytest.raises(ec2_ami.Ec2AmiFailure):
+ ec2_ami.DeregisterImage.timeout(connection, "ami-0c7a795306730b288", 10)
+ assert m_sleep.call_count == 9
+
+
+def test_UpdateImage_set_launch_permission_check_mode_no_change():
+ connection = MagicMock()
+ image = {"ImageId": "ami-0c7a795306730b288", "LaunchPermissions": {}}
+
+ changed = ec2_ami.UpdateImage.set_launch_permission(connection, image, launch_permissions={}, check_mode=True)
+ assert changed is False
+ assert connection.modify_image_attribute.call_count == 0
+
+
+def test_UpdateImage_set_launch_permission_check_mode_with_change():
+ connection = MagicMock()
+ image = {"ImageId": "ami-0c7a795306730b288", "LaunchPermissions": {}}
+ launch_permissions = {"user_ids": ["123456789012"], "group_names": ["foo", "bar"]}
+ changed = ec2_ami.UpdateImage.set_launch_permission(connection, image, launch_permissions, check_mode=True)
+ assert changed is True
+ assert connection.modify_image_attribute.call_count == 0
+
+
+def test_UpdateImage_set_launch_permission_with_change():
+ connection = MagicMock()
+ image = {"ImageId": "ami-0c7a795306730b288", "LaunchPermissions": {}}
+ launch_permissions = {"user_ids": ["123456789012"], "group_names": ["foo", "bar"]}
+ changed = ec2_ami.UpdateImage.set_launch_permission(connection, image, launch_permissions, check_mode=False)
+ assert changed is True
+ assert connection.modify_image_attribute.call_count == 1
+ connection.modify_image_attribute.assert_called_with(
+ aws_retry=True,
+ ImageId="ami-0c7a795306730b288",
+ Attribute="launchPermission",
+ LaunchPermission={
+ "Add": [{"Group": "bar"}, {"Group": "foo"}, {"UserId": "123456789012"}],
+ "Remove": [],
+ },
+ )
+
+
+def test_UpdateImage_set_description():
+ connection = MagicMock()
+ module = MagicMock()
+ module.check_mode = False
+ image = {"ImageId": "ami-0c7a795306730b288", "Description": "My description"}
+ changed = ec2_ami.UpdateImage.set_description(connection, module, image, "My description")
+ assert changed is False
+
+ changed = ec2_ami.UpdateImage.set_description(connection, module, image, "New description")
+ assert changed is True
+ assert connection.modify_image_attribute.call_count == 1
+ connection.modify_image_attribute.assert_called_with(
+ aws_retry=True,
+ ImageId="ami-0c7a795306730b288",
+ Attribute="Description",
+ Description={"Value": "New description"},
+ )
+
+
+def test_UpdateImage_set_description_check_mode():
+ connection = MagicMock()
+ module = MagicMock()
+ module.check_mode = True
+ image = {"ImageId": "ami-0c7a795306730b288", "Description": "My description"}
+ changed = ec2_ami.UpdateImage.set_description(connection, module, image, "My description")
+ assert changed is False
+
+ changed = ec2_ami.UpdateImage.set_description(connection, module, image, "New description")
+ assert changed is True
+ assert connection.modify_image_attribute.call_count == 0
+
+
+def test_CreateImage_build_block_device_mapping():
+ device_mapping = [
+ {
+ "device_name": "/dev/xvda",
+ "volume_size": 8,
+ "snapshot_id": "snap-xxxxxxxx",
+ "delete_on_termination": True,
+ "volume_type": "gp2",
+ "no_device": False,
+ },
+ {
+ "device_name": "/dev/xvdb",
+ "no_device": True,
+ },
+ ]
+ result = ec2_ami.CreateImage.build_block_device_mapping(device_mapping)
+ assert result == [
+ {
+ "Ebs": {
+ "DeleteOnTermination": True,
+ "SnapshotId": "snap-xxxxxxxx",
+ "VolumeSize": 8,
+ "VolumeType": "gp2",
+ },
+ "DeviceName": "/dev/xvda",
+ },
+ {"DeviceName": "/dev/xvdb", "Ebs": {}, "NoDevice": ""},
+ ]
+
+
+def test_CreateImage_do_check_mode_no_change():
+ module = MagicMock()
+
+ module.params = {"name": "my-image"}
+ connection = MagicMock()
+ connection.describe_images.return_value = {
+ "Images": [
+ {
+ "InstanceId": "i-123456789",
+ "Name": "my-image",
+ }
+ ]
+ }
+
+ ec2_ami.CreateImage.do_check_mode(module, connection, None)
+ module.exit_json.assert_called_with(
+ changed=False,
+ msg="Error registering image: AMI name is already in use by another AMI",
+ )
+
+
+def test_CreateImage_do_check_mode_with_change():
+ module = MagicMock()
+
+ module.params = {"name": "my-image"}
+ connection = MagicMock()
+ connection.describe_images.return_value = {"Images": []}
+
+ ec2_ami.CreateImage.do_check_mode(module, connection, None)
+ module.exit_json.assert_called_with(changed=True, msg="Would have created a AMI if not in check mode.")
+
+
+@patch(module_name + ".get_waiter")
+def test_CreateImage_wait(m_get_waiter):
+ connection = MagicMock()
+ m_waiter = MagicMock()
+ m_get_waiter.return_value = m_waiter
+
+ assert ec2_ami.CreateImage.wait(connection, wait_timeout=0, image_id=None) is None
+
+ ec2_ami.CreateImage.wait(connection, wait_timeout=600, image_id="ami-0c7a795306730b288")
+ assert m_waiter.wait.call_count == 1
+ m_waiter.wait.assert_called_with(
+ ImageIds=["ami-0c7a795306730b288"],
+ WaiterConfig={"Delay": 15, "MaxAttempts": 40},
+ )
+
+
+@patch(module_name + ".add_ec2_tags")
+@patch(module_name + ".get_image_by_id")
+def test_CreateImage_set_tags(m_get_image_by_id, m_add_ec2_tags):
+ connection = MagicMock()
+ module = MagicMock()
+
+ m_get_image_by_id.return_value = {
+ "ImageId": "ami-0c7a795306730b288",
+ "BlockDeviceMappings": [
+ {"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": "50"}},
+ {
+ "DeviceName": "/dev/sdm",
+ "Ebs": {"VolumeSize": "100", "SnapshotId": "snap-066877671789bd71b"},
+ },
+ {"DeviceName": "/dev/sda2"},
+ ],
+ }
+ tags = {}
+ ec2_ami.CreateImage.set_tags(connection, module, tags, image_id="ami-0c7a795306730b288")
+ assert m_add_ec2_tags.call_count == 0
+
+ tags = {"metro": "LaSalle"}
+ ec2_ami.CreateImage.set_tags(connection, module, tags, image_id="ami-0c7a795306730b288")
+ assert m_add_ec2_tags.call_count == 3
+ m_add_ec2_tags.assert_called_with(connection, module, "snap-066877671789bd71b", tags)
+
+
+def test_CreateImage_set_launch_permissions():
+ connection = MagicMock()
+ launch_permissions = {"user_ids": ["123456789012"], "group_names": ["foo", "bar"]}
+ image_id = "ami-0c7a795306730b288"
+ ec2_ami.CreateImage.set_launch_permissions(connection, launch_permissions, image_id)
+
+ assert connection.modify_image_attribute.call_count == 1
+ connection.modify_image_attribute.assert_called_with(
+ Attribute="LaunchPermission",
+ ImageId="ami-0c7a795306730b288",
+ LaunchPermission={"Add": [{"Group": "foo"}, {"Group": "bar"}, {"UserId": "123456789012"}]},
+ aws_retry=True,
+ )
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami_info.py
new file mode 100644
index 000000000..a5abc77af
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami_info.py
@@ -0,0 +1,224 @@
+# (c) 2022 Red Hat Inc.
+
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import ANY
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
+
+import botocore.exceptions
+import pytest
+
+from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict
+
+from ansible_collections.amazon.aws.plugins.modules import ec2_ami_info
+
+module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_ami_info"
+
+
+@pytest.fixture
+def ec2_client():
+ return MagicMock()
+
+
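+# Each case maps module arguments onto a DescribeImages request: image IDs and
+# "self" pass through (ImageIds, Owners, ExecutableUsers), numeric owner IDs
+# become a Filters entry, and boolean filter values are stringified.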
+@pytest.mark.parametrize(
+ "executable_users,filters,image_ids,owners,expected",
+ [
+ ([], {}, [], [], {}),
+ ([], {}, ["ami-1234567890"], [], {"ImageIds": ["ami-1234567890"]}),
+ ([], {}, [], ["1234567890"], {"Filters": [{"Name": "owner-id", "Values": ["1234567890"]}]}),
+ (
+ [],
+ {"owner-alias": "test_ami_owner"},
+ [],
+ ["1234567890"],
+ {
+ "Filters": [
+ {"Name": "owner-alias", "Values": ["test_ami_owner"]},
+ {"Name": "owner-id", "Values": ["1234567890"]},
+ ]
+ },
+ ),
+ ([], {"is-public": True}, [], [], {"Filters": [{"Name": "is-public", "Values": ["true"]}]}),
+ (["self"], {}, [], [], {"ExecutableUsers": ["self"]}),
+ ([], {}, [], ["self"], {"Owners": ["self"]}),
+ ],
+)
+def test_build_request_args(executable_users, filters, image_ids, owners, expected):
+ assert ec2_ami_info.build_request_args(executable_users, filters, image_ids, owners) == expected
+
+
+def test_get_images(ec2_client):
+ ec2_client.describe_images.return_value = {
+ "Images": [
+ {
+ "Architecture": "x86_64",
+ "BlockDeviceMappings": [
+ {
+ "DeviceName": "/dev/sda1",
+ "Ebs": {
+ "DeleteOnTermination": "True",
+ "Encrypted": "False",
+ "SnapshotId": "snap-0f00cba784af62428",
+ "VolumeSize": 10,
+ "VolumeType": "gp2",
+ },
+ }
+ ],
+ "ImageId": "ami-1234567890",
+ "ImageLocation": "1234567890/test-ami-uefi-boot",
+ "ImageType": "machine",
+ "Name": "test-ami-uefi-boot",
+ "OwnerId": "1234567890",
+ "PlatformDetails": "Linux/UNIX",
+ }
+ ],
+ }
+
+ request_args = {"ImageIds": ["ami-1234567890"]}
+
+ get_images_result = ec2_ami_info.get_images(ec2_client, request_args)
+
+ assert ec2_client.describe_images.call_count == 1
+ ec2_client.describe_images.assert_called_with(aws_retry=True, **request_args)
+ assert get_images_result == ec2_client.describe_images.return_value
+
+
+def test_get_image_attribute():
+ ec2_client = MagicMock()
+
+ ec2_client.describe_image_attribute.return_value = {
+ "ImageId": "ami-1234567890",
+ "LaunchPermissions": [{"UserId": "1234567890"}, {"UserId": "0987654321"}],
+ }
+
+ image_id = "ami-1234567890"
+
+ get_image_attribute_result = ec2_ami_info.get_image_attribute(ec2_client, image_id)
+
+ assert ec2_client.describe_image_attribute.call_count == 1
+ ec2_client.describe_image_attribute.assert_called_with(
+ aws_retry=True, Attribute="launchPermission", ImageId=image_id
+ )
+ assert len(get_image_attribute_result["LaunchPermissions"]) == 2
+
+
+@patch(module_name + ".get_image_attribute")
+@patch(module_name + ".get_images")
+def test_list_ec2_images(m_get_images, m_get_image_attribute):
+ module = MagicMock()
+
+ m_get_images.return_value = {
+ "Images": [
+ {
+ "Architecture": "x86_64",
+ "BlockDeviceMappings": [
+ {
+ "DeviceName": "/dev/sda1",
+ "Ebs": {
+ "DeleteOnTermination": "True",
+ "Encrypted": "False",
+ "SnapshotId": "snap-0f00cba784af62428",
+ "VolumeSize": 10,
+ "VolumeType": "gp2",
+ },
+ }
+ ],
+ "ImageId": "ami-1234567890",
+ "ImageLocation": "1234567890/test-ami-uefi-boot",
+ "ImageType": "machine",
+ "Name": "test-ami-uefi-boot",
+ "OwnerId": "1234567890",
+ "OwnerAlias": "test_ami_owner",
+ "PlatformDetails": "Linux/UNIX",
+ },
+ {
+ "Architecture": "x86_64",
+ "BlockDeviceMappings": [
+ {
+ "DeviceName": "/dev/sda1",
+ "Ebs": {
+ "DeleteOnTermination": "True",
+ "Encrypted": "False",
+ "SnapshotId": "snap-0f00cba784af62428",
+ "VolumeSize": 10,
+ "VolumeType": "gp2",
+ },
+ }
+ ],
+ "ImageId": "ami-1523498760",
+ "ImageLocation": "1523498760/test-ami-uefi-boot",
+ "ImageType": "machine",
+ "Name": "test-ami-uefi-boot",
+ "OwnerId": "1234567890",
+ "OwnerAlias": "test_ami_owner",
+ "PlatformDetails": "Linux/UNIX",
+ },
+ ],
+ }
+
+ m_get_image_attribute.return_value = {
+ "ImageId": "ami-1234567890",
+ "LaunchPermissions": [{"UserId": "1234567890"}, {"UserId": "0987654321"}],
+ }
+
+ images = m_get_images.return_value["Images"]
+ images = [camel_dict_to_snake_dict(image) for image in images]
+
+ request_args = {
+ "Filters": [
+ {"Name": "owner-alias", "Values": ["test_ami_owner"]},
+ {"Name": "owner-id", "Values": ["1234567890"]},
+ ]
+ }
+
+ # needed for `assert m_get_image_attribute.call_count == 2`
+ module.params = {"describe_image_attributes": True}
+
+ list_ec2_images_result = ec2_ami_info.list_ec2_images(ec2_client, module, request_args)
+
+ assert m_get_images.call_count == 1
+ m_get_images.assert_called_with(ec2_client, request_args)
+
+ assert m_get_image_attribute.call_count == 2
+ m_get_image_attribute.assert_has_calls(
+ [
+ call(ec2_client, images[0]["image_id"]),
+ call(ec2_client, images[1]["image_id"]),
+ ],
+ )
+
+ assert len(list_ec2_images_result) == 2
+ assert list_ec2_images_result[0]["image_id"] == "ami-1234567890"
+ assert list_ec2_images_result[1]["image_id"] == "ami-1523498760"
+
+
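+# With AnsibleAWSModule mocked out, main() should request an EC2 client with a
+# retry decorator and exit successfully with an empty image list.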
+@patch(module_name + ".AnsibleAWSModule")
+def test_main_success(m_AnsibleAWSModule):
+ m_module = MagicMock()
+ m_AnsibleAWSModule.return_value = m_module
+
+ ec2_ami_info.main()
+
+ m_module.client.assert_called_with("ec2", retry_decorator=ANY)
+ m_module.exit_json.assert_called_with(images=[])
+
+
+def a_boto_exception():
+ return botocore.exceptions.UnknownServiceError(service_name="Whoops", known_service_names="Oula")
+
+
+def test_api_failure_get_images(ec2_client):
+ request_args = {}
+ ec2_client.describe_images.side_effect = a_boto_exception()
+
+ with pytest.raises(ec2_ami_info.AmiInfoFailure):
+ ec2_ami_info.get_images(ec2_client, request_args)
+
+
+def test_api_failure_get_image_attribute(ec2_client):
+ image_id = "ami-1234567890"
+ ec2_client.describe_image_attribute.side_effect = a_boto_exception()
+
+ with pytest.raises(ec2_ami_info.AmiInfoFailure):
+ ec2_ami_info.get_image_attribute(ec2_client, image_id)
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_eni_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_eni_info.py
new file mode 100644
index 000000000..d6323601d
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_eni_info.py
@@ -0,0 +1,108 @@
+# (c) 2022 Red Hat Inc.
+
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
+
+import pytest
+
+from ansible_collections.amazon.aws.plugins.modules import ec2_eni_info
+
+module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_eni_info"
+
+
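+# An empty eni_id yields an empty request; a concrete ID is passed through as
+# NetworkInterfaceIds.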
+@pytest.mark.parametrize(
+ "eni_id,filters,expected", [("", {}, {}), ("eni-1234567890", {}, {"NetworkInterfaceIds": ["eni-1234567890"]})]
+)
+def test_build_request_args(eni_id, filters, expected):
+ assert ec2_eni_info.build_request_args(eni_id, filters) == expected
+
+
+def test_get_network_interfaces():
+ connection = MagicMock()
+ module = MagicMock()
+
+ connection.describe_network_interfaces.return_value = {
+ "NetworkInterfaces": [
+ {
+ "AvailabilityZone": "us-east-2b",
+ "Description": "",
+ "NetworkInterfaceId": "eni-1234567890",
+ "PrivateIpAddresses": [{"Primary": "True", "PrivateIpAddress": "11.22.33.44"}],
+ "RequesterManaged": False,
+ "SourceDestCheck": True,
+ "Status": "available",
+ "SubnetId": "subnet-07d906b8358869bda",
+ "TagSet": [],
+ "VpcId": "vpc-0cb60952be96c9cd8",
+ }
+ ]
+ }
+
+ request_args = {"NetworkInterfaceIds": ["eni-1234567890"]}
+
+ network_interfaces_result = ec2_eni_info.get_network_interfaces(connection, module, request_args)
+
+ assert connection.describe_network_interfaces.call_count == 1
+ connection.describe_network_interfaces.assert_called_with(aws_retry=True, **request_args)
+ assert len(network_interfaces_result["NetworkInterfaces"]) == 1
+
+
+@patch(module_name + ".get_network_interfaces")
+def test_list_eni(m_get_network_interfaces):
+ connection = MagicMock()
+ module = MagicMock()
+
+ m_get_network_interfaces.return_value = {
+ "NetworkInterfaces": [
+ {
+ "AvailabilityZone": "us-east-2b",
+ "Description": "",
+ "NetworkInterfaceId": "eni-1234567890",
+ "PrivateIpAddresses": [{"Primary": "True", "PrivateIpAddress": "11.22.33.44"}],
+ "RequesterManaged": False,
+ "SourceDestCheck": True,
+ "Status": "available",
+ "SubnetId": "subnet-07d906b8358869bda",
+ "TagSet": [],
+ "VpcId": "vpc-0cb60952be96c9cd8",
+ },
+ {
+ "AvailabilityZone": "us-east-2b",
+ "Description": "",
+ "NetworkInterfaceId": "eni-0987654321",
+ "PrivateIpAddresses": [{"Primary": "True", "PrivateIpAddress": "11.22.33.44"}],
+ "RequesterManaged": False,
+ "SourceDestCheck": True,
+ "Status": "available",
+ "SubnetId": "subnet-07d906b8358869bda",
+ "TagSet": [
+ {"Key": "Name", "Value": "my-test-eni-name"},
+ ],
+ "VpcId": "vpc-0cb60952be96c9cd8",
+ },
+ ]
+ }
+
+ request_args = {"Filters": [{"Name": "owner-id", "Values": ["1234567890"]}]}
+
+ camel_network_interfaces = ec2_eni_info.list_eni(connection, module, request_args)
+
+ assert m_get_network_interfaces.call_count == 1
+ m_get_network_interfaces.assert_has_calls(
+ [
+ call(connection, module, request_args),
+ ]
+ )
+ assert len(camel_network_interfaces) == 2
+
+ assert camel_network_interfaces[0]["id"] == "eni-1234567890"
+ assert camel_network_interfaces[0]["tags"] == {}
+ assert camel_network_interfaces[0].get("name") is None
+
+ assert camel_network_interfaces[1]["id"] == "eni-0987654321"
+ assert camel_network_interfaces[1]["tags"] == {"Name": "my-test-eni-name"}
+ assert camel_network_interfaces[1]["name"] == "my-test-eni-name"
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_import_image.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_import_image.py
new file mode 100644
index 000000000..6830fe358
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_import_image.py
@@ -0,0 +1,224 @@
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import ANY
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+
+from ansible_collections.amazon.aws.plugins.modules import ec2_import_image
+from ansible_collections.amazon.aws.plugins.modules import ec2_import_image_info
+
+module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_import_image"
+module_name_info = "ansible_collections.amazon.aws.plugins.modules.ec2_import_image_info"
+utils = "ansible_collections.amazon.aws.plugins.module_utils.ec2"
+
+expected_result = {
+ "import_task_id": "import-ami-0c207d759080a3dff",
+ "progress": "19",
+ "snapshot_details": [
+ {
+ "disk_image_size": 26843545600.0,
+ "format": "RAW",
+ "status": "active",
+ "user_bucket": {"s3_bucket": "clone-vm-s3-bucket", "s3_key": "clone-vm-s3-bucket/ubuntu-vm-clone.raw"},
+ }
+ ],
+ "status": "active",
+ "status_message": "converting",
+ "tags": {"Name": "clone-vm-import-image"},
+ "task_name": "clone-vm-import-image",
+}
+
+describe_import_image_tasks = [
+ {
+ "ImportTaskId": "import-ami-0c207d759080a3dff",
+ "Progress": "19",
+ "SnapshotDetails": [
+ {
+ "DiskImageSize": 26843545600.0,
+ "Format": "RAW",
+ "Status": "active",
+ "UserBucket": {"S3Bucket": "clone-vm-s3-bucket", "S3Key": "clone-vm-s3-bucket/ubuntu-vm-clone.raw"},
+ }
+ ],
+ "Status": "active",
+ "StatusMessage": "converting",
+ "Tags": [{"Key": "Name", "Value": "clone-vm-import-image"}],
+ }
+]
+
+
+@pytest.fixture
+def paginate():
+ # Create a MagicMock for the paginate object
+ paginate_mock = MagicMock()
+
+ return paginate_mock
+
+
+@pytest.fixture
+def conn_paginator(paginate):
+ conn_paginator_mock = MagicMock()
+ conn_paginator_mock.paginate.return_value = paginate
+ return conn_paginator_mock
+
+
+@pytest.fixture
+def client(conn_paginator):
+ client_mock = MagicMock()
+
+ # Configure client.get_paginator to return the paginator mock
+ client_mock.get_paginator.return_value = conn_paginator
+
+ return client_mock
+
+
+@pytest.fixture
+def module():
+ # Create a MagicMock for the module object
+ module_mock = MagicMock()
+ module_mock.params = {
+ "task_name": "clone-vm-import-image",
+ "disk_containers": [
+ {
+ "format": "raw",
+ "user_bucket": {"s3_bucket": "clone-vm-s3-bucket", "s3_key": "clone-vm-s3-bucket/ubuntu-vm-clone.raw"},
+ }
+ ],
+ }
+ module_mock.check_mode = False
+
+ return module_mock
+
+
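+# Each case primes successive DescribeImportImageTasks responses: the first
+# decides whether a task with this name already exists, the second provides
+# the task state that is reported back to the user.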
+@pytest.mark.parametrize(
+ "side_effects, expected_result",
+ [
+ (
+ [{"ImportImageTasks": []}, {"ImportImageTasks": describe_import_image_tasks}],
+ {"changed": True, "import_image": expected_result},
+ ),
+ (
+ [{"ImportImageTasks": describe_import_image_tasks}, {"ImportImageTasks": describe_import_image_tasks}],
+ {
+ "changed": False,
+ "msg": "An import task with the specified name already exists",
+ "import_image": expected_result,
+ },
+ ),
+ ],
+)
+def test_present_no_check_mode(client, module, paginate, side_effects, expected_result):
+ paginate.build_full_result.side_effect = side_effects
+ module.exit_json.side_effect = SystemExit(1)
+
+ with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate):
+ with pytest.raises(SystemExit):
+ ec2_import_image.present(client, module)
+
+ module.exit_json.assert_called_with(**expected_result)
+
+
+@pytest.mark.parametrize(
+ "side_effects, expected_result",
+ [
+ (
+ [{"ImportImageTasks": []}, {"ImportImageTasks": describe_import_image_tasks}],
+ {"changed": True, "msg": "Would have created the import task if not in check mode"},
+ ),
+ (
+ [{"ImportImageTasks": describe_import_image_tasks}, {"ImportImageTasks": describe_import_image_tasks}],
+ {
+ "changed": False,
+ "msg": "An import task with the specified name already exists",
+ "import_image": expected_result,
+ },
+ ),
+ ],
+)
+def test_present_check_mode(client, module, paginate, side_effects, expected_result):
+ paginate.build_full_result.side_effect = side_effects
+ module.check_mode = True
+ module.exit_json.side_effect = SystemExit(1)
+
+ with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate):
+ with pytest.raises(SystemExit):
+ ec2_import_image.present(client, module)
+
+ module.exit_json.assert_called_with(**expected_result)
+
+
+@pytest.mark.parametrize(
+ "side_effect, expected_result",
+ [
+ (
+ [
+ {"ImportImageTasks": []},
+ ],
+ {
+ "changed": False,
+ "msg": "The specified import task does not exist or it cannot be cancelled",
+ "import_image": {},
+ },
+ ),
+ (
+ [
+ {"ImportImageTasks": describe_import_image_tasks},
+ ],
+ {"changed": True, "import_image": expected_result},
+ ),
+ ],
+)
+def test_absent_no_check_mode(client, module, paginate, side_effect, expected_result):
+ paginate.build_full_result.side_effect = side_effect
+ module.exit_json.side_effect = SystemExit(1)
+
+ with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate):
+ with pytest.raises(SystemExit):
+ ec2_import_image.absent(client, module)
+
+ module.exit_json.assert_called_with(**expected_result)
+
+
+@pytest.mark.parametrize(
+ "side_effect, expected_result",
+ [
+ (
+ [
+ {"ImportImageTasks": []},
+ ],
+ {
+ "changed": False,
+ "msg": "The specified import task does not exist or it cannot be cancelled",
+ "import_image": {},
+ },
+ ),
+ (
+ [
+ {"ImportImageTasks": describe_import_image_tasks},
+ ],
+ {"changed": True, "import_image": expected_result},
+ ),
+ ],
+)
+def test_absent_check_mode(client, module, paginate, side_effect, expected_result):
+ paginate.build_full_result.side_effect = side_effect
+ module.exit_json.side_effect = SystemExit(1)
+
+ with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate):
+ with pytest.raises(SystemExit):
+ ec2_import_image.absent(client, module)
+
+ module.exit_json.assert_called_with(**expected_result)
+
+
+@patch(module_name_info + ".AnsibleAWSModule")
+def test_main_success(m_AnsibleAWSModule):
+ m_module = MagicMock()
+ m_AnsibleAWSModule.return_value = m_module
+
+ ec2_import_image_info.main()
+
+ m_module.client.assert_called_with("ec2", retry_decorator=ANY)
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py
index 2660ced63..cbcf02588 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py
@@ -1,17 +1,17 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+import copy
+import datetime
+from unittest.mock import ANY
from unittest.mock import MagicMock
from unittest.mock import patch
-from unittest.mock import call, ANY
-import pytest
import botocore
-import datetime
+import pytest
from dateutil.tz import tzutc
-from ansible.module_utils._text import to_bytes
-from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code
+from ansible.module_utils._text import to_bytes
from ansible_collections.amazon.aws.plugins.modules import ec2_key
@@ -19,47 +19,41 @@ module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_key"
def raise_botocore_exception_clienterror(action):
-
params = {
- 'Error': {
- 'Code': 1,
- 'Message': 'error creating key'
- },
- 'ResponseMetadata': {
- 'RequestId': '01234567-89ab-cdef-0123-456789abcdef'
- }
+ "Error": {"Code": 1, "Message": "error creating key"},
+ "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"},
}
- if action == 'create_key_pair':
- params['Error']['Message'] = 'error creating key'
+ if action == "create_key_pair":
+ params["Error"]["Message"] = "error creating key"
- elif action == 'describe_key_pair':
- params['Error']['Code'] = 'InvalidKeyPair.NotFound'
- params['Error']['Message'] = 'The key pair does not exist'
+ elif action == "describe_key_pair":
+ params["Error"]["Code"] = "InvalidKeyPair.NotFound"
+ params["Error"]["Message"] = "The key pair does not exist"
- elif action == 'import_key_pair':
- params['Error']['Message'] = 'error importing key'
+ elif action == "import_key_pair":
+ params["Error"]["Message"] = "error importing key"
- elif action == 'delete_key_pair':
- params['Error']['Message'] = 'error deleting key'
+ elif action == "delete_key_pair":
+ params["Error"]["Message"] = "error deleting key"
return botocore.exceptions.ClientError(params, action)
def test__import_key_pair():
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com"
expected_params = {
- 'KeyName': name,
- 'PublicKeyMaterial': to_bytes(key_material),
+ "KeyName": name,
+ "PublicKeyMaterial": to_bytes(key_material),
}
ec2_client.import_key_pair.return_value = {
- 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-012345678905a208d'
+ "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-012345678905a208d",
}
result = ec2_key._import_key_pair(ec2_client, name, key_material)
@@ -71,22 +65,21 @@ def test__import_key_pair():
def test_api_failure__import_key_pair():
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com"
expected_params = {
- 'KeyName': name,
- 'PublicKeyMaterial': to_bytes(key_material),
+ "KeyName": name,
+ "PublicKeyMaterial": to_bytes(key_material),
}
- ec2_client.import_key_pair.side_effect = raise_botocore_exception_clienterror('import_key_pair')
+ ec2_client.import_key_pair.side_effect = raise_botocore_exception_clienterror("import_key_pair")
with pytest.raises(ec2_key.Ec2KeyFailure):
ec2_key._import_key_pair(ec2_client, name, key_material)
def test_extract_key_data_describe_key_pairs():
-
key = {
"CreateTime": datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()),
"KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
@@ -96,66 +89,61 @@ def test_extract_key_data_describe_key_pairs():
}
key_type = "rsa"
-
+ file_name = MagicMock()
expected_result = {
"name": "my_keypair",
"fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"id": "key-043046ef2a9a80b56",
"tags": {},
- "type": "rsa"
+ "type": "rsa",
}
- result = ec2_key.extract_key_data(key, key_type)
+ result = ec2_key.extract_key_data(key, key_type, file_name)
assert result == expected_result
def test_extract_key_data_create_key_pair():
-
key = {
- 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-043046ef2a9a80b56'
+ "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-043046ef2a9a80b56",
}
key_type = "rsa"
-
+ file_name = MagicMock()
expected_result = {
"name": "my_keypair",
"fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"id": "key-043046ef2a9a80b56",
"tags": {},
- "type": "rsa"
+ "type": "rsa",
}
- result = ec2_key.extract_key_data(key, key_type)
+ result = ec2_key.extract_key_data(key, key_type, file_name)
assert result == expected_result
-@patch(module_name + '.delete_key_pair')
-@patch(module_name + '._import_key_pair')
-@patch(module_name + '.find_key_pair')
+@patch(module_name + ".delete_key_pair")
+@patch(module_name + "._import_key_pair")
+@patch(module_name + ".find_key_pair")
def test_get_key_fingerprint(m_find_key_pair, m_import_key_pair, m_delete_key_pair):
-
module = MagicMock()
ec2_client = MagicMock()
+ file_name = MagicMock()
m_find_key_pair.return_value = None
m_import_key_pair.return_value = {
- 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-043046ef2a9a80b56'
+ "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-043046ef2a9a80b56",
}
- m_delete_key_pair.return_value = {
- 'changed': True,
- 'key': None,
- 'msg': 'key deleted'
- }
+ m_delete_key_pair.return_value = {"changed": True, "key": None, "msg": "key deleted"}
- expected_result = 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62'
+ expected_result = "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62"
key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com"
@@ -169,17 +157,17 @@ def test_get_key_fingerprint(m_find_key_pair, m_import_key_pair, m_delete_key_pa
def test_find_key_pair():
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
ec2_client.describe_key_pairs.return_value = {
- 'KeyPairs': [
+ "KeyPairs": [
{
- 'CreateTime': datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()),
- 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-043046ef2a9a80b56',
- 'KeyType': 'rsa',
- 'Tags': []
+ "CreateTime": datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()),
+ "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-043046ef2a9a80b56",
+ "KeyType": "rsa",
+ "Tags": [],
}
],
}
@@ -192,7 +180,7 @@ def test_find_key_pair():
def test_api_failure_find_key_pair():
ec2_client = MagicMock()
- name = 'non_existing_keypair'
+ name = "non_existing_keypair"
ec2_client.describe_key_pairs.side_effect = botocore.exceptions.BotoCoreError
@@ -202,9 +190,9 @@ def test_api_failure_find_key_pair():
def test_invalid_key_pair_find_key_pair():
ec2_client = MagicMock()
- name = 'non_existing_keypair'
+ name = "non_existing_keypair"
- ec2_client.describe_key_pairs.side_effect = raise_botocore_exception_clienterror('describe_key_pair')
+ ec2_client.describe_key_pairs.side_effect = raise_botocore_exception_clienterror("describe_key_pair")
result = ec2_key.find_key_pair(ec2_client, name)
@@ -213,11 +201,11 @@ def test_invalid_key_pair_find_key_pair():
def test__create_key_pair():
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
tag_spec = None
key_type = None
- expected_params = {'KeyName': name}
+ expected_params = {"KeyName": name}
ec2_client.create_key_pair.return_value = {
"KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62",
@@ -239,33 +227,33 @@ def test__create_key_pair():
def test_api_failure__create_key_pair():
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
tag_spec = None
key_type = None
- ec2_client.create_key_pair.side_effect = raise_botocore_exception_clienterror('create_key_pair')
+ ec2_client.create_key_pair.side_effect = raise_botocore_exception_clienterror("create_key_pair")
with pytest.raises(ec2_key.Ec2KeyFailure):
ec2_key._create_key_pair(ec2_client, name, tag_spec, key_type)
-@patch(module_name + '.extract_key_data')
-@patch(module_name + '._import_key_pair')
+@patch(module_name + ".extract_key_data")
+@patch(module_name + "._import_key_pair")
def test_create_new_key_pair_key_material(m_import_key_pair, m_extract_key_data):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com"
- key_type = 'rsa'
+ key_type = "rsa"
tags = None
-
+ file_name = MagicMock()
module.check_mode = False
m_import_key_pair.return_value = {
- 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-012345678905a208d'
+ "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-012345678905a208d",
}
m_extract_key_data.return_value = {
@@ -273,35 +261,36 @@ def test_create_new_key_pair_key_material(m_import_key_pair, m_extract_key_data)
"fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"id": "key-043046ef2a9a80b56",
"tags": {},
- "type": "rsa"
+ "type": "rsa",
}
- expected_result = {'changed': True, 'key': m_extract_key_data.return_value, 'msg': 'key pair created'}
+ expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair created"}
- result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, module.check_mode)
+ result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, file_name, module.check_mode)
assert result == expected_result
assert m_import_key_pair.call_count == 1
assert m_extract_key_data.call_count == 1
-@patch(module_name + '.extract_key_data')
-@patch(module_name + '._create_key_pair')
+@patch(module_name + ".extract_key_data")
+@patch(module_name + "._create_key_pair")
def test_create_new_key_pair_no_key_material(m_create_key_pair, m_extract_key_data):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
- key_type = 'rsa'
+ name = "my_keypair"
+ key_type = "rsa"
key_material = None
tags = None
-
+ file_name = MagicMock()
+ # TODO: exercise a real file_name (e.g. tmp_path) instead of a MagicMock.
module.check_mode = False
m_create_key_pair.return_value = {
- 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-012345678905a208d'
+ "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-012345678905a208d",
}
m_extract_key_data.return_value = {
@@ -309,12 +298,12 @@ def test_create_new_key_pair_no_key_material(m_create_key_pair, m_extract_key_da
"fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"id": "key-043046ef2a9a80b56",
"tags": {},
- "type": "rsa"
+ "type": "rsa",
}
- expected_result = {'changed': True, 'key': m_extract_key_data.return_value, 'msg': 'key pair created'}
+ expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair created"}
- result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, module.check_mode)
+ result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, file_name, module.check_mode)
assert result == expected_result
assert m_create_key_pair.call_count == 1
@@ -324,7 +313,7 @@ def test_create_new_key_pair_no_key_material(m_create_key_pair, m_extract_key_da
def test__delete_key_pair():
ec2_client = MagicMock()
- key_name = 'my_keypair'
+ key_name = "my_keypair"
ec2_key._delete_key_pair(ec2_client, key_name)
assert ec2_client.delete_key_pair.call_count == 1
@@ -333,23 +322,25 @@ def test__delete_key_pair():
def test_api_failure__delete_key_pair():
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
- ec2_client.delete_key_pair.side_effect = raise_botocore_exception_clienterror('delete_key_pair')
+ ec2_client.delete_key_pair.side_effect = raise_botocore_exception_clienterror("delete_key_pair")
with pytest.raises(ec2_key.Ec2KeyFailure):
ec2_key._delete_key_pair(ec2_client, name)
-@patch(module_name + '.extract_key_data')
-@patch(module_name + '._import_key_pair')
-@patch(module_name + '.delete_key_pair')
-@patch(module_name + '.get_key_fingerprint')
-def test_update_key_pair_by_key_material_update_needed(m_get_key_fingerprint, m_delete_key_pair, m__import_key_pair, m_extract_key_data):
+@patch(module_name + ".extract_key_data")
+@patch(module_name + "._import_key_pair")
+@patch(module_name + ".delete_key_pair")
+@patch(module_name + ".get_key_fingerprint")
+def test_update_key_pair_by_key_material_update_needed(
+ m_get_key_fingerprint, m_delete_key_pair, m__import_key_pair, m_extract_key_data
+):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com"
tag_spec = None
key = {
@@ -358,16 +349,15 @@ def test_update_key_pair_by_key_material_update_needed(m_get_key_fingerprint, m_
"KeyPairId": "key-043046ef2a9a80b56",
"Tags": {},
}
-
module.check_mode = False
- m_get_key_fingerprint.return_value = 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62'
+ m_get_key_fingerprint.return_value = "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62"
m_delete_key_pair.return_value = None
m__import_key_pair.return_value = {
- 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-043046ef2a9a80b56',
- 'Tags': {},
+ "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-043046ef2a9a80b56",
+ "Tags": {},
}
m_extract_key_data.return_value = {
"name": "my_keypair",
@@ -376,7 +366,7 @@ def test_update_key_pair_by_key_material_update_needed(m_get_key_fingerprint, m_
"tags": {},
}
- expected_result = {'changed': True, 'key': m_extract_key_data.return_value, 'msg': "key pair updated"}
+ expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair updated"}
result = ec2_key.update_key_pair_by_key_material(module.check_mode, ec2_client, name, key, key_material, tag_spec)
@@ -407,7 +397,6 @@ def test_update_key_pair_by_key_material_key_exists(m_get_key_fingerprint, m_ext
"KeyPairId": key_id,
"Tags": {},
}
-
check_mode = False
m_get_key_fingerprint.return_value = key_fingerprint
m_extract_key_data.return_value = {
@@ -434,31 +423,31 @@ def test_update_key_pair_by_key_type_update_needed(m_delete_key_pair, m__create_
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
- key_type = 'rsa'
+ name = "my_keypair"
+ key_type = "rsa"
tag_spec = None
-
+ file_name = MagicMock()
module.check_mode = False
m_delete_key_pair.return_value = None
m__create_key_pair.return_value = {
- 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa',
- 'Name': 'my_keypair',
- 'Id': 'key-043046ef2a9a80b56',
- 'Tags': {},
- 'Type': 'rsa'
+ "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
+ "Name": "my_keypair",
+ "Id": "key-043046ef2a9a80b56",
+ "Tags": {},
+ "Type": "rsa",
}
m_extract_key_data.return_value = {
"name": "my_keypair",
"fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"id": "key-043046ef2a9a80b56",
"tags": {},
- "type": "rsa"
+ "type": "rsa",
}
expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair updated"}
- result = ec2_key.update_key_pair_by_key_type(module.check_mode, ec2_client, name, key_type, tag_spec)
+ result = ec2_key.update_key_pair_by_key_type(module.check_mode, ec2_client, name, key_type, tag_spec, file_name)
assert result == expected_result
assert m_delete_key_pair.call_count == 1
@@ -466,30 +455,30 @@ def test_update_key_pair_by_key_type_update_needed(m_delete_key_pair, m__create_
assert m_extract_key_data.call_count == 1
m_delete_key_pair.assert_called_with(module.check_mode, ec2_client, name, finish_task=False)
m__create_key_pair.assert_called_with(ec2_client, name, tag_spec, key_type)
- m_extract_key_data.assert_called_with(m__create_key_pair.return_value, key_type)
+ m_extract_key_data.assert_called_with(m__create_key_pair.return_value, key_type, file_name)
-@patch(module_name + '.update_key_pair_by_key_material')
+@patch(module_name + ".update_key_pair_by_key_material")
def test_handle_existing_key_pair_update_key_matrial_with_force(m_update_key_pair_by_key_material):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
key = {
"KeyName": "my_keypair",
"KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"KeyPairId": "key-043046ef2a9a80b56",
"Tags": {},
- "KeyType": "rsa"
+ "KeyType": "rsa",
}
module.params = {
- 'key_material': "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com",
- 'force': True,
- 'key_type': 'rsa',
- 'tags': None,
- 'purge_tags': True,
- 'tag_spec': None
+ "key_material": "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com",
+ "force": True,
+ "key_type": "rsa",
+ "tags": None,
+ "purge_tags": True,
+ "tag_spec": None,
}
key_data = {
@@ -499,9 +488,9 @@ def test_handle_existing_key_pair_update_key_matrial_with_force(m_update_key_pai
"tags": {},
}
- m_update_key_pair_by_key_material.return_value = {'changed': True, 'key': key_data, 'msg': "key pair updated"}
+ m_update_key_pair_by_key_material.return_value = {"changed": True, "key": key_data, "msg": "key pair updated"}
- expected_result = {'changed': True, 'key': key_data, 'msg': "key pair updated"}
+ expected_result = {"changed": True, "key": key_data, "msg": "key pair updated"}
result = ec2_key.handle_existing_key_pair_update(module, ec2_client, name, key)
@@ -509,27 +498,27 @@ def test_handle_existing_key_pair_update_key_matrial_with_force(m_update_key_pai
assert m_update_key_pair_by_key_material.call_count == 1
-@patch(module_name + '.update_key_pair_by_key_type')
+@patch(module_name + ".update_key_pair_by_key_type")
def test_handle_existing_key_pair_update_key_type(m_update_key_pair_by_key_type):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
key = {
"KeyName": "my_keypair",
"KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"KeyPairId": "key-043046ef2a9a80b56",
"Tags": {},
- "KeyType": "ed25519"
+ "KeyType": "ed25519",
}
module.params = {
- 'key_material': "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com",
- 'force': False,
- 'key_type': 'rsa',
- 'tags': None,
- 'purge_tags': True,
- 'tag_spec': None
+ "key_material": "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com",
+ "force": False,
+ "key_type": "rsa",
+ "tags": None,
+ "purge_tags": True,
+ "tag_spec": None,
}
key_data = {
@@ -539,9 +528,9 @@ def test_handle_existing_key_pair_update_key_type(m_update_key_pair_by_key_type)
"tags": {},
}
- m_update_key_pair_by_key_type.return_value = {'changed': True, 'key': key_data, 'msg': "key pair updated"}
+ m_update_key_pair_by_key_type.return_value = {"changed": True, "key": key_data, "msg": "key pair updated"}
- expected_result = {'changed': True, 'key': key_data, 'msg': "key pair updated"}
+ expected_result = {"changed": True, "key": key_data, "msg": "key pair updated"}
result = ec2_key.handle_existing_key_pair_update(module, ec2_client, name, key)
@@ -549,27 +538,27 @@ def test_handle_existing_key_pair_update_key_type(m_update_key_pair_by_key_type)
assert m_update_key_pair_by_key_type.call_count == 1
-@patch(module_name + '.extract_key_data')
+@patch(module_name + ".extract_key_data")
def test_handle_existing_key_pair_else(m_extract_key_data):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
+ name = "my_keypair"
key = {
"KeyName": "my_keypair",
"KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"KeyPairId": "key-043046ef2a9a80b56",
"Tags": {},
- "KeyType": "rsa"
+ "KeyType": "rsa",
}
module.params = {
- 'key_material': "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com",
- 'force': False,
- 'key_type': 'rsa',
- 'tags': None,
- 'purge_tags': True,
- 'tag_spec': None
+ "key_material": "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com",
+ "force": False,
+ "key_type": "rsa",
+ "tags": None,
+ "purge_tags": True,
+ "tag_spec": None,
}
m_extract_key_data.return_value = {
@@ -577,7 +566,7 @@ def test_handle_existing_key_pair_else(m_extract_key_data):
"fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
"id": "key-043046ef2a9a80b56",
"tags": {},
- "type": "rsa"
+ "type": "rsa",
}
expected_result = {"changed": False, "key": m_extract_key_data.return_value, "msg": "key pair already exists"}
@@ -588,55 +577,53 @@ def test_handle_existing_key_pair_else(m_extract_key_data):
assert m_extract_key_data.call_count == 1
-@patch(module_name + '._delete_key_pair')
-@patch(module_name + '.find_key_pair')
-def test_delete_key_pair_key_exists(m_find_key_pair, m_delete_key_pair):
+@patch(module_name + "._delete_key_pair")
+@patch(module_name + ".find_key_pair")
+def test_delete_key_pair_key_exists(m_find_key_pair, m_delete_key_pair, tmp_path):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
-
+ name = "my_keypair"
+ file_name = tmp_path / "private_key_data.pem"
module.check_mode = False
m_find_key_pair.return_value = {
- 'KeyPairs': [
+ "KeyPairs": [
{
- 'CreateTime': datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()),
- 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa',
- 'KeyName': 'my_keypair',
- 'KeyPairId': 'key-043046ef2a9a80b56',
- 'KeyType': 'rsa',
- 'Tags': []
+ "CreateTime": datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()),
+ "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
+ "KeyName": "my_keypair",
+ "KeyPairId": "key-043046ef2a9a80b56",
+ "KeyType": "rsa",
+ "Tags": [],
}
],
}
- expected_result = {'changed': True, 'key': None, 'msg': 'key deleted'}
-
result = ec2_key.delete_key_pair(module.check_mode, ec2_client, name)
assert m_find_key_pair.call_count == 1
m_find_key_pair.assert_called_with(ec2_client, name)
assert m_delete_key_pair.call_count == 1
m_delete_key_pair.assert_called_with(ec2_client, name)
- assert result == expected_result
+ assert result == {"changed": True, "key": None, "msg": "key deleted"}
-@patch(module_name + '._delete_key_pair')
-@patch(module_name + '.find_key_pair')
+@patch(module_name + "._delete_key_pair")
+@patch(module_name + ".find_key_pair")
def test_delete_key_pair_key_not_exist(m_find_key_pair, m_delete_key_pair):
module = MagicMock()
ec2_client = MagicMock()
- name = 'my_keypair'
-
+ name = "my_keypair"
+ file_name = "non_existing_file_path"
module.check_mode = False
m_find_key_pair.return_value = None
- expected_result = {'key': None, 'msg': 'key did not exist'}
+ expected_result = {"key": None, "msg": "key did not exist"}
- result = ec2_key.delete_key_pair(module.check_mode, ec2_client, name)
+ result = ec2_key.delete_key_pair(module.check_mode, ec2_client, name, file_name)
assert m_find_key_pair.call_count == 1
m_find_key_pair.assert_called_with(ec2_client, name)
@@ -644,6 +631,24 @@ def test_delete_key_pair_key_not_exist(m_find_key_pair, m_delete_key_pair):
assert result == expected_result
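+# _write_private_key should write the material to file_name and return the key data with its "private_key" entry removed.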
+def test__write_private_key(tmp_path):
+ key_data = {
+ "name": "my_keypair",
+ "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa",
+ "id": "key-043046ef2a9a80b56",
+ "tags": {},
+ "type": "rsa",
+ "private_key": "ABCDEFGH",
+ }
+ file_name = tmp_path / "id_rsa_key"
+ saved_key_data = copy.deepcopy(key_data)
+ result = ec2_key._write_private_key(key_data, str(file_name))
+
+ assert "private_key" not in result.keys()
+ del saved_key_data["private_key"]
+ assert saved_key_data == result
+
+
@patch(module_name + ".AnsibleAWSModule")
def test_main_success(m_AnsibleAWSModule):
m_module = MagicMock()
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_metadata_facts.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_metadata_facts.py
new file mode 100644
index 000000000..23ba85003
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_metadata_facts.py
@@ -0,0 +1,101 @@
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+import gzip
+import io
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+
+from ansible_collections.amazon.aws.plugins.modules import ec2_metadata_facts
+
+module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_metadata_facts"
+
+
+class FailJson(Exception):
+ pass
+
+
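+# The Ec2Metadata instance under test wraps a mocked AnsibleModule, so no real IMDS calls are made.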
+@pytest.fixture()
+def ec2_instance():
+ module = MagicMock()
+ return ec2_metadata_facts.Ec2Metadata(module)
+
+
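+# A 401 from the metadata endpoint is expected to surface via module.fail_json (mocked here to raise FailJson).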
+@patch(module_name + ".fetch_url")
+def test__fetch_401(m_fetch_url, ec2_instance):
+ ec2_instance.module.fail_json.side_effect = FailJson()
+ m_fetch_url.return_value = (None, {"status": 401, "msg": "Oops"})
+ with pytest.raises(FailJson):
+ ec2_instance._fetch("http://169.254.169.254/latest/meta-data/")
+
+
+@patch(module_name + ".fetch_url")
+def test__fetch_200(m_fetch_url, ec2_instance):
+ m_fetch_url.return_value = (io.StringIO("my-value"), {"status": 200})
+ assert ec2_instance._fetch("http://169.254.169.254/latest/meta-data/ami-id") == "my-value"
+
+ m_fetch_url.return_value = (io.StringIO("1"), {"status": 200})
+ assert ec2_instance._fetch("http://169.254.169.254/latest/meta-data/ami-id") == "1"
+
+
+@patch(module_name + ".fetch_url")
+def test_fetch(m_fetch_url, ec2_instance):
+ raw_list = "ami-id\n"
+ m_fetch_url.side_effect = [
+ (io.StringIO(raw_list), {"status": 200}),
+ (io.StringIO("my-value"), {"status": 200}),
+ ]
+ ec2_instance.fetch("http://169.254.169.254/latest/meta-data/")
+ assert ec2_instance._data == {"http://169.254.169.254/latest/meta-data/ami-id": "my-value"}
+
+
+@patch(module_name + ".fetch_url")
+def test_fetch_recursive(m_fetch_url, ec2_instance):
+ raw_list = "whatever/\n"
+ m_fetch_url.side_effect = [
+ (io.StringIO(raw_list), {"status": 200}),
+ (io.StringIO("my-key"), {"status": 200}),
+ (io.StringIO("my-value"), {"status": 200}),
+ ]
+ ec2_instance.fetch("http://169.254.169.254/latest/meta-data/")
+ assert ec2_instance._data == {"http://169.254.169.254/latest/meta-data/whatever/my-key": "my-value"}
+
+
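+# User-data may be served gzip-compressed; _fetch is expected to decompress it transparently.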
+@patch(module_name + ".fetch_url")
+def test__fetch_user_data_compressed(m_fetch_url, ec2_instance):
+ user_data = b"""Content-Type: multipart/mixed; boundary="MIMEBOUNDARY"
+MIME-Version: 1.0
+
+--MIMEBOUNDARY
+Content-Transfer-Encoding: 7bit
+Content-Type: text/cloud-config
+Mime-Version: 1.0
+
+packages: ['httpie']
+
+--MIMEBOUNDARY--
+"""
+
+ m_fetch_url.return_value = (io.BytesIO(gzip.compress(user_data)), {"status": 200})
+ assert ec2_instance._fetch("http://169.254.169.254/latest/user-data") == user_data.decode("utf-8")
+
+
+@patch(module_name + ".fetch_url")
+def test__fetch_user_data_plain(m_fetch_url, ec2_instance):
+ user_data = b"""Content-Type: multipart/mixed; boundary="MIMEBOUNDARY"
+MIME-Version: 1.0
+
+--MIMEBOUNDARY
+Content-Transfer-Encoding: 7bit
+Content-Type: text/cloud-config
+Mime-Version: 1.0
+
+packages: ['httpie']
+
+--MIMEBOUNDARY--
+"""
+
+ m_fetch_url.return_value = (io.BytesIO(user_data), {"status": 200})
+ assert ec2_instance._fetch("http://169.254.169.254/latest/user-data") == user_data.decode("utf-8")
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py
index 1ebbe86c6..c47122657 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py
@@ -1,83 +1,59 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
from ansible_collections.amazon.aws.plugins.modules import ec2_security_group as group_module
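+# rule_from_group_permission expands one AWS permission dict into a Rule per target (IPv4, IPv6, prefix list or group).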
def test_from_permission():
internal_http = {
- 'FromPort': 80,
- 'IpProtocol': 'tcp',
- 'IpRanges': [
- {
- 'CidrIp': '10.0.0.0/8',
- 'Description': 'Foo Bar Baz'
- },
+ "FromPort": 80,
+ "IpProtocol": "tcp",
+ "IpRanges": [
+ {"CidrIp": "10.0.0.0/8", "Description": "Foo Bar Baz"},
],
- 'Ipv6Ranges': [
- {'CidrIpv6': 'fe80::94cc:8aff:fef6:9cc/64'},
+ "Ipv6Ranges": [
+ {"CidrIpv6": "fe80::94cc:8aff:fef6:9cc/64"},
],
- 'PrefixListIds': [],
- 'ToPort': 80,
- 'UserIdGroupPairs': [],
+ "PrefixListIds": [],
+ "ToPort": 80,
+ "UserIdGroupPairs": [],
}
perms = list(group_module.rule_from_group_permission(internal_http))
assert len(perms) == 2
- assert perms[0].target == '10.0.0.0/8'
- assert perms[0].target_type == 'ipv4'
- assert perms[0].description == 'Foo Bar Baz'
- assert perms[1].target == 'fe80::94cc:8aff:fef6:9cc/64'
+ assert perms[0].target == "10.0.0.0/8"
+ assert perms[0].target_type == "ipv4"
+ assert perms[0].description == "Foo Bar Baz"
+ assert perms[1].target == "fe80::94cc:8aff:fef6:9cc/64"
global_egress = {
- 'IpProtocol': '-1',
- 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
- 'Ipv6Ranges': [],
- 'PrefixListIds': [],
- 'UserIdGroupPairs': []
+ "IpProtocol": "-1",
+ "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
+ "Ipv6Ranges": [],
+ "PrefixListIds": [],
+ "UserIdGroupPairs": [],
}
perms = list(group_module.rule_from_group_permission(global_egress))
assert len(perms) == 1
- assert perms[0].target == '0.0.0.0/0'
+ assert perms[0].target == "0.0.0.0/0"
assert perms[0].port_range == (None, None)
internal_prefix_http = {
- 'FromPort': 80,
- 'IpProtocol': 'tcp',
- 'PrefixListIds': [
- {'PrefixListId': 'p-1234'}
- ],
- 'ToPort': 80,
- 'UserIdGroupPairs': [],
+ "FromPort": 80,
+ "IpProtocol": "tcp",
+ "PrefixListIds": [{"PrefixListId": "p-1234"}],
+ "ToPort": 80,
+ "UserIdGroupPairs": [],
}
perms = list(group_module.rule_from_group_permission(internal_prefix_http))
assert len(perms) == 1
- assert perms[0].target == 'p-1234'
+ assert perms[0].target == "p-1234"
def test_rule_to_permission():
tests = [
- group_module.Rule((22, 22), 'udp', 'sg-1234567890', 'group', None),
- group_module.Rule((1, 65535), 'tcp', '0.0.0.0/0', 'ipv4', "All TCP from everywhere"),
- group_module.Rule((443, 443), 'tcp', 'ip-123456', 'ip_prefix', "Traffic to privatelink IPs"),
- group_module.Rule((443, 443), 'tcp', 'feed:dead:::beef/64', 'ipv6', None),
+ group_module.Rule((22, 22), "udp", "sg-1234567890", "group", None),
+ group_module.Rule((1, 65535), "tcp", "0.0.0.0/0", "ipv4", "All TCP from everywhere"),
+ group_module.Rule((443, 443), "tcp", "ip-123456", "ip_prefix", "Traffic to privatelink IPs"),
+ group_module.Rule((443, 443), "tcp", "feed:dead:::beef/64", "ipv6", None),
]
for test in tests:
perm = group_module.to_permission(test)
- assert perm['FromPort'], perm['ToPort'] == test.port_range
- assert perm['IpProtocol'] == test.protocol
-
-
-def test_validate_ip():
- class Warner(object):
- def warn(self, msg):
- return
- ips = [
- ('10.1.1.1/24', '10.1.1.0/24'),
- ('192.168.56.101/16', '192.168.0.0/16'),
- # Don't modify IPv6 CIDRs, AWS supports /128 and device ranges
- ('fc00:8fe0:fe80:b897:8990:8a7c:99bf:323d/128', 'fc00:8fe0:fe80:b897:8990:8a7c:99bf:323d/128'),
- ]
-
- for ip, net in ips:
- assert group_module.validate_ip(Warner(), ip) == net
+ assert perm["FromPort"], perm["ToPort"] == test.port_range
+ assert perm["IpProtocol"] == test.protocol
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_snapshot_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_snapshot_info.py
new file mode 100644
index 000000000..34767d38a
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_snapshot_info.py
@@ -0,0 +1,128 @@
+# (c) 2022 Red Hat Inc.
+
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import ANY
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
+
+import pytest
+
+from ansible_collections.amazon.aws.plugins.modules import ec2_snapshot_info
+
+module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_snapshot_info"
+
+
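+# With every argument empty, build_request_args is expected to return an empty request dict.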
+@pytest.mark.parametrize(
+ "snapshot_ids,owner_ids,restorable_by_user_ids,filters,max_results,next_token_id,expected",
+ [([], [], [], {}, None, None, {})],
+)
+def test_build_request_args(
+ snapshot_ids, owner_ids, restorable_by_user_ids, filters, max_results, next_token_id, expected
+):
+ assert (
+ ec2_snapshot_info.build_request_args(
+ snapshot_ids, owner_ids, restorable_by_user_ids, filters, max_results, next_token_id
+ )
+ == expected
+ )
+
+
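+# get_snapshots should forward the request arguments to describe_snapshots along with aws_retry=True.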
+def test_get_snapshots():
+ module = MagicMock()
+ connection = MagicMock()
+
+ connection.describe_snapshots.return_value = {
+ "Snapshots": [
+ {
+ "Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890",
+ "Encrypted": False,
+ "OwnerId": "123456789000",
+ "Progress": "100%",
+ "SnapshotId": "snap-0f00cba1234567890",
+ "StartTime": "2021-09-30T01:04:49.724000+00:00",
+ "State": "completed",
+ "StorageTier": "standard",
+ "Tags": [
+ {"Key": "TagKey", "Value": "TagValue"},
+ ],
+ "VolumeId": "vol-0ae6c5e1234567890",
+ "VolumeSize": 10,
+ },
+ {
+ "Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890",
+ "Encrypted": False,
+ "OwnerId": "123456789000",
+ "Progress": "100%",
+ "SnapshotId": "snap-0f00cba1234567890",
+ "StartTime": "2021-09-30T01:04:49.724000+00:00",
+ "State": "completed",
+ "StorageTier": "standard",
+ "Tags": [
+ {"Key": "TagKey", "Value": "TagValue"},
+ ],
+ "VolumeId": "vol-0ae6c5e1234567890",
+ "VolumeSize": 10,
+ },
+ ]
+ }
+
+ request_args = {"SnapshotIds": ["snap-0f00cba1234567890"]}
+
+ snapshot_info = ec2_snapshot_info.get_snapshots(connection, module, request_args)
+
+ assert connection.describe_snapshots.call_count == 1
+ connection.describe_snapshots.assert_called_with(aws_retry=True, SnapshotIds=["snap-0f00cba1234567890"])
+ assert len(snapshot_info["Snapshots"]) == 2
+
+
+@patch(module_name + ".build_request_args")
+@patch(module_name + ".get_snapshots")
+def test_list_ec2_snapshots(m_get_snapshots, m_build_request_args):
+ module = MagicMock()
+ connection = MagicMock()
+
+ m_get_snapshots.return_value = {
+ "Snapshots": [
+ {
+ "Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890",
+ "Encrypted": False,
+ "OwnerId": "123456789000",
+ "Progress": "100%",
+ "SnapshotId": "snap-0f00cba1234567890",
+ "StartTime": "2021-09-30T01:04:49.724000+00:00",
+ "State": "completed",
+ "StorageTier": "standard",
+ "Tags": [
+ {"Key": "TagKey", "Value": "TagValue"},
+ ],
+ "VolumeId": "vol-0ae6c5e1234567890",
+ "VolumeSize": 10,
+ }
+ ]
+ }
+
+ m_build_request_args.return_value = {"SnapshotIds": ["snap-0f00cba1234567890"]}
+
+ request_args = ec2_snapshot_info.build_request_args()
+
+ ec2_snapshot_info.list_ec2_snapshots(connection, module, request_args)
+
+ assert m_get_snapshots.call_count == 1
+ m_get_snapshots.assert_has_calls(
+ [
+ call(connection, module, m_build_request_args.return_value),
+ ]
+ )
+
+
+@patch(module_name + ".AnsibleAWSModule")
+def test_main_success(m_AnsibleAWSModule):
+ m_module = MagicMock()
+ m_AnsibleAWSModule.return_value = m_module
+
+ ec2_snapshot_info.main()
+
+ m_module.client.assert_called_with("ec2", retry_decorator=ANY)
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py
index 73726590f..27517115e 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py
@@ -3,66 +3,71 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-# Magic... Incorrectly identified by pylint as unused
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import
-from ansible_collections.amazon.aws.tests.unit.compat.mock import patch
+from unittest.mock import patch
from ansible_collections.amazon.aws.plugins.modules import ec2_vpc_dhcp_option as dhcp_module
from ansible_collections.amazon.aws.tests.unit.plugins.modules.utils import ModuleTestCase
-test_module_params = {'domain_name': 'us-west-2.compute.internal',
- 'dns_servers': ['AmazonProvidedDNS'],
- 'ntp_servers': ['10.10.2.3', '10.10.4.5'],
- 'netbios_name_servers': ['10.20.2.3', '10.20.4.5'],
- 'netbios_node_type': 2}
-
-test_create_config = [{'Key': 'domain-name', 'Values': [{'Value': 'us-west-2.compute.internal'}]},
- {'Key': 'domain-name-servers', 'Values': [{'Value': 'AmazonProvidedDNS'}]},
- {'Key': 'ntp-servers', 'Values': [{'Value': '10.10.2.3'}, {'Value': '10.10.4.5'}]},
- {'Key': 'netbios-name-servers', 'Values': [{'Value': '10.20.2.3'}, {'Value': '10.20.4.5'}]},
- {'Key': 'netbios-node-type', 'Values': 2}]
-
-
-test_create_option_set = [{'Key': 'domain-name', 'Values': ['us-west-2.compute.internal']},
- {'Key': 'domain-name-servers', 'Values': ['AmazonProvidedDNS']},
- {'Key': 'ntp-servers', 'Values': ['10.10.2.3', '10.10.4.5']},
- {'Key': 'netbios-name-servers', 'Values': ['10.20.2.3', '10.20.4.5']},
- {'Key': 'netbios-node-type', 'Values': ['2']}]
-
-test_normalize_config = {'domain-name': ['us-west-2.compute.internal'],
- 'domain-name-servers': ['AmazonProvidedDNS'],
- 'ntp-servers': ['10.10.2.3', '10.10.4.5'],
- 'netbios-name-servers': ['10.20.2.3', '10.20.4.5'],
- 'netbios-node-type': '2'
- }
-
-
-class FakeModule(object):
+# Magic... Incorrectly identified by pylint as unused
+# pylint: disable-next=unused-import
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify
+
+test_module_params = {
+ "domain_name": "us-west-2.compute.internal",
+ "dns_servers": ["AmazonProvidedDNS"],
+ "ntp_servers": ["10.10.2.3", "10.10.4.5"],
+ "netbios_name_servers": ["10.20.2.3", "10.20.4.5"],
+ "netbios_node_type": 2,
+}
+
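+# Raw configuration as returned by the EC2 API, with each value wrapped in a {"Value": ...} dict.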
+test_create_config = [
+ {"Key": "domain-name", "Values": [{"Value": "us-west-2.compute.internal"}]},
+ {"Key": "domain-name-servers", "Values": [{"Value": "AmazonProvidedDNS"}]},
+ {"Key": "ntp-servers", "Values": [{"Value": "10.10.2.3"}, {"Value": "10.10.4.5"}]},
+ {"Key": "netbios-name-servers", "Values": [{"Value": "10.20.2.3"}, {"Value": "10.20.4.5"}]},
+ {"Key": "netbios-node-type", "Values": 2},
+]
+
+
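+# The same options in the flat shape used when creating a DHCP option set.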
+test_create_option_set = [
+ {"Key": "domain-name", "Values": ["us-west-2.compute.internal"]},
+ {"Key": "domain-name-servers", "Values": ["AmazonProvidedDNS"]},
+ {"Key": "ntp-servers", "Values": ["10.10.2.3", "10.10.4.5"]},
+ {"Key": "netbios-name-servers", "Values": ["10.20.2.3", "10.20.4.5"]},
+ {"Key": "netbios-node-type", "Values": ["2"]},
+]
+
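+# Expected result of running normalize_ec2_vpc_dhcp_config() on the raw config above.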
+test_normalize_config = {
+ "domain-name": ["us-west-2.compute.internal"],
+ "domain-name-servers": ["AmazonProvidedDNS"],
+ "ntp-servers": ["10.10.2.3", "10.10.4.5"],
+ "netbios-name-servers": ["10.20.2.3", "10.20.4.5"],
+ "netbios-node-type": "2",
+}
+
+
+class FakeModule:
def __init__(self, **kwargs):
self.params = kwargs
def fail_json(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise Exception('FAIL')
+ raise Exception("FAIL")
def fail_json_aws(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise Exception('FAIL')
+ raise Exception("FAIL")
def exit_json(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise Exception('EXIT')
+ raise Exception("EXIT")
-@patch.object(dhcp_module.AnsibleAWSModule, 'client')
+@patch.object(dhcp_module.AnsibleAWSModule, "client")
class TestDhcpModule(ModuleTestCase):
-
def test_normalize_config(self, client_mock):
result = dhcp_module.normalize_ec2_vpc_dhcp_config(test_create_config)
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py
index 5a53e2ddb..b2d8e0b50 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py
@@ -4,12 +4,11 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-import pytest
+from unittest.mock import MagicMock
+from unittest.mock import patch
-from unittest.mock import MagicMock, call, patch
from ansible_collections.amazon.aws.plugins.modules import kms_key
-
module_name = "ansible_collections.amazon.aws.plugins.modules.kms_key"
key_details = {
"KeyMetadata": {
@@ -59,7 +58,6 @@ key_details = {
@patch(module_name + ".get_kms_metadata_with_backoff")
def test_fetch_key_metadata(m_get_kms_metadata_with_backoff):
-
module = MagicMock()
kms_client = MagicMock()
@@ -69,14 +67,8 @@ def test_fetch_key_metadata(m_get_kms_metadata_with_backoff):
def test_validate_params():
-
module = MagicMock()
- module.params = {
- "state": "present",
- "multi_region": True
- }
+ module.params = {"state": "present", "multi_region": True}
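+ # multi_region cannot be changed on an existing key, so validate_params is expected to call fail_json.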
result = kms_key.validate_params(module, key_details["KeyMetadata"])
- module.fail_json.assert_called_with(
- msg="You cannot change the multi-region property on an existing key."
- )
+ module.fail_json.assert_called_with(msg="You cannot change the multi-region property on an existing key.")
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py
index 451a61766..cd3032ef7 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py
@@ -4,12 +4,12 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
import pytest
-from unittest.mock import MagicMock, call, patch
from ansible_collections.amazon.aws.plugins.modules import lambda_layer
@@ -19,155 +19,120 @@ def raise_lambdalayer_exception(e=None, m=None):
return lambda_layer.LambdaLayerFailure(exc=e, msg=m)
-mod_list_layer = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer.list_layer_versions'
-mod_create_layer = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer.create_layer_version'
-mod_delete_layer = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer.delete_layer_version'
+mod_list_layer = "ansible_collections.amazon.aws.plugins.modules.lambda_layer.list_layer_versions"
+mod_create_layer = "ansible_collections.amazon.aws.plugins.modules.lambda_layer.create_layer_version"
+mod_delete_layer = "ansible_collections.amazon.aws.plugins.modules.lambda_layer.delete_layer_version"
@pytest.mark.parametrize(
- "params,api_result,calls,ansible_result",
+ "params,api_result,calls,_ansible_result",
[
+ ({"name": "testlayer", "version": 4}, [], [], {"changed": False, "layer_versions": []}),
(
- {
- "name": "testlayer",
- "version": 4
- },
- [],
- [],
- {"changed": False, "layer_versions": []}
- ),
- (
- {
- "name": "testlayer",
- "version": 4
- },
+ {"name": "testlayer", "version": 4},
[
{
- 'compatible_runtimes': ["python3.7"],
- 'created_date': "2022-09-29T10:31:35.977+0000",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
+ "compatible_runtimes": ["python3.7"],
+ "created_date": "2022-09-29T10:31:35.977+0000",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
"license_info": "MIT",
- 'version': 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "version": 2,
+ "compatible_architectures": ["arm64"],
},
{
"created_date": "2022-09-29T10:31:26.341+0000",
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1",
- "version": 1
- }
+ "version": 1,
+ },
],
[],
- {"changed": False, "layer_versions": []}
+ {"changed": False, "layer_versions": []},
),
(
- {
- "name": "testlayer",
- "version": 2
- },
+ {"name": "testlayer", "version": 2},
[
{
- 'compatible_runtimes': ["python3.7"],
- 'created_date': "2022-09-29T10:31:35.977+0000",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
+ "compatible_runtimes": ["python3.7"],
+ "created_date": "2022-09-29T10:31:35.977+0000",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
"license_info": "MIT",
- 'version': 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "version": 2,
+ "compatible_architectures": ["arm64"],
},
{
"created_date": "2022-09-29T10:31:26.341+0000",
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1",
- "version": 1
- }
- ],
- [
- call(LayerName='testlayer', VersionNumber=2)
+ "version": 1,
+ },
],
+ [call(LayerName="testlayer", VersionNumber=2)],
{
"changed": True,
"layer_versions": [
{
- 'compatible_runtimes': ["python3.7"],
- 'created_date': "2022-09-29T10:31:35.977+0000",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
+ "compatible_runtimes": ["python3.7"],
+ "created_date": "2022-09-29T10:31:35.977+0000",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
"license_info": "MIT",
- 'version': 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "version": 2,
+ "compatible_architectures": ["arm64"],
}
- ]
- }
+ ],
+ },
),
(
- {
- "name": "testlayer",
- "version": -1
- },
+ {"name": "testlayer", "version": -1},
[
{
- 'compatible_runtimes': ["python3.7"],
- 'created_date': "2022-09-29T10:31:35.977+0000",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
+ "compatible_runtimes": ["python3.7"],
+ "created_date": "2022-09-29T10:31:35.977+0000",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
"license_info": "MIT",
- 'version': 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "version": 2,
+ "compatible_architectures": ["arm64"],
},
{
"created_date": "2022-09-29T10:31:26.341+0000",
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1",
- "version": 1
- }
- ],
- [
- call(LayerName='testlayer', VersionNumber=2),
- call(LayerName='testlayer', VersionNumber=1)
+ "version": 1,
+ },
],
+ [call(LayerName="testlayer", VersionNumber=2), call(LayerName="testlayer", VersionNumber=1)],
{
"changed": True,
"layer_versions": [
{
- 'compatible_runtimes': ["python3.7"],
- 'created_date': "2022-09-29T10:31:35.977+0000",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
+ "compatible_runtimes": ["python3.7"],
+ "created_date": "2022-09-29T10:31:35.977+0000",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
"license_info": "MIT",
- 'version': 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "version": 2,
+ "compatible_architectures": ["arm64"],
},
{
"created_date": "2022-09-29T10:31:26.341+0000",
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1",
- "version": 1
- }
- ]
- }
- )
- ]
+ "version": 1,
+ },
+ ],
+ },
+ ),
+ ],
)
@patch(mod_list_layer)
-def test_delete_layer(m_list_layer, params, api_result, calls, ansible_result):
-
+def test_delete_layer(m_list_layer, params, api_result, calls, _ansible_result):
lambda_client = MagicMock()
lambda_client.delete_layer_version.return_value = None
m_list_layer.return_value = api_result
result = lambda_layer.delete_layer_version(lambda_client, params)
- assert result == ansible_result
+ assert result == _ansible_result
- m_list_layer.assert_called_once_with(
- lambda_client, params.get("name")
- )
+ m_list_layer.assert_called_once_with(lambda_client, params.get("name"))
if not calls:
lambda_client.delete_layer_version.assert_not_called()
@@ -177,62 +142,54 @@ def test_delete_layer(m_list_layer, params, api_result, calls, ansible_result):
@patch(mod_list_layer)
def test_delete_layer_check_mode(m_list_layer):
-
lambda_client = MagicMock()
lambda_client.delete_layer_version.return_value = None
m_list_layer.return_value = [
{
- 'compatible_runtimes': ["python3.7"],
- 'created_date': "2022-09-29T10:31:35.977+0000",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
+ "compatible_runtimes": ["python3.7"],
+ "created_date": "2022-09-29T10:31:35.977+0000",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
"license_info": "MIT",
- 'version': 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "version": 2,
+ "compatible_architectures": ["arm64"],
},
{
"created_date": "2022-09-29T10:31:26.341+0000",
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1",
- "version": 1
- }
+ "version": 1,
+ },
]
params = {"name": "testlayer", "version": -1}
result = lambda_layer.delete_layer_version(lambda_client, params, check_mode=True)
- ansible_result = {
+ _ansible_result = {
"changed": True,
"layer_versions": [
{
- 'compatible_runtimes': ["python3.7"],
- 'created_date': "2022-09-29T10:31:35.977+0000",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
+ "compatible_runtimes": ["python3.7"],
+ "created_date": "2022-09-29T10:31:35.977+0000",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2",
"license_info": "MIT",
- 'version': 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "version": 2,
+ "compatible_architectures": ["arm64"],
},
{
"created_date": "2022-09-29T10:31:26.341+0000",
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1",
- "version": 1
- }
- ]
+ "version": 1,
+ },
+ ],
}
- assert result == ansible_result
+ assert result == _ansible_result
- m_list_layer.assert_called_once_with(
- lambda_client, params.get("name")
- )
+ m_list_layer.assert_called_once_with(lambda_client, params.get("name"))
lambda_client.delete_layer_version.assert_not_called()
@patch(mod_list_layer)
def test_delete_layer_failure(m_list_layer):
-
lambda_client = MagicMock()
lambda_client.delete_layer_version.side_effect = raise_lambdalayer_exception()
@@ -241,7 +198,7 @@ def test_delete_layer_failure(m_list_layer):
"created_date": "2022-09-29T10:31:26.341+0000",
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1",
- "version": 1
+ "version": 1,
}
]
params = {"name": "testlayer", "version": 1}
@@ -249,42 +206,38 @@ def test_delete_layer_failure(m_list_layer):
lambda_layer.delete_layer_version(lambda_client, params)
-@pytest.mark.parametrize(
- "b_s3content",
- [
- (True),
- (False)
- ]
-)
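+# b_s3content toggles between S3-hosted content and a local zip file; both branches are exercised below.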
+@pytest.mark.parametrize("b_s3content", [(True), (False)])
@patch(mod_list_layer)
def test_create_layer(m_list_layer, b_s3content, tmp_path):
params = {
"name": "testlayer",
"description": "ansible units testing sample layer",
"content": {},
- "license_info": "MIT"
+ "license_info": "MIT",
}
lambda_client = MagicMock()
lambda_client.publish_layer_version.return_value = {
- 'CompatibleRuntimes': [
- 'python3.6',
- 'python3.7',
+ "CompatibleRuntimes": [
+ "python3.6",
+ "python3.7",
],
- 'Content': {
- 'CodeSha256': 'tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=',
- 'CodeSize': 169,
- 'Location': 'https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb',
+ "Content": {
+ "CodeSha256": "tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=",
+ "CodeSize": 169,
+ "Location": (
+ "https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb"
+ ),
},
- 'CreatedDate': '2018-11-14T23:03:52.894+0000',
- 'Description': "ansible units testing sample layer",
- 'LayerArn': 'arn:aws:lambda:us-west-2:123456789012:layer:my-layer',
- 'LayerVersionArn': 'arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1',
- 'LicenseInfo': 'MIT',
- 'Version': 1,
- 'ResponseMetadata': {
- 'http_header': 'true',
+ "CreatedDate": "2018-11-14T23:03:52.894+0000",
+ "Description": "ansible units testing sample layer",
+ "LayerArn": "arn:aws:lambda:us-west-2:123456789012:layer:my-layer",
+ "LayerVersionArn": "arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1",
+ "LicenseInfo": "MIT",
+ "Version": 1,
+ "ResponseMetadata": {
+ "http_header": "true",
},
}
@@ -292,33 +245,25 @@ def test_create_layer(m_list_layer, b_s3content, tmp_path):
"changed": True,
"layer_versions": [
{
- 'compatible_runtimes': ['python3.6', 'python3.7'],
- 'content': {
- 'code_sha256': 'tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=',
- 'code_size': 169,
- 'location': 'https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb'
+ "compatible_runtimes": ["python3.6", "python3.7"],
+ "content": {
+ "code_sha256": "tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=",
+ "code_size": 169,
+ "location": "https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb",
},
- 'created_date': '2018-11-14T23:03:52.894+0000',
- 'description': 'ansible units testing sample layer',
- 'layer_arn': 'arn:aws:lambda:us-west-2:123456789012:layer:my-layer',
- 'layer_version_arn': 'arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1',
- 'license_info': 'MIT',
- 'version': 1
+ "created_date": "2018-11-14T23:03:52.894+0000",
+ "description": "ansible units testing sample layer",
+ "layer_arn": "arn:aws:lambda:us-west-2:123456789012:layer:my-layer",
+ "layer_version_arn": "arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1",
+ "license_info": "MIT",
+ "version": 1,
}
- ]
+ ],
}
if b_s3content:
- params["content"] = {
- "s3_bucket": "mybucket",
- "s3_key": "mybucket-key",
- "s3_object_version": "v1"
- }
- content_arg = {
- "S3Bucket": "mybucket",
- "S3Key": "mybucket-key",
- "S3ObjectVersion": "v1"
- }
+ params["content"] = {"s3_bucket": "mybucket", "s3_key": "mybucket-key", "s3_object_version": "v1"}
+ content_arg = {"S3Bucket": "mybucket", "S3Key": "mybucket-key", "S3ObjectVersion": "v1"}
else:
binary_data = b"simple lambda layer content"
test_dir = tmp_path / "lambda_layer"
@@ -350,12 +295,8 @@ def test_create_layer_check_mode(m_list_layer):
params = {
"name": "testlayer",
"description": "ansible units testing sample layer",
- "content": {
- "s3_bucket": "mybucket",
- "s3_key": "mybucket-key",
- "s3_object_version": "v1"
- },
- "license_info": "MIT"
+ "content": {"s3_bucket": "mybucket", "s3_key": "mybucket-key", "s3_object_version": "v1"},
+ "license_info": "MIT",
}
lambda_client = MagicMock()
@@ -371,19 +312,9 @@ def test_create_layer_failure():
params = {
"name": "testlayer",
"description": "ansible units testing sample layer",
- "content": {
- "s3_bucket": "mybucket",
- "s3_key": "mybucket-key",
- "s3_object_version": "v1"
- },
- "compatible_runtimes": [
- "nodejs",
- "python3.9"
- ],
- "compatible_architectures": [
- 'x86_64',
- 'arm64'
- ]
+ "content": {"s3_bucket": "mybucket", "s3_key": "mybucket-key", "s3_object_version": "v1"},
+ "compatible_runtimes": ["nodejs", "python3.9"],
+ "compatible_architectures": ["x86_64", "arm64"],
}
lambda_client = MagicMock()
lambda_client.publish_layer_version.side_effect = raise_lambdalayer_exception()
@@ -399,14 +330,8 @@ def test_create_layer_using_unexisting_file():
"content": {
"zip_file": "this_file_does_not_exist",
},
- "compatible_runtimes": [
- "nodejs",
- "python3.9"
- ],
- "compatible_architectures": [
- 'x86_64',
- 'arm64'
- ]
+ "compatible_runtimes": ["nodejs", "python3.9"],
+ "compatible_architectures": ["x86_64", "arm64"],
}
lambda_client = MagicMock()
@@ -421,28 +346,15 @@ def test_create_layer_using_unexisting_file():
@pytest.mark.parametrize(
"params,failure",
[
- (
- {"name": "test-layer"},
- False
- ),
- (
- {"name": "test-layer", "state": "absent"},
- False
- ),
- (
- {"name": "test-layer"},
- True
- ),
- (
- {"name": "test-layer", "state": "absent"},
- True
- ),
- ]
+ ({"name": "test-layer"}, False),
+ ({"name": "test-layer", "state": "absent"}, False),
+ ({"name": "test-layer"}, True),
+ ({"name": "test-layer", "state": "absent"}, True),
+ ],
)
@patch(mod_create_layer)
@patch(mod_delete_layer)
def test_execute_module(m_delete_layer, m_create_layer, params, failure):
-
module = MagicMock()
module.params = params
module.check_mode = False
@@ -462,9 +374,7 @@ def test_execute_module(m_delete_layer, m_create_layer, params, failure):
module.exit_json.assert_called_with(**result)
module.fail_json_aws.assert_not_called()
- m_create_layer.assert_called_with(
- lambda_client, params, module.check_mode
- )
+ m_create_layer.assert_called_with(lambda_client, params, module.check_mode)
m_delete_layer.assert_not_called()
elif state == "absent":
@@ -474,9 +384,7 @@ def test_execute_module(m_delete_layer, m_create_layer, params, failure):
module.exit_json.assert_called_with(**result)
module.fail_json_aws.assert_not_called()
- m_delete_layer.assert_called_with(
- lambda_client, params, module.check_mode
- )
+ m_delete_layer.assert_called_with(lambda_client, params, module.check_mode)
m_create_layer.assert_not_called()
else:
exc = "lambdalayer_execute_module_exception"
@@ -488,6 +396,4 @@ def test_execute_module(m_delete_layer, m_create_layer, params, failure):
lambda_layer.execute_module(module, lambda_client)
module.exit_json.assert_not_called()
- module.fail_json_aws.assert_called_with(
- exc, msg=msg
- )
+ module.fail_json_aws.assert_called_with(exc, msg=msg)
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py
index 25a1f15ac..201625401 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py
@@ -4,104 +4,85 @@
# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
import pytest
from botocore.exceptions import BotoCoreError
-from unittest.mock import MagicMock, call, patch
from ansible_collections.amazon.aws.plugins.modules import lambda_layer_info
-
-mod__list_layer_versions = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layer_versions'
-mod__list_layers = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layers'
-mod_list_layer_versions = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layer_versions'
-mod_list_layers = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layers'
+mod__list_layer_versions = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layer_versions"
+mod__list_layers = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layers"
+mod_list_layer_versions = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layer_versions"
+mod_list_layers = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layers"
list_layers_paginate_result = {
- 'NextMarker': '002',
- 'Layers': [
+ "NextMarker": "002",
+ "Layers": [
{
- 'LayerName': "test-layer-01",
- 'LayerArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01",
- 'LatestMatchingVersion': {
- 'LayerVersionArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1",
- 'Version': 1,
- 'Description': "lambda layer created for unit tests",
- 'CreatedDate': "2022-09-29T10:31:26.341+0000",
- 'CompatibleRuntimes': [
- 'nodejs',
- 'nodejs4.3',
- 'nodejs6.10'
- ],
- 'LicenseInfo': 'MIT',
- 'CompatibleArchitectures': [
- 'arm64'
- ]
- }
+ "LayerName": "test-layer-01",
+ "LayerArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01",
+ "LatestMatchingVersion": {
+ "LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1",
+ "Version": 1,
+ "Description": "lambda layer created for unit tests",
+ "CreatedDate": "2022-09-29T10:31:26.341+0000",
+ "CompatibleRuntimes": ["nodejs", "nodejs4.3", "nodejs6.10"],
+ "LicenseInfo": "MIT",
+ "CompatibleArchitectures": ["arm64"],
+ },
},
{
- 'LayerName': "test-layer-02",
- 'LayerArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02",
- 'LatestMatchingVersion': {
- 'LayerVersionArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1",
- 'Version': 1,
- 'CreatedDate': "2022-09-29T10:31:26.341+0000",
- 'CompatibleArchitectures': [
- 'arm64'
- ]
- }
+ "LayerName": "test-layer-02",
+ "LayerArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02",
+ "LatestMatchingVersion": {
+ "LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1",
+ "Version": 1,
+ "CreatedDate": "2022-09-29T10:31:26.341+0000",
+ "CompatibleArchitectures": ["arm64"],
+ },
},
],
- 'ResponseMetadata': {
- 'http': 'true',
+ "ResponseMetadata": {
+ "http": "true",
},
}
list_layers_result = [
{
- 'layer_name': "test-layer-01",
- 'layer_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1",
- 'version': 1,
- 'description': "lambda layer created for unit tests",
- 'created_date': "2022-09-29T10:31:26.341+0000",
- 'compatible_runtimes': [
- 'nodejs',
- 'nodejs4.3',
- 'nodejs6.10'
- ],
- 'license_info': 'MIT',
- 'compatible_architectures': [
- 'arm64'
- ]
+ "layer_name": "test-layer-01",
+ "layer_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1",
+ "version": 1,
+ "description": "lambda layer created for unit tests",
+ "created_date": "2022-09-29T10:31:26.341+0000",
+ "compatible_runtimes": ["nodejs", "nodejs4.3", "nodejs6.10"],
+ "license_info": "MIT",
+ "compatible_architectures": ["arm64"],
},
{
- 'layer_name': "test-layer-02",
- 'layer_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02",
- 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1",
- 'version': 1,
- 'created_date': "2022-09-29T10:31:26.341+0000",
- 'compatible_architectures': [
- 'arm64'
- ]
- }
+ "layer_name": "test-layer-02",
+ "layer_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02",
+ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1",
+ "version": 1,
+ "created_date": "2022-09-29T10:31:26.341+0000",
+ "compatible_architectures": ["arm64"],
+ },
]
list_layers_versions_paginate_result = {
- 'LayerVersions': [
+ "LayerVersions": [
{
- 'CompatibleRuntimes': ["python3.7"],
- 'CreatedDate': "2022-09-29T10:31:35.977+0000",
- 'LayerVersionArn': "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:2",
+ "CompatibleRuntimes": ["python3.7"],
+ "CreatedDate": "2022-09-29T10:31:35.977+0000",
+ "LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:2",
"LicenseInfo": "MIT",
- 'Version': 2,
- 'CompatibleArchitectures': [
- 'arm64'
- ]
+ "Version": 2,
+ "CompatibleArchitectures": ["arm64"],
},
{
"CompatibleRuntimes": ["python3.7"],
@@ -109,13 +90,13 @@ list_layers_versions_paginate_result = {
"Description": "lambda layer first version",
"LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:1",
"LicenseInfo": "GPL-3.0-only",
- "Version": 1
- }
+ "Version": 1,
+ },
],
- 'ResponseMetadata': {
- 'http': 'true',
+ "ResponseMetadata": {
+ "http": "true",
},
- 'NextMarker': '001',
+ "NextMarker": "001",
}
@@ -126,9 +107,7 @@ list_layers_versions_result = [
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:2",
"license_info": "MIT",
"version": 2,
- 'compatible_architectures': [
- 'arm64'
- ]
+ "compatible_architectures": ["arm64"],
},
{
"compatible_runtimes": ["python3.7"],
@@ -136,8 +115,8 @@ list_layers_versions_result = [
"description": "lambda layer first version",
"layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:1",
"license_info": "GPL-3.0-only",
- "version": 1
- }
+ "version": 1,
+ },
]
@@ -145,14 +124,8 @@ list_layers_versions_result = [
"params,call_args",
[
(
- {
- "compatible_runtime": "nodejs",
- "compatible_architecture": "arm64"
- },
- {
- "CompatibleRuntime": "nodejs",
- "CompatibleArchitecture": "arm64"
- }
+ {"compatible_runtime": "nodejs", "compatible_architecture": "arm64"},
+ {"CompatibleRuntime": "nodejs", "CompatibleArchitecture": "arm64"},
),
(
{
@@ -160,34 +133,20 @@ list_layers_versions_result = [
},
{
"CompatibleRuntime": "nodejs",
- }
- ),
- (
- {
- "compatible_architecture": "arm64"
},
- {
- "CompatibleArchitecture": "arm64"
- }
),
- (
- {}, {}
- )
- ]
+ ({"compatible_architecture": "arm64"}, {"CompatibleArchitecture": "arm64"}),
+ ({}, {}),
+ ],
)
@patch(mod__list_layers)
def test_list_layers_with_latest_version(m__list_layers, params, call_args):
-
lambda_client = MagicMock()
m__list_layers.return_value = list_layers_paginate_result
layers = lambda_layer_info.list_layers(lambda_client, **params)
- m__list_layers.assert_has_calls(
- [
- call(lambda_client, **call_args)
- ]
- )
+ m__list_layers.assert_has_calls([call(lambda_client, **call_args)])
assert layers == list_layers_result
@@ -195,16 +154,8 @@ def test_list_layers_with_latest_version(m__list_layers, params, call_args):
"params,call_args",
[
(
- {
- "name": "layer-01",
- "compatible_runtime": "nodejs",
- "compatible_architecture": "arm64"
- },
- {
- "LayerName": "layer-01",
- "CompatibleRuntime": "nodejs",
- "CompatibleArchitecture": "arm64"
- }
+ {"name": "layer-01", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"},
+ {"LayerName": "layer-01", "CompatibleRuntime": "nodejs", "CompatibleArchitecture": "arm64"},
),
(
{
@@ -214,36 +165,23 @@ def test_list_layers_with_latest_version(m__list_layers, params, call_args):
{
"LayerName": "layer-01",
"CompatibleRuntime": "nodejs",
- }
- ),
- (
- {
- "name": "layer-01",
- "compatible_architecture": "arm64"
},
- {
- "LayerName": "layer-01",
- "CompatibleArchitecture": "arm64"
- }
),
(
- {"name": "layer-01"}, {"LayerName": "layer-01"}
- )
- ]
+ {"name": "layer-01", "compatible_architecture": "arm64"},
+ {"LayerName": "layer-01", "CompatibleArchitecture": "arm64"},
+ ),
+ ({"name": "layer-01"}, {"LayerName": "layer-01"}),
+ ],
)
@patch(mod__list_layer_versions)
def test_list_layer_versions(m__list_layer_versions, params, call_args):
-
lambda_client = MagicMock()
m__list_layer_versions.return_value = list_layers_versions_paginate_result
layers = lambda_layer_info.list_layer_versions(lambda_client, **params)
- m__list_layer_versions.assert_has_calls(
- [
- call(lambda_client, **call_args)
- ]
- )
+ m__list_layer_versions.assert_has_calls([call(lambda_client, **call_args)])
assert layers == list_layers_versions_result
@@ -251,28 +189,69 @@ def raise_botocore_exception():
return BotoCoreError(error="failed", operation="list_layers")
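+# get_layer_version() should return a one-element list with the boto3 response
+# converted to snake_case and ResponseMetadata dropped.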
+def test_get_layer_version_success():
+ aws_layer_version = {
+ "CompatibleRuntimes": ["python3.8"],
+ "Content": {
+ "CodeSha256": "vqxKx6nTW31obVcB4MYaTWv5H3fBQTn2PHklL9+mF9E=",
+ "CodeSize": 9492621,
+ "Location": "https://test.s3.us-east-1.amazonaws.com/snapshots/123456789012/test-79b29d149e06?versionId=nmEKA3ZgiP7hce3J",
+ },
+ "CreatedDate": "2022-12-05T10:47:32.379+0000",
+ "Description": "Python units test layer",
+ "LayerArn": "arn:aws:lambda:us-east-1:123456789012:layer:test",
+ "LayerVersionArn": "arn:aws:lambda:us-east-1:123456789012:layer:test:2",
+ "LicenseInfo": "GPL-3.0-only",
+ "Version": 2,
+ "ResponseMetadata": {"some-metadata": "some-result"},
+ }
+
+ ansible_layer_version = {
+ "compatible_runtimes": ["python3.8"],
+ "content": {
+ "code_sha256": "vqxKx6nTW31obVcB4MYaTWv5H3fBQTn2PHklL9+mF9E=",
+ "code_size": 9492621,
+ "location": "https://test.s3.us-east-1.amazonaws.com/snapshots/123456789012/test-79b29d149e06?versionId=nmEKA3ZgiP7hce3J",
+ },
+ "created_date": "2022-12-05T10:47:32.379+0000",
+ "description": "Python units test layer",
+ "layer_arn": "arn:aws:lambda:us-east-1:123456789012:layer:test",
+ "layer_version_arn": "arn:aws:lambda:us-east-1:123456789012:layer:test:2",
+ "license_info": "GPL-3.0-only",
+ "version": 2,
+ }
+
+ lambda_client = MagicMock()
+ lambda_client.get_layer_version.return_value = aws_layer_version
+
+ layer_name = "test"
+ layer_version = 2
+
+ assert [ansible_layer_version] == lambda_layer_info.get_layer_version(lambda_client, layer_name, layer_version)
+ lambda_client.get_layer_version.assert_called_once_with(LayerName=layer_name, VersionNumber=layer_version)
+
+
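+# A botocore error raised by the client must be wrapped in LambdaLayerInfoFailure.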
+def test_get_layer_version_failure():
+ lambda_client = MagicMock()
+ lambda_client.get_layer_version.side_effect = raise_botocore_exception()
+
+ layer_name = MagicMock()
+ layer_version = MagicMock()
+
+ with pytest.raises(lambda_layer_info.LambdaLayerInfoFailure):
+ lambda_layer_info.get_layer_version(lambda_client, layer_name, layer_version)
+
+
@pytest.mark.parametrize(
"params",
[
- (
- {
- "name": "test-layer",
- "compatible_runtime": "nodejs",
- "compatible_architecture": "arm64"
- }
- ),
- (
- {
- "compatible_runtime": "nodejs",
- "compatible_architecture": "arm64"
- }
- )
- ]
+ ({"name": "test-layer", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"}),
+ ({"compatible_runtime": "nodejs", "compatible_architecture": "arm64"}),
+ ],
)
@patch(mod__list_layers)
@patch(mod__list_layer_versions)
def test_list_layers_with_failure(m__list_layer_versions, m__list_layers, params):
-
lambda_client = MagicMock()
if "name" in params:
@@ -293,35 +272,14 @@ def raise_layer_info_exception(exc, msg):
@pytest.mark.parametrize(
"params,failure",
[
- (
- {
- "name": "test-layer",
- "compatible_runtime": "nodejs",
- "compatible_architecture": "arm64"
- },
- False
- ),
- (
- {
- "compatible_runtime": "nodejs",
- "compatible_architecture": "arm64"
- },
- False
- ),
- (
- {
- "name": "test-layer",
- "compatible_runtime": "nodejs",
- "compatible_architecture": "arm64"
- },
- True
- )
- ]
+ ({"name": "test-layer", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, False),
+ ({"compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, False),
+ ({"name": "test-layer", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, True),
+ ],
)
@patch(mod_list_layers)
@patch(mod_list_layer_versions)
def test_execute_module(m_list_layer_versions, m_list_layers, params, failure):
-
lambda_client = MagicMock()
module = MagicMock()
@@ -351,8 +309,6 @@ def test_execute_module(m_list_layer_versions, m_list_layers, params, failure):
with pytest.raises(SystemExit):
lambda_layer_info.execute_module(module, lambda_client)
- module.exit_json.assert_called_with(
- changed=False, layers_versions=result
- )
+ module.exit_json.assert_called_with(changed=False, layers_versions=result)
method_called.assert_called_with(lambda_client, **params)
method_not_called.list_layers.assert_not_called()
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_rds_instance_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_rds_instance_info.py
new file mode 100644
index 000000000..8db20f1a0
--- /dev/null
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_rds_instance_info.py
@@ -0,0 +1,121 @@
+# (c) 2022 Red Hat Inc.
+#
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from unittest.mock import ANY
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
+
+import botocore.exceptions
+import pytest
+
+from ansible_collections.amazon.aws.plugins.modules import rds_instance_info
+
+mod_name = "ansible_collections.amazon.aws.plugins.modules.rds_instance_info"
+
+
+def a_boto_exception():
+ return botocore.exceptions.UnknownServiceError(service_name="Whoops", known_service_names="Oula")
+
+
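+# The boto3-facing helpers are patched out so only the glue logic of instance_info() runs.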
+@patch(mod_name + "._describe_db_instances")
+@patch(mod_name + ".get_instance_tags")
+def test_instance_info_one_instance(m_get_instance_tags, m_describe_db_instances):
+ conn = MagicMock()
+ instance_name = "my-instance"
+ m_get_instance_tags.return_value = []
+ m_describe_db_instances.return_value = [
+ {
+ "DBInstanceIdentifier": instance_name,
+ "DBInstanceArn": "arn:aws:rds:us-east-2:123456789012:og:" + instance_name,
+ }
+ ]
+ rds_instance_info.instance_info(conn, instance_name, filters={})
+
+ m_describe_db_instances.assert_called_with(conn, DBInstanceIdentifier=instance_name)
+ m_get_instance_tags.assert_called_with(conn, arn="arn:aws:rds:us-east-2:123456789012:og:" + instance_name)
+
+
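+# Without an instance name, module filters are mapped to the boto3 Filters shape
+# and tags are fetched once per returned instance.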
+@patch(mod_name + "._describe_db_instances")
+@patch(mod_name + ".get_instance_tags")
+def test_instance_info_all_instances(m_get_instance_tags, m_describe_db_instances):
+ conn = MagicMock()
+ m_get_instance_tags.return_value = []
+ m_describe_db_instances.return_value = [
+ {
+ "DBInstanceIdentifier": "first-instance",
+ "DBInstanceArn": "arn:aws:rds:us-east-2:123456789012:og:first-instance",
+ },
+ {
+ "DBInstanceIdentifier": "second-instance",
+ "DBInstanceArn": "arn:aws:rds:us-east-2:123456789012:og:second-instance",
+ },
+ ]
+ rds_instance_info.instance_info(conn, instance_name=None, filters={"engine": "postgres"})
+
+ m_describe_db_instances.assert_called_with(conn, Filters=[{"Name": "engine", "Values": ["postgres"]}])
+ assert m_get_instance_tags.call_count == 2
+ m_get_instance_tags.assert_has_calls(
+ [
+ call(conn, arn="arn:aws:rds:us-east-2:123456789012:og:first-instance"),
+ call(conn, arn="arn:aws:rds:us-east-2:123456789012:og:second-instance"),
+ ]
+ )
+
+
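+# The TagList returned by list_tags_for_resource is flattened into a plain dict.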
+def test_get_instance_tags():
+ conn = MagicMock()
+ conn.list_tags_for_resource.return_value = {
+ "TagList": [
+ {"Key": "My-tag", "Value": "the-value$"},
+ ],
+ "NextToken": "some-token",
+ }
+
+ tags = rds_instance_info.get_instance_tags(conn, "arn:aws:rds:us-east-2:123456789012:og:second-instance")
+ conn.list_tags_for_resource.assert_called_with(
+ ResourceName="arn:aws:rds:us-east-2:123456789012:og:second-instance",
+ aws_retry=True,
+ )
+ assert tags == {"My-tag": "the-value$"}
+
+
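+# Any botocore failure should surface as RdsInstanceInfoFailure.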
+def test_api_failure_get_tag():
+ conn = MagicMock()
+ conn.list_tags_for_resource.side_effect = a_boto_exception()
+
+ with pytest.raises(rds_instance_info.RdsInstanceInfoFailure):
+ rds_instance_info.get_instance_tags(conn, "arn:blabla")
+
+
+def test_api_failure_describe():
+ conn = MagicMock()
+ conn.get_paginator.side_effect = a_boto_exception()
+
+ with pytest.raises(rds_instance_info.RdsInstanceInfoFailure):
+ rds_instance_info.instance_info(conn, None, {})
+
+
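+# AnsibleAWSModule is patched so main() can run without real module arguments.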
+@patch(mod_name + ".AnsibleAWSModule")
+def test_main_success(m_AnsibleAWSModule):
+ m_module = MagicMock()
+ m_AnsibleAWSModule.return_value = m_module
+
+ rds_instance_info.main()
+
+ m_module.client.assert_called_with("rds", retry_decorator=ANY)
+ m_module.exit_json.assert_called_with(changed=False, instances=[])
+
+
+@patch(mod_name + "._describe_db_instances")
+@patch(mod_name + ".AnsibleAWSModule")
+def test_main_failure(m_AnsibleAWSModule, m_describe_db_instances):
+ m_module = MagicMock()
+ m_AnsibleAWSModule.return_value = m_module
+ m_describe_db_instances.side_effect = a_boto_exception()
+
+ rds_instance_info.main()
+
+ m_module.client.assert_called_with("rds", retry_decorator=ANY)
+ m_module.fail_json_aws.assert_called_with(ANY, "Couldn't get instance information")
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py
index b02513072..deeb1c4a0 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py
@@ -1,29 +1,156 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+# (c) 2022 Red Hat Inc.
-from ansible.module_utils.six.moves.urllib.parse import urlparse
+# This file is part of Ansible
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import botocore.exceptions
+import pytest
from ansible_collections.amazon.aws.plugins.modules import s3_object
+module_name = "ansible_collections.amazon.aws.plugins.modules.s3_object"
+utils = "ansible_collections.amazon.aws.plugins.module_utils.ec2"
+
+
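+# list_keys() delegates to paginated_list(); botocore errors become S3ObjectFailure.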
+@patch(module_name + ".paginated_list")
+def test_list_keys_success(m_paginated_list):
+ s3 = MagicMock()
+
+ m_paginated_list.return_value = ["delete.txt"]
+
+ assert ["delete.txt"] == s3_object.list_keys(s3, "a987e6b6026ab04e4717", "", "", 1000)
+ m_paginated_list.assert_called_once()
+
+
+@patch(module_name + ".paginated_list")
+def test_list_keys_failure(m_paginated_list):
+ s3 = MagicMock()
+
+ m_paginated_list.side_effect = botocore.exceptions.BotoCoreError
+
+ with pytest.raises(s3_object.S3ObjectFailure):
+ s3_object.list_keys(s3, "a987e6b6026ab04e4717", "", "", 1000)
+
+
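+# Deleting needs both bucket and object; a missing parameter must fail_json
+# without delete_key ever being called.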
+@patch(module_name + ".delete_key")
+def test_s3_object_do_delobj_success(m_delete_key):
+ module = MagicMock()
+ s3 = MagicMock()
+ var_dict = {
+ "object": "/usr/local/myfile.txt",
+ "bucket": "a987e6b6026ab04e4717",
+ }
+ s3_object.s3_object_do_delobj(module, s3, s3, var_dict)
+ assert m_delete_key.call_count == 1
+ module.exit_json.assert_called_with(msg="Object deleted from bucket a987e6b6026ab04e4717.", changed=True)
+
+
+@patch(module_name + ".delete_key")
+def test_s3_object_do_delobj_failure_nobucket(m_delete_key):
+ module = MagicMock()
+ s3 = MagicMock()
+
+ var_dict = {"object": "/usr/local/myfile.txt", "bucket": ""}
+ s3_object.s3_object_do_delobj(module, s3, s3, var_dict)
+ assert m_delete_key.call_count == 0
+ module.fail_json.assert_called_with(msg="Bucket parameter is required.")
+
+
+@patch(module_name + ".delete_key")
+def test_s3_object_do_delobj_failure_noobj(m_delete_key):
+ module = MagicMock()
+ s3 = MagicMock()
+ var_dict = {"bucket": "a987e6b6026ab04e4717", "object": ""}
+ s3_object.s3_object_do_delobj(module, s3, s3, var_dict)
+ assert m_delete_key.call_count == 0
+ module.fail_json.assert_called_with(msg="object parameter is required")
+
+
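+# Stacked @patch decorators inject mocks bottom-up, so the list_keys mock is the
+# first test parameter and the paginated_list mock the second.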
+@patch(module_name + ".paginated_list")
+@patch(module_name + ".list_keys")
+def test_s3_object_do_list_success(m_list_keys, m_paginated_list):
+ module = MagicMock()
+ s3 = MagicMock()
+
+ m_list_keys.return_value = ["delete.txt"]
+ var_dict = {
+ "bucket": "a987e6b6026ab04e4717",
+ "prefix": "",
+ "marker": "",
+ "max_keys": 1000,
+ "bucketrtn": True,
+ }
+
+ s3_object.s3_object_do_list(module, s3, s3, var_dict)
+ assert m_list_keys.call_count == 1
+
+
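+# populate_params() should echo module.params back unchanged, strip a leading "/"
+# from object, and reject object together with mode=delete.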
+@patch(utils + ".get_aws_connection_info")
+def test_populate_params(m_get_aws_connection_info):
+ module = MagicMock()
+ m_get_aws_connection_info.return_value = (
+ "us-east-1",
+ None,
+ {
+ "aws_access_key_id": "xxxx",
+ "aws_secret_access_key": "yyyy",
+ "aws_session_token": "zzzz",
+ "verify": True,
+ },
+ )
-class TestUrlparse():
+ module.params = {
+ "bucket": "4a6cfe3c17b798613fa77b462e402984",
+ "ceph": False,
+ "content": None,
+ "content_base64": None,
+ "copy_src": None,
+ "debug_botocore_endpoint_logs": True,
+ "dest": None,
+ "dualstack": False,
+ "encrypt": True,
+ "encryption_kms_key_id": None,
+ "encryption_mode": "AES256",
+ "endpoint_url": None,
+ "expiry": 600,
+ "headers": None,
+ "ignore_nonexistent_bucket": False,
+ "marker": "",
+ "max_keys": 1000,
+ "metadata": None,
+ "mode": "create",
+ "object": None,
+ "overwrite": "latest",
+ "permission": ["private"],
+ "prefix": "",
+ "profile": None,
+ "purge_tags": True,
+ "region": "us-east-1",
+ "retries": 0,
+ "sig_v4": True,
+ "src": None,
+ "tags": None,
+ "validate_bucket_name": False,
+ "validate_certs": True,
+ "version": None,
+ }
+ result = s3_object.populate_params(module)
+ for k, v in module.params.items():
+ assert result[k] == v
- def test_urlparse(self):
- actual = urlparse("http://test.com/here")
- assert actual.scheme == "http"
- assert actual.netloc == "test.com"
- assert actual.path == "/here"
+ module.params.update({"object": "example.txt", "mode": "get"})
+ result = s3_object.populate_params(module)
+ assert result["object"] == "example.txt"
- def test_is_fakes3(self):
- actual = s3_object.is_fakes3("fakes3://bla.blubb")
- assert actual is True
+ module.params.update({"object": "/example.txt", "mode": "get"})
+ result = s3_object.populate_params(module)
+ assert result["object"] == "example.txt"
- def test_get_s3_connection(self):
- aws_connect_kwargs = dict(aws_access_key_id="access_key",
- aws_secret_access_key="secret_key")
- location = None
- rgw = True
- s3_url = "http://bla.blubb"
- actual = s3_object.get_s3_connection(None, aws_connect_kwargs, location, rgw, s3_url)
- assert "bla.blubb" in str(actual._endpoint)
+ module.params.update({"object": "example.txt", "mode": "delete"})
+ result = s3_object.populate_params(module)
+ module.fail_json.assert_called_with(msg="Parameter object cannot be used with mode=delete")
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py
index 058a5b605..72b3b887e 100644
--- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py
+++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py
@@ -1,21 +1,18 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
import json
+import unittest
+from unittest.mock import patch
-from ansible_collections.amazon.aws.tests.unit.compat import unittest
-from ansible_collections.amazon.aws.tests.unit.compat.mock import patch
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
def set_module_args(args):
- if '_ansible_remote_tmp' not in args:
- args['_ansible_remote_tmp'] = '/tmp'
- if '_ansible_keep_remote_files' not in args:
- args['_ansible_keep_remote_files'] = False
+ if "_ansible_remote_tmp" not in args:
+ args["_ansible_remote_tmp"] = "/tmp"
+ if "_ansible_keep_remote_files" not in args:
+ args["_ansible_keep_remote_files"] = False
- args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ args = json.dumps({"ANSIBLE_MODULE_ARGS": args})
basic._ANSIBLE_ARGS = to_bytes(args)
@@ -28,22 +25,21 @@ class AnsibleFailJson(Exception):
def exit_json(*args, **kwargs):
- if 'changed' not in kwargs:
- kwargs['changed'] = False
+ if "changed" not in kwargs:
+ kwargs["changed"] = False
raise AnsibleExitJson(kwargs)
def fail_json(*args, **kwargs):
- kwargs['failed'] = True
+ kwargs["failed"] = True
raise AnsibleFailJson(kwargs)
class ModuleTestCase(unittest.TestCase):
-
def setUp(self):
self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
self.mock_module.start()
- self.mock_sleep = patch('time.sleep')
+ self.mock_sleep = patch("time.sleep")
self.mock_sleep.start()
set_module_args({})
self.addCleanup(self.mock_module.stop)