diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-18 05:52:35 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-18 05:52:35 +0000 |
commit | 7fec0b69a082aaeec72fee0612766aa42f6b1b4d (patch) | |
tree | efb569b86ca4da888717f5433e757145fa322e08 /ansible_collections/amazon/aws/tests/unit/plugins | |
parent | Releasing progress-linux version 7.7.0+dfsg-3~progress7.99u1. (diff) | |
download | ansible-7fec0b69a082aaeec72fee0612766aa42f6b1b4d.tar.xz ansible-7fec0b69a082aaeec72fee0612766aa42f6b1b4d.zip |
Merging upstream version 9.4.0+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/amazon/aws/tests/unit/plugins')
37 files changed, 4623 insertions, 1213 deletions
diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py index 5386fe6c7..8cced1662 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_ec2.py @@ -17,96 +17,25 @@ # You should have received a copy of the GNU General Public License # along with Ansible. If not, see <http://www.gnu.org/licenses/>. -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch import pytest -import datetime -from unittest.mock import Mock, MagicMock + +try: + import botocore +except ImportError: + # Handled by HAS_BOTO3 + pass from ansible.errors import AnsibleError -from ansible.parsing.dataloader import DataLoader -from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import InventoryModule, instance_data_filter_to_boto_attr - - -instances = { - 'Instances': [ - {'Monitoring': {'State': 'disabled'}, - 'PublicDnsName': 'ec2-12-345-67-890.compute-1.amazonaws.com', - 'State': {'Code': 16, 'Name': 'running'}, - 'EbsOptimized': False, - 'LaunchTime': datetime.datetime(2017, 10, 31, 12, 59, 25), - 'PublicIpAddress': '12.345.67.890', - 'PrivateIpAddress': '098.76.54.321', - 'ProductCodes': [], - 'VpcId': 'vpc-12345678', - 'StateTransitionReason': '', - 'InstanceId': 'i-00000000000000000', - 'EnaSupport': True, - 'ImageId': 'ami-12345678', - 'PrivateDnsName': 'ip-098-76-54-321.ec2.internal', - 'KeyName': 'testkey', - 'SecurityGroups': [{'GroupName': 'default', 'GroupId': 'sg-12345678'}], - 'ClientToken': '', - 'SubnetId': 'subnet-12345678', - 'InstanceType': 't2.micro', - 'NetworkInterfaces': [ - {'Status': 'in-use', - 'MacAddress': '12:a0:50:42:3d:a4', - 'SourceDestCheck': True, - 'VpcId': 'vpc-12345678', - 'Description': '', - 'NetworkInterfaceId': 'eni-12345678', - 'PrivateIpAddresses': [ - {'PrivateDnsName': 'ip-098-76-54-321.ec2.internal', - 'PrivateIpAddress': '098.76.54.321', - 'Primary': True, - 'Association': - {'PublicIp': '12.345.67.890', - 'PublicDnsName': 'ec2-12-345-67-890.compute-1.amazonaws.com', - 'IpOwnerId': 'amazon'}}], - 'PrivateDnsName': 'ip-098-76-54-321.ec2.internal', - 'Attachment': - {'Status': 'attached', - 'DeviceIndex': 0, - 'DeleteOnTermination': True, - 'AttachmentId': 'eni-attach-12345678', - 'AttachTime': datetime.datetime(2017, 10, 31, 12, 59, 25)}, - 'Groups': [ - {'GroupName': 'default', - 'GroupId': 'sg-12345678'}], - 'Ipv6Addresses': [], - 'OwnerId': '123456789012', - 'PrivateIpAddress': '098.76.54.321', - 'SubnetId': 'subnet-12345678', - 'Association': - {'PublicIp': '12.345.67.890', - 'PublicDnsName': 'ec2-12-345-67-890.compute-1.amazonaws.com', - 'IpOwnerId': 'amazon'}}], - 'SourceDestCheck': True, - 'Placement': - {'Tenancy': 'default', - 'GroupName': '', - 'AvailabilityZone': 'us-east-1c'}, - 'Hypervisor': 'xen', - 'BlockDeviceMappings': [ - {'DeviceName': '/dev/xvda', - 'Ebs': - {'Status': 'attached', - 'DeleteOnTermination': True, - 'VolumeId': 'vol-01234567890000000', - 'AttachTime': datetime.datetime(2017, 10, 31, 12, 59, 26)}}], - 'Architecture': 'x86_64', - 'RootDeviceType': 'ebs', - 'RootDeviceName': '/dev/xvda', - 'VirtualizationType': 'hvm', - 'Tags': [{'Value': 'test', 'Key': 'ansible'}, {'Value': 'aws_ec2', 'Key': 'Name'}], - 'AmiLaunchIndex': 0}], - 'ReservationId': 'r-01234567890000000', - 'Groups': [], - 'OwnerId': '123456789012' -} + +from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import InventoryModule +from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _compile_values +from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _get_boto_attr_chain +from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _get_tag_hostname +from ansible_collections.amazon.aws.plugins.inventory.aws_ec2 import _prepare_host_vars @pytest.fixture() @@ -140,236 +69,187 @@ def inventory(): return inventory -def test_compile_values(inventory): - found_value = instances['Instances'][0] - chain_of_keys = instance_data_filter_to_boto_attr['instance.group-id'] - for attr in chain_of_keys: - found_value = inventory._compile_values(found_value, attr) - assert found_value == "sg-12345678" - - -def test_get_boto_attr_chain(inventory): - instance = instances['Instances'][0] - assert inventory._get_boto_attr_chain('network-interface.addresses.private-ip-address', instance) == "098.76.54.321" - - -def test_boto3_conn(inventory): - inventory._options = {"aws_profile": "first_precedence", - "aws_access_key": "test_access_key", - "aws_secret_key": "test_secret_key", - "aws_security_token": "test_security_token", - "iam_role_arn": None} - loader = DataLoader() - inventory._set_credentials(loader) - with pytest.raises(AnsibleError) as error_message: - for _connection, _region in inventory._boto3_conn(regions=['us-east-1']): - assert "Insufficient credentials found" in error_message - - -def testget_all_hostnames_default(inventory): - instance = instances['Instances'][0] - assert inventory.get_all_hostnames(instance, hostnames=None) == ["ec2-12-345-67-890.compute-1.amazonaws.com", "ip-098-76-54-321.ec2.internal"] - - -def testget_all_hostnames(inventory): - hostnames = ['ip-address', 'dns-name'] - instance = instances['Instances'][0] - assert inventory.get_all_hostnames(instance, hostnames) == ["12.345.67.890", "ec2-12-345-67-890.compute-1.amazonaws.com"] - - -def testget_all_hostnames_dict(inventory): - hostnames = [{'name': 'private-ip-address', 'separator': '_', 'prefix': 'tag:Name'}] - instance = instances['Instances'][0] - assert inventory.get_all_hostnames(instance, hostnames) == ["aws_ec2_098.76.54.321"] - - -def testget_all_hostnames_with_2_tags(inventory): - hostnames = ['tag:ansible', 'tag:Name'] - instance = instances['Instances'][0] - assert inventory.get_all_hostnames(instance, hostnames) == ["test", "aws_ec2"] - - -def test_get_preferred_hostname_default(inventory): - instance = instances['Instances'][0] - assert inventory._get_preferred_hostname(instance, hostnames=None) == "ec2-12-345-67-890.compute-1.amazonaws.com" - - -def test_get_preferred_hostname(inventory): - hostnames = ['ip-address', 'dns-name'] - instance = instances['Instances'][0] - assert inventory._get_preferred_hostname(instance, hostnames) == "12.345.67.890" - - -def test_get_preferred_hostname_dict(inventory): - hostnames = [{'name': 'private-ip-address', 'separator': '_', 'prefix': 'tag:Name'}] - instance = instances['Instances'][0] - assert inventory._get_preferred_hostname(instance, hostnames) == "aws_ec2_098.76.54.321" - - -def test_get_preferred_hostname_with_2_tags(inventory): - hostnames = ['tag:ansible', 'tag:Name'] - instance = instances['Instances'][0] - assert inventory._get_preferred_hostname(instance, hostnames) == "test" - - -def test_set_credentials(inventory): - inventory._options = {'aws_access_key': 'test_access_key', - 'aws_secret_key': 'test_secret_key', - 'aws_security_token': 'test_security_token', - 'aws_profile': 'test_profile', - 'iam_role_arn': 'arn:aws:iam::123456789012:role/test-role'} - loader = DataLoader() - inventory._set_credentials(loader) - - assert inventory.boto_profile == "test_profile" - assert inventory.aws_access_key_id == "test_access_key" - assert inventory.aws_secret_access_key == "test_secret_key" - assert inventory.aws_security_token == "test_security_token" - assert inventory.iam_role_arn == "arn:aws:iam::123456789012:role/test-role" - - -def test_insufficient_credentials(inventory): - inventory._options = { - 'aws_access_key': None, - 'aws_secret_key': None, - 'aws_security_token': None, - 'aws_profile': None, - 'iam_role_arn': None - } - with pytest.raises(AnsibleError) as error_message: - loader = DataLoader() - inventory._set_credentials(loader) - assert "Insufficient credentials found" in error_message - - -def test_verify_file_bad_config(inventory): - assert inventory.verify_file('not_aws_config.yml') is False +@pytest.mark.parametrize( + "obj,expected", + [ + (None, None), + ({}, None), + ({"GroupId": "test01"}, "test01"), + ({"GroupId": ["test01"]}, "test01"), + ({"GroupId": "test01"}, "test01"), + ({"GroupId": ["test01", "test02"]}, ["test01", "test02"]), + ([{"GroupId": ["test01", "test02"]}], ["test01", "test02"]), + ([{"GroupId": ["test01"]}], "test01"), + ( + [{"GroupId": ["test01", "test02"]}, {"GroupId": ["test03", "test04"]}], + [["test01", "test02"], ["test03", "test04"]], + ), + ( + ({"GroupId": ["test01", "test02"]}, {"GroupId": ["test03", "test04"]}), + [["test01", "test02"], ["test03", "test04"]], + ), + (({"GroupId": ["test01", "test02"]}, {}), ["test01", "test02"]), + ], +) +def test_compile_values(obj, expected): + assert _compile_values(obj, "GroupId") == expected -def test_include_filters_with_no_filter(inventory): - inventory._options = { - 'filters': {}, - 'include_filters': [], - } - print(inventory.build_include_filters()) - assert inventory.build_include_filters() == [{}] +@pytest.mark.parametrize( + "filter_name,expected", + [ + ("ansible.aws.unexpected.file", "ansible.aws.unexpected.file"), + ("instance.group-id", "sg-0123456789"), + ("instance.group-name", "default"), + ("owner-id", "id-012345678L"), + ], +) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._compile_values") +def test_get_boto_attr_chain(m_compile_values, filter_name, expected): + m_compile_values.side_effect = lambda obj, attr: obj.get(attr) + instance = {"SecurityGroups": {"GroupName": "default", "GroupId": "sg-0123456789"}, "OwnerId": "id-012345678L"} -def test_include_filters_with_include_filters_only(inventory): - inventory._options = { - 'filters': {}, - 'include_filters': [{"foo": "bar"}], - } - assert inventory.build_include_filters() == [{"foo": "bar"}] + assert _get_boto_attr_chain(filter_name, instance) == expected -def test_include_filters_with_filter_and_include_filters(inventory): - inventory._options = { - 'filters': {"from_filter": 1}, - 'include_filters': [{"from_include_filter": "bar"}], +@pytest.mark.parametrize( + "hostnames,expected", + [ + ([], "test-instance.ansible.com"), + (["private-dns-name"], "test-instance.localhost"), + (["tag:os_version"], "RHEL"), + (["tag:os_version", "dns-name"], "RHEL"), + ([{"name": "Name", "prefix": "Phase"}], "dev_test-instance-01"), + ([{"name": "Name", "prefix": "Phase", "separator": "-"}], "dev-test-instance-01"), + ([{"name": "Name", "prefix": "OSVersion", "separator": "-"}], "test-instance-01"), + ([{"name": "Name", "separator": "-"}], "test-instance-01"), + ([{"name": "Name", "prefix": "Phase"}, "private-dns-name"], "dev_test-instance-01"), + ([{"name": "Name", "prefix": "Phase"}, "tag:os_version"], "dev_test-instance-01"), + (["private-dns-name", "dns-name"], "test-instance.localhost"), + (["private-dns-name", {"name": "Name", "separator": "-"}], "test-instance.localhost"), + (["private-dns-name", "tag:os_version"], "test-instance.localhost"), + (["OSRelease"], None), + ], +) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_tag_hostname") +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_boto_attr_chain") +def test_inventory_get_preferred_hostname(m_get_boto_attr_chain, m_get_tag_hostname, inventory, hostnames, expected): + instance = { + "Name": "test-instance-01", + "Phase": "dev", + "tag:os_version": ["RHEL", "CoreOS"], + "another_key": "another_value", + "dns-name": "test-instance.ansible.com", + "private-dns-name": "test-instance.localhost", } - print(inventory.build_include_filters()) - assert inventory.build_include_filters() == [ - {"from_filter": 1}, - {"from_include_filter": "bar"}] + inventory._sanitize_hostname = MagicMock() + inventory._sanitize_hostname.side_effect = lambda x: x -def test_add_host_empty_hostnames(inventory): - hosts = [ - { - "Placement": { - "AvailabilityZone": "us-east-1a", - }, - "PublicDnsName": "ip-10-85-0-4.ec2.internal" - }, - ] - inventory._add_hosts(hosts, "aws_ec2", []) - inventory.inventory.add_host.assert_called_with("ip-10-85-0-4.ec2.internal", group="aws_ec2") + m_get_boto_attr_chain.side_effect = lambda pref, instance: instance.get(pref) + m_get_tag_hostname.side_effect = lambda pref, instance: instance.get(pref) + assert expected == inventory._get_preferred_hostname(instance, hostnames) -def test_add_host_with_hostnames_no_criteria(inventory): - hosts = [{}] - inventory._add_hosts( - hosts, "aws_ec2", hostnames=["tag:Name", "private-dns-name", "dns-name"] - ) - assert inventory.inventory.add_host.call_count == 0 +def test_inventory_get_preferred_hostname_failure(inventory): + instance = {} + hostnames = [{"value": "saome_value"}] + inventory._sanitize_hostname = MagicMock() + inventory._sanitize_hostname.side_effect = lambda x: x -def test_add_host_with_hostnames_and_one_criteria(inventory): - hosts = [ - { - "Placement": { - "AvailabilityZone": "us-east-1a", - }, - "PublicDnsName": "sample-host", - } - ] + with pytest.raises(AnsibleError) as err: + inventory._get_preferred_hostname(instance, hostnames) + assert "A 'name' key must be defined in a hostnames dictionary." in err - inventory._add_hosts( - hosts, "aws_ec2", hostnames=["tag:Name", "private-dns-name", "dns-name"] - ) - assert inventory.inventory.add_host.call_count == 1 - inventory.inventory.add_host.assert_called_with("sample-host", group="aws_ec2") +@pytest.mark.parametrize("base_verify_file_return", [True, False]) +@pytest.mark.parametrize( + "filename,result", + [ + ("inventory_aws_ec2.yml", True), + ("inventory_aws_ec2.yaml", True), + ("inventory_aws_EC2.yaml", False), + ("inventory_Aws_ec2.yaml", False), + ("aws_ec2_inventory.yml", False), + ("aws_ec2.yml_inventory", False), + ("aws_ec2.yml", True), + ("aws_ec2.yaml", True), + ], +) +@patch("ansible.plugins.inventory.BaseInventoryPlugin.verify_file") +def test_inventory_verify_file(m_base_verify_file, inventory, base_verify_file_return, filename, result): + m_base_verify_file.return_value = base_verify_file_return + if not base_verify_file_return: + assert not inventory.verify_file(filename) + else: + assert result == inventory.verify_file(filename) -def test_add_host_with_hostnames_and_two_matching_criteria(inventory): - hosts = [ - { - "Placement": { - "AvailabilityZone": "us-east-1a", - }, - "PublicDnsName": "name-from-PublicDnsName", - "Tags": [{"Value": "name-from-tag-Name", "Key": "Name"}], - } - ] - inventory._add_hosts( - hosts, "aws_ec2", hostnames=["tag:Name", "private-dns-name", "dns-name"] - ) - assert inventory.inventory.add_host.call_count == 1 - inventory.inventory.add_host.assert_called_with( - "name-from-tag-Name", group="aws_ec2" - ) +@pytest.mark.parametrize( + "preference,instance,expected", + [ + ("tag:os_provider", {"Tags": []}, []), + ("tag:os_provider", {}, []), + ("tag:os_provider", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, ["RedHat"]), + ("tag:OS_Provider", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, []), + ("tag:tag:os_provider", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, []), + ("tag:os_provider=RedHat", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, ["os_provider_RedHat"]), + ("tag:os_provider=CoreOS", {"Tags": [{"Key": "os_provider", "Value": "RedHat"}]}, []), + ( + "tag:os_provider=RedHat,os_release=7", + {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "8"}]}, + ["os_provider_RedHat"], + ), + ( + "tag:os_provider=RedHat,os_release=7", + {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "7"}]}, + ["os_provider_RedHat", "os_release_7"], + ), + ( + "tag:os_provider,os_release", + {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "7"}]}, + ["RedHat", "7"], + ), + ( + "tag:os_provider=RedHat,os_release", + {"Tags": [{"Key": "os_provider", "Value": "RedHat"}, {"Key": "os_release", "Value": "7"}]}, + ["os_provider_RedHat", "7"], + ), + ], +) +def test_get_tag_hostname(preference, instance, expected): + assert expected == _get_tag_hostname(preference, instance) -def test_add_host_with_hostnames_and_two_matching_criteria_and_allow_duplicated_hosts( - inventory, -): - hosts = [ - { - "Placement": { - "AvailabilityZone": "us-east-1a", +@pytest.mark.parametrize( + "_options, expected", + [ + ({"filters": {}, "include_filters": []}, [{}]), + ({"filters": {}, "include_filters": [{"foo": "bar"}]}, [{"foo": "bar"}]), + ( + { + "filters": {"from_filter": 1}, + "include_filters": [{"from_include_filter": "bar"}], }, - "PublicDnsName": "name-from-PublicDnsName", - "Tags": [{"Value": "name-from-tag-Name", "Key": "Name"}], - } - ] - - inventory._add_hosts( - hosts, - "aws_ec2", - hostnames=["tag:Name", "private-dns-name", "dns-name"], - allow_duplicated_hosts=True, - ) - assert inventory.inventory.add_host.call_count == 2 - inventory.inventory.add_host.assert_any_call( - "name-from-PublicDnsName", group="aws_ec2" - ) - inventory.inventory.add_host.assert_any_call("name-from-tag-Name", group="aws_ec2") + [{"from_filter": 1}, {"from_include_filter": "bar"}], + ), + ], +) +def test_inventory_build_include_filters(inventory, _options, expected): + inventory._options = _options + assert inventory.build_include_filters() == expected -def test_sanitize_hostname(inventory): - assert inventory._sanitize_hostname(1) == "1" - assert inventory._sanitize_hostname("a:b") == "a_b" - assert inventory._sanitize_hostname("a:/b") == "a__b" - assert inventory._sanitize_hostname("example") == "example" +@pytest.mark.parametrize("hostname,expected", [(1, "1"), ("a:b", "a_b"), ("a:/b", "a__b"), ("example", "example")]) +def test_sanitize_hostname(inventory, hostname, expected): + assert inventory._sanitize_hostname(hostname) == expected def test_sanitize_hostname_legacy(inventory): - inventory._sanitize_group_name = ( - inventory._legacy_script_compatible_group_sanitization - ) + inventory._sanitize_group_name = inventory._legacy_script_compatible_group_sanitization assert inventory._sanitize_hostname("a:/b") == "a__b" @@ -413,7 +293,6 @@ def test_sanitize_hostname_legacy(inventory): ], ) def test_prepare_host_vars( - inventory, hostvars_prefix, hostvars_suffix, use_contrib_script_compatible_ec2_tag_keys, @@ -425,7 +304,7 @@ def test_prepare_host_vars( "Tags": [{"Key": "Name", "Value": "my-name"}], } assert ( - inventory.prepare_host_vars( + _prepare_host_vars( original_host_vars, hostvars_prefix, hostvars_suffix, @@ -472,43 +351,339 @@ def test_iter_entry(inventory): assert entries[1][1]["a_tags_b"]["Name"] == "my-name" -def test_query_empty(inventory): - result = inventory._query("us-east-1", [], [], strict_permissions=True) - assert result == {"aws_ec2": []} +@pytest.mark.parametrize( + "include_filters,exclude_filters,instances_by_region,instances", + [ + ([], [], [], []), + ( + [4, 1, 2], + [], + [ + [{"InstanceId": 4, "name": "instance-4"}], + [{"InstanceId": 1, "name": "instance-1"}], + [{"InstanceId": 2, "name": "instance-2"}], + ], + [ + {"InstanceId": 1, "name": "instance-1"}, + {"InstanceId": 2, "name": "instance-2"}, + {"InstanceId": 4, "name": "instance-4"}, + ], + ), + ( + [], + [4, 1, 2], + [ + [{"InstanceId": 4, "name": "instance-4"}], + [{"InstanceId": 1, "name": "instance-1"}], + [{"InstanceId": 2, "name": "instance-2"}], + ], + [], + ), + ( + [1, 2], + [4], + [ + [{"InstanceId": 4, "name": "instance-4"}], + [{"InstanceId": 1, "name": "instance-1"}], + [{"InstanceId": 2, "name": "instance-2"}], + ], + [{"InstanceId": 1, "name": "instance-1"}, {"InstanceId": 2, "name": "instance-2"}], + ), + ( + [1, 2], + [1], + [ + [{"InstanceId": 1, "name": "instance-1"}], + [{"InstanceId": 1, "name": "instance-1"}], + [{"InstanceId": 2, "name": "instance-2"}], + ], + [{"InstanceId": 2, "name": "instance-2"}], + ), + ], +) +def test_inventory_query(inventory, include_filters, exclude_filters, instances_by_region, instances): + inventory._get_instances_by_region = MagicMock() + inventory._get_instances_by_region.side_effect = instances_by_region + + regions = ["us-east-1", "us-east-2"] + strict = False + + params = { + "regions": regions, + "strict_permissions": strict, + "include_filters": [], + "exclude_filters": [], + "use_ssm_inventory": False, + } + + for u in include_filters: + params["include_filters"].append({"Name": f"in_filters_{int(u)}", "Values": [u]}) + + for u in exclude_filters: + params["exclude_filters"].append({"Name": f"ex_filters_{int(u)}", "Values": [u]}) + + assert inventory._query(**params) == {"aws_ec2": instances} + if not instances_by_region: + inventory._get_instances_by_region.assert_not_called() + + +@pytest.mark.parametrize( + "filters", + [ + [], + [{"Name": "provider", "Values": "sample"}, {"Name": "instance-state-name", "Values": ["active"]}], + [ + {"Name": "tags", "Values": "one_tag"}, + ], + ], +) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._describe_ec2_instances") +def test_inventory_get_instances_by_region(m_describe_ec2_instances, inventory, filters): + boto3_conn = [(MagicMock(), "us-east-1"), (MagicMock(), "us-east-2")] + + inventory.all_clients = MagicMock() + inventory.all_clients.return_value = boto3_conn + + m_describe_ec2_instances.side_effect = [ + { + "Reservations": [ + { + "OwnerId": "owner01", + "RequesterId": "requester01", + "ReservationId": "id-0123", + "Instances": [ + {"name": "id-1-0", "os": "RedHat"}, + {"name": "id-1-1", "os": "CoreOS"}, + {"name": "id-1-2", "os": "Fedora"}, + ], + }, + { + "OwnerId": "owner01", + "ReservationId": "id-0456", + "Instances": [{"name": "id-2-0", "phase": "uat"}, {"name": "id-2-1", "phase": "prod"}], + }, + ] + }, + { + "Reservations": [ + { + "OwnerId": "owner02", + "ReservationId": "id-0789", + "Instances": [ + {"name": "id012345789", "tags": {"phase": "units"}}, + ], + } + ], + "Metadata": {"Status": "active"}, + }, + ] + + expected = [ + { + "name": "id-1-0", + "os": "RedHat", + "OwnerId": "owner01", + "RequesterId": "requester01", + "ReservationId": "id-0123", + }, + { + "name": "id-1-1", + "os": "CoreOS", + "OwnerId": "owner01", + "RequesterId": "requester01", + "ReservationId": "id-0123", + }, + { + "name": "id-1-2", + "os": "Fedora", + "OwnerId": "owner01", + "RequesterId": "requester01", + "ReservationId": "id-0123", + }, + {"name": "id-2-0", "phase": "uat", "OwnerId": "owner01", "ReservationId": "id-0456", "RequesterId": ""}, + {"name": "id-2-1", "phase": "prod", "OwnerId": "owner01", "ReservationId": "id-0456", "RequesterId": ""}, + { + "name": "id012345789", + "tags": {"phase": "units"}, + "OwnerId": "owner02", + "ReservationId": "id-0789", + "RequesterId": "", + }, + ] + + default_filter = {"Name": "instance-state-name", "Values": ["running", "pending", "stopping", "stopped"]} + regions = ["us-east-2", "us-east-4"] + + assert inventory._get_instances_by_region(regions, filters, False) == expected + inventory.all_clients.assert_called_with("ec2") + + if any((f["Name"] == "instance-state-name" for f in filters)): + filters.append(default_filter) + + m_describe_ec2_instances.assert_has_calls([call(conn, filters) for conn, region in boto3_conn], any_order=True) + + +@pytest.mark.parametrize("strict", [True, False]) +@pytest.mark.parametrize( + "error", + [ + botocore.exceptions.ClientError( + {"Error": {"Code": 1, "Message": "Something went wrong"}, "ResponseMetadata": {"HTTPStatusCode": 404}}, + "some_botocore_client_error", + ), + botocore.exceptions.ClientError( + { + "Error": {"Code": "UnauthorizedOperation", "Message": "Something went wrong"}, + "ResponseMetadata": {"HTTPStatusCode": 403}, + }, + "some_botocore_client_error", + ), + botocore.exceptions.PaginationError(message="some pagination error"), + ], +) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._describe_ec2_instances") +def test_inventory_get_instances_by_region_failures(m_describe_ec2_instances, inventory, strict, error): + inventory.all_clients = MagicMock() + inventory.all_clients.return_value = [(MagicMock(), "us-west-2")] + inventory.fail_aws = MagicMock() + inventory.fail_aws.side_effect = SystemExit(1) + + m_describe_ec2_instances.side_effect = error + regions = ["us-east-2", "us-east-4"] + + if ( + isinstance(error, botocore.exceptions.ClientError) + and error.response["ResponseMetadata"]["HTTPStatusCode"] == 403 + and not strict + ): + assert inventory._get_instances_by_region(regions, [], strict) == [] + else: + with pytest.raises(SystemExit): + inventory._get_instances_by_region(regions, [], strict) + + +@pytest.mark.parametrize( + "hostnames,expected", + [ + ([], ["test-instance.ansible.com", "test-instance.localhost"]), + (["private-dns-name"], ["test-instance.localhost"]), + (["tag:os_version"], ["RHEL", "CoreOS"]), + (["tag:os_version", "dns-name"], ["RHEL", "CoreOS", "test-instance.ansible.com"]), + ([{"name": "Name", "prefix": "Phase"}], ["dev_test-instance-01"]), + ([{"name": "Name", "prefix": "Phase", "separator": "-"}], ["dev-test-instance-01"]), + ([{"name": "Name", "prefix": "OSVersion", "separator": "-"}], ["test-instance-01"]), + ([{"name": "Name", "separator": "-"}], ["test-instance-01"]), + ( + [{"name": "Name", "prefix": "Phase"}, "private-dns-name"], + ["dev_test-instance-01", "test-instance.localhost"], + ), + ([{"name": "Name", "prefix": "Phase"}, "tag:os_version"], ["dev_test-instance-01", "RHEL", "CoreOS"]), + (["private-dns-name", {"name": "Name", "separator": "-"}], ["test-instance.localhost", "test-instance-01"]), + (["OSRelease"], []), + ], +) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_tag_hostname") +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_boto_attr_chain") +def test_inventory_get_all_hostnames(m_get_boto_attr_chain, m_get_tag_hostname, inventory, hostnames, expected): + instance = { + "Name": "test-instance-01", + "Phase": "dev", + "tag:os_version": ["RHEL", "CoreOS"], + "another_key": "another_value", + "dns-name": "test-instance.ansible.com", + "private-dns-name": "test-instance.localhost", + } + + inventory._sanitize_hostname = MagicMock() + inventory._sanitize_hostname.side_effect = lambda x: x + + m_get_boto_attr_chain.side_effect = lambda pref, instance: instance.get(pref) + m_get_tag_hostname.side_effect = lambda pref, instance: instance.get(pref) + assert expected == inventory._get_all_hostnames(instance, hostnames) -instance_foobar = {"InstanceId": "foobar"} -instance_barfoo = {"InstanceId": "barfoo"} +def test_inventory_get_all_hostnames_failure(inventory): + instance = {} + hostnames = [{"value": "some_value"}] -def test_query_empty_include_only(inventory): - inventory._get_instances_by_region = Mock(side_effect=[[instance_foobar]]) - result = inventory._query("us-east-1", [{"tag:Name": ["foobar"]}], [], strict_permissions=True) - assert result == {"aws_ec2": [instance_foobar]} + with pytest.raises(AnsibleError) as err: + inventory._get_all_hostnames(instance, hostnames) + assert "A 'name' key must be defined in a hostnames dictionary." in err -def test_query_empty_include_ordered(inventory): - inventory._get_instances_by_region = Mock(side_effect=[[instance_foobar], [instance_barfoo]]) - result = inventory._query("us-east-1", [{"tag:Name": ["foobar"]}, {"tag:Name": ["barfoo"]}], [], strict_permissions=True) - assert result == {"aws_ec2": [instance_barfoo, instance_foobar]} - inventory._get_instances_by_region.assert_called_with('us-east-1', [{'Name': 'tag:Name', 'Values': ['barfoo']}], True) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_ssm_information") +def test_inventory__add_ssm_information(m_get_ssm_information, inventory): + instances = [ + {"InstanceId": "i-001", "Name": "first-instance"}, + {"InstanceId": "i-002", "Name": "another-instance"}, + ] + result = { + "StatusCode": 200, + "Entities": [ + {"Id": "i-001", "Data": {}}, + { + "Id": "i-002", + "Data": { + "AWS:InstanceInformation": { + "Content": [{"os_type": "Linux", "os_name": "Fedora", "os_version": 37}] + } + }, + }, + ], + } + m_get_ssm_information.return_value = result -def test_query_empty_include_exclude(inventory): - inventory._get_instances_by_region = Mock(side_effect=[[instance_foobar], [instance_foobar]]) - result = inventory._query("us-east-1", [{"tag:Name": ["foobar"]}], [{"tag:Name": ["foobar"]}], strict_permissions=True) - assert result == {"aws_ec2": []} + connection = MagicMock() + expected = [ + {"InstanceId": "i-001", "Name": "first-instance"}, + { + "InstanceId": "i-002", + "Name": "another-instance", + "SsmInventory": {"os_type": "Linux", "os_name": "Fedora", "os_version": 37}, + }, + ] + + inventory._add_ssm_information(connection, instances) + assert expected == instances + + filters = [{"Key": "AWS:InstanceInformation.InstanceId", "Values": [x["InstanceId"] for x in instances]}] + m_get_ssm_information.assert_called_once_with(connection, filters) + + +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_ec2._get_ssm_information") +def test_inventory__get_multiple_ssm_inventories(m_get_ssm_information, inventory): + instances = [{"InstanceId": f"i-00{i}", "Name": f"instance {i}"} for i in range(41)] + result = { + "StatusCode": 200, + "Entities": [ + { + "Id": f"i-00{i}", + "Data": { + "AWS:InstanceInformation": { + "Content": [{"os_type": "Linux", "os_name": "Fedora", "os_version": 37}] + } + }, + } + for i in range(41) + ], + } + m_get_ssm_information.return_value = result + + connection = MagicMock() -def test_include_extra_api_calls_deprecated(inventory): - inventory.display.deprecate = Mock() - inventory._read_config_data = Mock() - inventory._set_credentials = Mock() - inventory._query = Mock(return_value=[]) + expected = [ + { + "InstanceId": f"i-00{i}", + "Name": f"instance {i}", + "SsmInventory": {"os_type": "Linux", "os_name": "Fedora", "os_version": 37}, + } + for i in range(41) + ] - inventory.parse(inventory=[], loader=None, path=None) - assert inventory.display.deprecate.call_count == 0 + inventory._add_ssm_information(connection, instances) + assert expected == instances - inventory._options["include_extra_api_calls"] = True - inventory.parse(inventory=[], loader=None, path=None) - assert inventory.display.deprecate.call_count == 1 + assert 2 == m_get_ssm_information.call_count diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_rds.py b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_rds.py new file mode 100644 index 000000000..53be24a48 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/inventory/test_aws_rds.py @@ -0,0 +1,674 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Aubin Bikouo <@abikouo> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +import copy +import random +import string +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch + +import pytest + +try: + import botocore +except ImportError: + # Handled by HAS_BOTO3 + pass + +from ansible.errors import AnsibleError + +from ansible_collections.amazon.aws.plugins.inventory.aws_rds import InventoryModule +from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _add_tags_for_rds_hosts +from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _describe_db_clusters +from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _describe_db_instances +from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _find_hosts_with_valid_statuses +from ansible_collections.amazon.aws.plugins.inventory.aws_rds import _get_rds_hostname +from ansible_collections.amazon.aws.plugins.inventory.aws_rds import ansible_dict_to_boto3_filter_list +from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 + +if not HAS_BOTO3: + pytestmark = pytest.mark.skip("test_aws_rds.py requires the python modules 'boto3' and 'botocore'") + + +def make_clienterror_exception(code="AccessDenied"): + return botocore.exceptions.ClientError( + { + "Error": {"Code": code, "Message": "User is not authorized to perform: xxx on resource: user yyyy"}, + "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"}, + }, + "getXXX", + ) + + +@pytest.fixture() +def inventory(): + inventory = InventoryModule() + inventory.inventory = MagicMock() + inventory._populate_host_vars = MagicMock() + + inventory.all_clients = MagicMock() + inventory.get_option = MagicMock() + + inventory._set_composite_vars = MagicMock() + inventory._add_host_to_composed_groups = MagicMock() + inventory._add_host_to_keyed_groups = MagicMock() + inventory._read_config_data = MagicMock() + inventory._set_credentials = MagicMock() + + inventory.get_cache_key = MagicMock() + + inventory._cache = {} + return inventory + + +@pytest.fixture() +def connection(): + conn = MagicMock() + return conn + + +@pytest.mark.parametrize( + "suffix,result", + [ + ("aws_rds.yml", True), + ("aws_rds.yaml", True), + ("aws_RDS.yml", False), + ("AWS_rds.yaml", False), + ], +) +def test_inventory_verify_file_suffix(inventory, suffix, result, tmp_path): + test_dir = tmp_path / "test_aws_rds" + test_dir.mkdir() + inventory_file = "inventory" + suffix + inventory_file = test_dir / inventory_file + inventory_file.write_text("my inventory") + assert result == inventory.verify_file(str(inventory_file)) + + +def test_inventory_verify_file_with_missing_file(inventory): + inventory_file = "this_file_does_not_exist_aws_rds.yml" + assert not inventory.verify_file(inventory_file) + + +def generate_random_string(with_digits=True, with_punctuation=True, length=16): + data = string.ascii_letters + if with_digits: + data += string.digits + if with_punctuation: + data += string.punctuation + return "".join([random.choice(data) for i in range(length)]) + + +@pytest.mark.parametrize( + "hosts,statuses,expected", + [ + ( + [ + {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"}, + {"host": "host2", "DBInstanceStatus": "Creating", "Status": "active"}, + {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"}, + {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"}, + ], + ["Available"], + [{"host": "host1", "DBInstanceStatus": "Available", "Status": "active"}], + ), + ( + [ + {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"}, + {"host": "host2", "DBInstanceStatus": "Creating", "Status": "active"}, + {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"}, + {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"}, + ], + ["all"], + [ + {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"}, + {"host": "host2", "DBInstanceStatus": "Creating", "Status": "active"}, + {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"}, + {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"}, + ], + ), + ( + [ + {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"}, + {"host": "host2", "DBInstanceStatus": "Creating", "Status": "Available"}, + {"host": "host3", "DBInstanceStatus": "Stopped", "Status": "active"}, + {"host": "host4", "DBInstanceStatus": "Configuring", "Status": "active"}, + ], + ["Available"], + [ + {"host": "host1", "DBInstanceStatus": "Available", "Status": "active"}, + {"host": "host2", "DBInstanceStatus": "Creating", "Status": "Available"}, + ], + ), + ], +) +def test_find_hosts_with_valid_statuses(hosts, statuses, expected): + assert expected == _find_hosts_with_valid_statuses(hosts, statuses) + + +@pytest.mark.parametrize( + "host,expected", + [ + ({"DBClusterIdentifier": "my_cluster_id"}, "my_cluster_id"), + ({"DBClusterIdentifier": "my_cluster_id", "DBInstanceIdentifier": "my_instance_id"}, "my_instance_id"), + ], +) +def test_get_rds_hostname(host, expected): + assert expected == _get_rds_hostname(host) + + +@pytest.mark.parametrize("hosts", ["", "host1", "host2,host3", "host2,host3,host1"]) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_rds._get_rds_hostname") +def test_inventory_format_inventory(m_get_rds_hostname, inventory, hosts): + hosts_vars = { + "host1": {"var10": "value10"}, + "host2": {"var20": "value20", "var21": "value21"}, + "host3": {"var30": "value30", "var31": "value31", "var32": "value32"}, + } + + m_get_rds_hostname.side_effect = lambda h: h["name"] + + class _inventory_host(object): + def __init__(self, name, host_vars): + self.name = name + self.vars = host_vars + + inventory.inventory = MagicMock() + inventory.inventory.get_host.side_effect = lambda x: _inventory_host(name=x, host_vars=hosts_vars.get(x)) + + hosts = [{"name": x} for x in hosts.split(",") if x] + expected = { + "_meta": {"hostvars": {x["name"]: hosts_vars.get(x["name"]) for x in hosts}}, + "aws_rds": {"hosts": [x["name"] for x in hosts]}, + } + + assert expected == inventory._format_inventory(hosts) + if hosts == []: + m_get_rds_hostname.assert_not_called() + + +@pytest.mark.parametrize("length", range(0, 10, 2)) +def test_inventory_populate(inventory, length): + group = "aws_rds" + hosts = [f"host_{int(i)}" for i in range(length)] + + inventory._add_hosts = MagicMock() + inventory._populate(hosts=hosts) + + inventory.inventory.add_group.assert_called_with("aws_rds") + + if len(hosts) == 0: + inventory.inventory._add_hosts.assert_not_called() + inventory.inventory.add_child.assert_not_called() + else: + inventory._add_hosts.assert_called_with(hosts=hosts, group=group) + inventory.inventory.add_child.assert_called_with("all", group) + + +def test_inventory_populate_from_source(inventory): + source_data = { + "_meta": { + "hostvars": { + "host_1_0": {"var10": "value10"}, + "host_2": {"var2": "value2"}, + "host_3": {"var3": ["value30", "value31", "value32"]}, + } + }, + "all": {"hosts": ["host_1_0", "host_1_1", "host_2", "host_3"]}, + "aws_host_1": {"hosts": ["host_1_0", "host_1_1"]}, + "aws_host_2": {"hosts": ["host_2"]}, + "aws_host_3": {"hosts": ["host_3"]}, + } + + inventory._populate_from_source(source_data) + inventory.inventory.add_group.assert_has_calls( + [ + call("aws_host_1"), + call("aws_host_2"), + call("aws_host_3"), + ], + any_order=True, + ) + inventory.inventory.add_child.assert_has_calls( + [ + call("all", "aws_host_1"), + call("all", "aws_host_2"), + call("all", "aws_host_3"), + ], + any_order=True, + ) + + inventory._populate_host_vars.assert_has_calls( + [ + call(["host_1_0"], {"var10": "value10"}, "aws_host_1"), + call(["host_1_1"], {}, "aws_host_1"), + call(["host_2"], {"var2": "value2"}, "aws_host_2"), + call(["host_3"], {"var3": ["value30", "value31", "value32"]}, "aws_host_3"), + ], + any_order=True, + ) + + +@pytest.mark.parametrize("strict", [True, False]) +def test_add_tags_for_rds_hosts_with_no_hosts(connection, strict): + hosts = [] + + _add_tags_for_rds_hosts(connection, hosts, strict) + connection.list_tags_for_resource.assert_not_called() + + +def test_add_tags_for_rds_hosts_with_hosts(connection): + hosts = [ + {"DBInstanceArn": "dbarn1"}, + {"DBInstanceArn": "dbarn2"}, + {"DBClusterArn": "clusterarn1"}, + ] + + rds_hosts_tags = { + "dbarn1": {"TagList": ["tag1=dbarn1", "phase=units"]}, + "dbarn2": {"TagList": ["tag2=dbarn2", "collection=amazon.aws"]}, + "clusterarn1": {"TagList": ["tag1=clusterarn1", "tool=ansible-test"]}, + } + connection.list_tags_for_resource.side_effect = lambda **kwargs: rds_hosts_tags.get(kwargs.get("ResourceName")) + + _add_tags_for_rds_hosts(connection, hosts, strict=False) + + assert hosts == [ + {"DBInstanceArn": "dbarn1", "Tags": ["tag1=dbarn1", "phase=units"]}, + {"DBInstanceArn": "dbarn2", "Tags": ["tag2=dbarn2", "collection=amazon.aws"]}, + {"DBClusterArn": "clusterarn1", "Tags": ["tag1=clusterarn1", "tool=ansible-test"]}, + ] + + +def test_add_tags_for_rds_hosts_with_failure_not_strict(connection): + hosts = [{"DBInstanceArn": "dbarn1"}] + + connection.list_tags_for_resource.side_effect = make_clienterror_exception() + + _add_tags_for_rds_hosts(connection, hosts, strict=False) + + assert hosts == [ + {"DBInstanceArn": "dbarn1", "Tags": []}, + ] + + +def test_add_tags_for_rds_hosts_with_failure_strict(connection): + hosts = [{"DBInstanceArn": "dbarn1"}] + + connection.list_tags_for_resource.side_effect = make_clienterror_exception() + + with pytest.raises(botocore.exceptions.ClientError): + _add_tags_for_rds_hosts(connection, hosts, strict=True) + + +ADD_TAGS_FOR_RDS_HOSTS = "ansible_collections.amazon.aws.plugins.inventory.aws_rds._add_tags_for_rds_hosts" + + +@patch(ADD_TAGS_FOR_RDS_HOSTS) +def test_describe_db_clusters(m_add_tags_for_rds_hosts, connection): + db_cluster = { + "DatabaseName": "my_sample_db", + "DBClusterIdentifier": "db_id_01", + "Status": "Stopped", + "DbClusterResourceId": "cluster_resource_id", + "DBClusterArn": "arn:xxx:xxxx", + "DeletionProtection": True, + } + + connection.describe_db_clusters.return_value = {"DBClusters": [db_cluster]} + + filters = generate_random_string(with_punctuation=False) + strict = False + + result = _describe_db_clusters(connection=connection, filters=filters, strict=strict) + + assert result == [db_cluster] + + m_add_tags_for_rds_hosts.assert_called_with(connection, result, strict) + + +@pytest.mark.parametrize("strict", [True, False]) +@patch(ADD_TAGS_FOR_RDS_HOSTS) +def test_describe_db_clusters_with_access_denied(m_add_tags_for_rds_hosts, connection, strict): + connection.describe_db_clusters.side_effect = make_clienterror_exception() + + filters = generate_random_string(with_punctuation=False) + + if strict: + with pytest.raises(AnsibleError): + _describe_db_clusters(connection=connection, filters=filters, strict=strict) + else: + assert _describe_db_clusters(connection=connection, filters=filters, strict=strict) == [] + + m_add_tags_for_rds_hosts.assert_not_called() + + +@patch(ADD_TAGS_FOR_RDS_HOSTS) +def test_describe_db_clusters_with_client_error(m_add_tags_for_rds_hosts, connection): + connection.describe_db_clusters.side_effect = make_clienterror_exception(code="Unknown") + + filters = generate_random_string(with_punctuation=False) + with pytest.raises(AnsibleError): + _describe_db_clusters(connection=connection, filters=filters, strict=False) + + m_add_tags_for_rds_hosts.assert_not_called() + + +@patch(ADD_TAGS_FOR_RDS_HOSTS) +def test_describe_db_instances(m_add_tags_for_rds_hosts, connection): + db_instance = { + "DBInstanceIdentifier": "db_id_01", + "Status": "Stopped", + "DBName": "my_sample_db_01", + "DBClusterIdentifier": "db_cluster_001", + "DBInstanceArn": "arn:db:xxxx:xxxx:xxxx", + "Engine": "mysql", + } + + conn_paginator = MagicMock() + paginate = MagicMock() + + connection.get_paginator.return_value = conn_paginator + conn_paginator.paginate.return_value = paginate + + paginate.build_full_result.return_value = {"DBInstances": [db_instance]} + + filters = generate_random_string(with_punctuation=False) + strict = False + + result = _describe_db_instances(connection=connection, filters=filters, strict=strict) + + assert result == [db_instance] + + m_add_tags_for_rds_hosts.assert_called_with(connection, result, strict) + connection.get_paginator.assert_called_with("describe_db_instances") + conn_paginator.paginate.assert_called_with(Filters=filters) + + +DESCRIBE_DB_INSTANCES = "ansible_collections.amazon.aws.plugins.inventory.aws_rds._describe_db_instances" +DESCRIBE_DB_CLUSTERS = "ansible_collections.amazon.aws.plugins.inventory.aws_rds._describe_db_clusters" +FIND_HOSTS_WITH_VALID_STATUSES = ( + "ansible_collections.amazon.aws.plugins.inventory.aws_rds._find_hosts_with_valid_statuses" +) + + +@pytest.mark.parametrize("gather_clusters", [True, False]) +@pytest.mark.parametrize("regions", range(1, 5)) +@patch(DESCRIBE_DB_INSTANCES) +@patch(DESCRIBE_DB_CLUSTERS) +@patch(FIND_HOSTS_WITH_VALID_STATUSES) +def test_inventory_get_all_db_hosts( + m_find_hosts, m_describe_db_clusters, m_describe_db_instances, inventory, gather_clusters, regions +): + params = { + "gather_clusters": gather_clusters, + "regions": [f"us-east-{int(i)}" for i in range(regions)], + "instance_filters": generate_random_string(), + "cluster_filters": generate_random_string(), + "strict": random.choice((True, False)), + "statuses": [random.choice(["Available", "Stopped", "Running", "Creating"]) for i in range(3)], + } + + connections = [MagicMock() for i in range(regions)] + + inventory.all_clients.return_value = [(connections[i], f"us-east-{int(i)}") for i in range(regions)] + + ids = list(reversed(range(regions))) + db_instances = [{"DBInstanceIdentifier": f"db_00{int(i)}"} for i in ids] + db_clusters = [{"DBClusterIdentifier": f"cluster_00{int(i)}"} for i in ids] + + m_describe_db_instances.side_effect = [[i] for i in db_instances] + m_describe_db_clusters.side_effect = [[i] for i in db_clusters] + + result = list(sorted(db_instances, key=lambda x: x["DBInstanceIdentifier"])) + if gather_clusters: + result += list(sorted(db_clusters, key=lambda x: x["DBClusterIdentifier"])) + + m_find_hosts.return_value = result + + assert result == inventory._get_all_db_hosts(**params) + inventory.all_clients.assert_called_with("rds") + m_describe_db_instances.assert_has_calls( + [call(connections[i], params["instance_filters"], strict=params["strict"]) for i in range(regions)] + ) + + if gather_clusters: + m_describe_db_clusters.assert_has_calls( + [call(connections[i], params["cluster_filters"], strict=params["strict"]) for i in range(regions)] + ) + + m_find_hosts.assert_called_with(result, params["statuses"]) + + +@pytest.mark.parametrize("hostvars_prefix", [True]) +@pytest.mark.parametrize("hostvars_suffix", [True]) +@patch("ansible_collections.amazon.aws.plugins.inventory.aws_rds._get_rds_hostname") +def test_inventory_add_hosts(m_get_rds_hostname, inventory, hostvars_prefix, hostvars_suffix): + _options = { + "strict": random.choice((False, True)), + "compose": random.choice((False, True)), + "keyed_groups": "keyed_group_test_inventory_add_hosts", + "groups": ["all", "test_inventory_add_hosts"], + } + + if hostvars_prefix: + _options["hostvars_prefix"] = f"prefix_{generate_random_string(length=8, with_punctuation=False)}" + if hostvars_suffix: + _options["hostvars_suffix"] = f"suffix_{generate_random_string(length=8, with_punctuation=False)}" + + def _get_option_side_effect(x): + return _options.get(x) + + inventory.get_option.side_effect = _get_option_side_effect + + m_get_rds_hostname.side_effect = lambda h: ( + h["DBInstanceIdentifier"] if "DBInstanceIdentifier" in h else h["DBClusterIdentifier"] + ) + + hosts = [ + { + "DBInstanceIdentifier": "db_i_001", + "Tags": [{"Key": "Name", "Value": "db_001"}, {"Key": "RunningEngine", "Value": "mysql"}], + "availability_zone": "us-east-1a", + }, + { + "DBInstanceIdentifier": "db_i_002", + "Tags": [{"Key": "ClusterName", "Value": "test_cluster"}, {"Key": "RunningOS", "Value": "CoreOS"}], + }, + { + "DBClusterIdentifier": "test_cluster", + "Tags": [{"Key": "CluserVersionOrigin", "Value": "2.0"}, {"Key": "Provider", "Value": "RedHat"}], + }, + { + "DBClusterIdentifier": "another_cluster", + "Tags": [{"Key": "TestingPurpose", "Value": "Ansible"}], + "availability_zones": ["us-west-1a", "us-east-1b"], + }, + ] + + group = f"test_add_hosts_group_{generate_random_string(length=10, with_punctuation=False)}" + inventory._add_hosts(hosts, group) + + m_get_rds_hostname.assert_has_calls([call(h) for h in hosts], any_order=True) + + hosts_names = ["db_i_001", "db_i_002", "test_cluster", "another_cluster"] + inventory.inventory.add_host.assert_has_calls([call(name, group=group) for name in hosts_names], any_order=True) + + camel_hosts = [ + { + "db_instance_identifier": "db_i_001", + "tags": {"Name": "db_001", "RunningEngine": "mysql"}, + "availability_zone": "us-east-1a", + "region": "us-east-1", + }, + {"db_instance_identifier": "db_i_002", "tags": {"ClusterName": "test_cluster", "RunningOS": "CoreOS"}}, + {"db_cluster_identifier": "test_cluster", "tags": {"CluserVersionOrigin": "2.0", "Provider": "RedHat"}}, + { + "db_cluster_identifier": "another_cluster", + "tags": {"TestingPurpose": "Ansible"}, + "availability_zones": ["us-west-1a", "us-east-1b"], + "region": "us-west-1", + }, + ] + + set_variable_calls = [] + for i in range(len(camel_hosts)): + for var, value in camel_hosts[i].items(): + if hostvars_prefix: + var = _options["hostvars_prefix"] + var + if hostvars_suffix: + var += _options["hostvars_suffix"] + set_variable_calls.append(call(hosts_names[i], var, value)) + + inventory.inventory.set_variable.assert_has_calls(set_variable_calls, any_order=True) + + if hostvars_prefix or hostvars_suffix: + tmp = [] + for host in camel_hosts: + new_host = copy.deepcopy(host) + for key in host: + new_key = key + if hostvars_prefix: + new_key = _options["hostvars_prefix"] + new_key + if hostvars_suffix: + new_key += _options["hostvars_suffix"] + new_host[new_key] = host[key] + tmp.append(new_host) + camel_hosts = tmp + + inventory._set_composite_vars.assert_has_calls( + [ + call(_options["compose"], camel_hosts[i], hosts_names[i], strict=_options["strict"]) + for i in range(len(camel_hosts)) + ], + any_order=True, + ) + inventory._add_host_to_composed_groups.assert_has_calls( + [ + call(_options["groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"]) + for i in range(len(camel_hosts)) + ], + any_order=True, + ) + inventory._add_host_to_keyed_groups.assert_has_calls( + [ + call(_options["keyed_groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"]) + for i in range(len(camel_hosts)) + ], + any_order=True, + ) + + +BASE_INVENTORY_PARSE = "ansible_collections.amazon.aws.plugins.inventory.aws_rds.AWSInventoryBase.parse" + + +@pytest.mark.parametrize("include_clusters", [True, False]) +@pytest.mark.parametrize("filter_db_cluster_id", [True, False]) +@pytest.mark.parametrize("user_cache_directive", [True, False]) +@pytest.mark.parametrize("cache", [True, False]) +@pytest.mark.parametrize("cache_hit", [True, False]) +@patch(BASE_INVENTORY_PARSE) +def test_inventory_parse( + m_parse, inventory, include_clusters, filter_db_cluster_id, user_cache_directive, cache, cache_hit +): + inventory_data = MagicMock() + loader = MagicMock() + path = generate_random_string(with_punctuation=False, with_digits=False) + + options = {} + options["regions"] = [f"us-east-{d}" for d in range(random.randint(1, 5))] + options["strict_permissions"] = random.choice((True, False)) + options["statuses"] = generate_random_string(with_punctuation=False) + options["include_clusters"] = include_clusters + options["filters"] = { + "db-instance-id": [ + f"arn:db:{generate_random_string(with_punctuation=False)}" for i in range(random.randint(1, 10)) + ], + "dbi-resource-id": generate_random_string(with_punctuation=False), + "domain": generate_random_string(with_digits=False, with_punctuation=False), + "engine": generate_random_string(with_digits=False, with_punctuation=False), + } + if filter_db_cluster_id: + options["filters"]["db-cluster-id"] = [ + f"arn:cluster:{generate_random_string(with_punctuation=False)}" for i in range(random.randint(1, 10)) + ] + + options["cache"] = user_cache_directive + + def get_option_side_effect(v): + return options.get(v) + + inventory.get_option.side_effect = get_option_side_effect + + cache_key = path + generate_random_string() + inventory.get_cache_key.return_value = cache_key + + cache_key_value = generate_random_string() + if cache_hit: + inventory._cache[cache_key] = cache_key_value + + inventory._populate = MagicMock() + inventory._populate_from_source = MagicMock() + inventory._get_all_db_hosts = MagicMock() + all_db_hosts = [ + {"host": f"host_{int(random.randint(1, 1000))}"}, + {"host": f"host_{int(random.randint(1, 1000))}"}, + {"host": f"host_{int(random.randint(1, 1000))}"}, + {"host": f"host_{int(random.randint(1, 1000))}"}, + ] + inventory._get_all_db_hosts.return_value = all_db_hosts + + format_cache_key_value = f"format_inventory_{all_db_hosts}" + inventory._format_inventory = MagicMock() + inventory._format_inventory.return_value = format_cache_key_value + + inventory.parse(inventory_data, loader, path, cache) + + m_parse.assert_called_with(inventory_data, loader, path, cache=cache) + + boto3_instance_filters = ansible_dict_to_boto3_filter_list(options["filters"]) + boto3_cluster_filters = [] + if filter_db_cluster_id and include_clusters: + boto3_cluster_filters = ansible_dict_to_boto3_filter_list( + {"db-cluster-id": options["filters"]["db-cluster-id"]} + ) + + if not cache or not user_cache_directive or (cache and user_cache_directive and not cache_hit): + inventory._get_all_db_hosts.assert_called_with( + options["regions"], + boto3_instance_filters, + boto3_cluster_filters, + options["strict_permissions"], + options["statuses"], + include_clusters, + ) + inventory._populate.assert_called_with(all_db_hosts) + inventory._format_inventory.assert_called_with(all_db_hosts) + else: + inventory._get_all_db_hosts.assert_not_called() + inventory._populate.assert_not_called() + inventory._format_inventory.assert_not_called() + + if cache and user_cache_directive and cache_hit: + inventory._populate_from_source.assert_called_with(cache_key_value) + + if cache and user_cache_directive and not cache_hit or (not cache and user_cache_directive): + # validate that cache was populated + assert inventory._cache[cache_key] == format_cache_key_value diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/lookup/test_secretsmanager_secret.py b/ansible_collections/amazon/aws/tests/unit/plugins/lookup/test_secretsmanager_secret.py new file mode 100644 index 000000000..2c8260b61 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/lookup/test_secretsmanager_secret.py @@ -0,0 +1,348 @@ +# +# (c) 2024 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import random +from unittest.mock import ANY +from unittest.mock import MagicMock +from unittest.mock import call + +import pytest +from botocore.exceptions import ClientError + +from ansible.errors import AnsibleLookupError + +# from ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret import AnsibleLookupError +from ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret import LookupModule + + +@pytest.fixture +def lookup_plugin(): + lookup = LookupModule() + lookup.params = {} + + lookup.get_option = MagicMock() + + def _get_option(x): + return lookup.params.get(x) + + lookup.get_option.side_effect = _get_option + lookup.client = MagicMock() + + return lookup + + +def pick_from_list(elements=None): + if elements is None: + elements = ["error", "warn", "skip"] + return random.choice(elements) + + +def _raise_boto_clienterror(code, msg): + params = { + "Error": {"Code": code, "Message": msg}, + "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"}, + } + return ClientError(params, "get_secret_value") + + +class TestLookupModuleRun: + @pytest.mark.parametrize( + "params,err", + [ + ({"on_missing": "test"}, '"on_missing" must be a string and one of "error", "warn" or "skip", not test'), + ({"on_denied": "return"}, '"on_denied" must be a string and one of "error", "warn" or "skip", not return'), + ( + {"on_deleted": "delete"}, + '"on_deleted" must be a string and one of "error", "warn" or "skip", not delete', + ), + ( + {"on_missing": ["warn"]}, + '"on_missing" must be a string and one of "error", "warn" or "skip", not [\'warn\']', + ), + ({"on_denied": True}, '"on_denied" must be a string and one of "error", "warn" or "skip", not True'), + ( + {"on_deleted": {"error": True}}, + '"on_deleted" must be a string and one of "error", "warn" or "skip", not {\'error\': True}', + ), + ], + ) + def test_run_invalid_parameters(self, lookup_plugin, mocker, params, err): + aws_lookup_base_run = mocker.patch( + "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret.AWSLookupBase.run" + ) + aws_lookup_base_run.return_value = True + m_list_secrets = mocker.patch( + "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret._list_secrets" + ) + m_list_secrets.return_value = {"SecretList": []} + + lookup_plugin.params = params + with pytest.raises(AnsibleLookupError) as exc_info: + lookup_plugin.run(terms=["testing_secret"], variables=[]) + assert err == str(exc_info.value) + + def test_run_by_path(self, lookup_plugin, mocker): + aws_lookup_base_run = mocker.patch( + "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret.AWSLookupBase.run" + ) + aws_lookup_base_run.return_value = True + m_list_secrets = mocker.patch( + "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret._list_secrets" + ) + secrets_lists = [{"Name": "secret-0"}, {"Name": "secret-1"}, {"Name": "secret-2"}] + m_list_secrets.return_value = [{"SecretList": secrets_lists}] + + params = { + "on_missing": pick_from_list(), + "on_denied": pick_from_list(), + "on_deleted": pick_from_list(), + "bypath": True, + } + lookup_plugin.params = params + + lookup_plugin.get_secret_value = MagicMock() + secrets_values = { + "secret-0": "value-0", + "secret-1": "value-1", + "secret-2": "value-2", + } + lookup_plugin.get_secret_value.side_effect = lambda x, client, **kwargs: secrets_values.get(x) + + secretsmanager_client = MagicMock() + lookup_plugin.client.return_value = secretsmanager_client + + term = "term0" + assert [secrets_values] == lookup_plugin.run(terms=[term], variables=[]) + + m_list_secrets.assert_called_once_with(secretsmanager_client, term) + lookup_plugin.client.assert_called_once_with("secretsmanager", ANY) + lookup_plugin.get_secret_value.assert_has_calls( + [ + call( + "secret-0", + secretsmanager_client, + on_missing=params.get("on_missing"), + on_denied=params.get("on_denied"), + ), + call( + "secret-1", + secretsmanager_client, + on_missing=params.get("on_missing"), + on_denied=params.get("on_denied"), + ), + call( + "secret-2", + secretsmanager_client, + on_missing=params.get("on_missing"), + on_denied=params.get("on_denied"), + ), + ] + ) + + @pytest.mark.parametrize("join_secrets", [True, False]) + @pytest.mark.parametrize( + "terms", [["secret-0"], ["secret-0", "secret-1"], ["secret-0", "secret-1", "secret-0", "secret-2"]] + ) + def test_run(self, lookup_plugin, mocker, join_secrets, terms): + aws_lookup_base_run = mocker.patch( + "ansible_collections.amazon.aws.plugins.lookup.secretsmanager_secret.AWSLookupBase.run" + ) + aws_lookup_base_run.return_value = True + + params = { + "on_missing": pick_from_list(), + "on_denied": pick_from_list(), + "on_deleted": pick_from_list(), + "bypath": False, + "version_stage": MagicMock(), + "version_id": MagicMock(), + "nested": pick_from_list([True, False]), + "join": join_secrets, + } + lookup_plugin.params = params + + lookup_plugin.get_secret_value = MagicMock() + secrets_values = { + "secret-0": "value-0", + "secret-1": "value-1", + } + lookup_plugin.get_secret_value.side_effect = lambda x, client, **kwargs: secrets_values.get(x) + + secretsmanager_client = MagicMock() + lookup_plugin.client.return_value = secretsmanager_client + + expected_secrets = [secrets_values.get(x) for x in terms if secrets_values.get(x) is not None] + if join_secrets: + expected_secrets = ["".join(expected_secrets)] + + assert expected_secrets == lookup_plugin.run(terms=terms, variables=[]) + + lookup_plugin.client.assert_called_once_with("secretsmanager", ANY) + lookup_plugin.get_secret_value.assert_has_calls( + [ + call( + x, + secretsmanager_client, + version_stage=params.get("version_stage"), + version_id=params.get("version_id"), + on_missing=params.get("on_missing"), + on_denied=params.get("on_denied"), + on_deleted=params.get("on_deleted"), + nested=params.get("nested"), + ) + for x in terms + ] + ) + + +class TestLookupModuleGetSecretValue: + def test_get_secret__invalid_nested_value(self, lookup_plugin): + params = { + "version_stage": MagicMock(), + "version_id": MagicMock(), + "on_missing": None, + "on_denied": None, + "on_deleted": None, + } + with pytest.raises(AnsibleLookupError) as exc_info: + client = MagicMock() + lookup_plugin.get_secret_value("aws_invalid_nested_secret", client, nested=True, **params) + assert "Nested query must use the following syntax: `aws_secret_name.<key_name>.<key_name>" == str( + exc_info.value + ) + + @pytest.mark.parametrize("versionId", [None, MagicMock()]) + @pytest.mark.parametrize("versionStage", [None, MagicMock()]) + @pytest.mark.parametrize( + "term,nested,secretId", + [ + ("secret0", False, "secret0"), + ("secret0.child", False, "secret0.child"), + ("secret0.child", True, "secret0"), + ("secret0.root.child", False, "secret0.root.child"), + ("secret0.root.child", True, "secret0"), + ], + ) + def test_get_secret__binary_secret(self, lookup_plugin, versionId, versionStage, term, nested, secretId): + params = { + "version_stage": versionStage, + "version_id": versionId, + "on_missing": None, + "on_denied": None, + "on_deleted": None, + } + + client = MagicMock() + client.get_secret_value = MagicMock() + bin_secret_value = b"binary_value" + client.get_secret_value.return_value = {"SecretBinary": bin_secret_value} + + assert bin_secret_value == lookup_plugin.get_secret_value(term, client, nested=nested, **params) + api_params = {"SecretId": secretId} + if versionId is not None: + api_params["VersionId"] = versionId + if versionStage: + api_params["VersionStage"] = versionStage + client.get_secret_value.assert_called_once_with(aws_retry=True, **api_params) + + @pytest.mark.parametrize("on_missing", ["warn", "error"]) + @pytest.mark.parametrize( + "term,missing_key", + [ + ("secret_name.root.child1", "root.child1"), + ("secret_name.root.child1.nested", "root.child1"), + ("secret_name.root.child.nested1", "root.child.nested1"), + ("secret_name.root.child.nested.value", "root.child.nested.value"), + ], + ) + def test_get_secret__missing_nested_secret(self, lookup_plugin, on_missing, term, missing_key): + client = MagicMock() + client.get_secret_value = MagicMock() + json_secret = '{"root": {"child": {"nested": "ansible-test-secret-0"}}}' + client.get_secret_value.return_value = {"SecretString": json_secret} + + if on_missing == "error": + with pytest.raises(AnsibleLookupError) as exc_info: + lookup_plugin.get_secret_value(term, client, nested=True, on_missing=on_missing) + assert f"Successfully retrieved secret but there exists no key {missing_key} in the secret" == str( + exc_info.value + ) + else: + lookup_plugin._display = MagicMock() + lookup_plugin._display.warning = MagicMock() + assert lookup_plugin.get_secret_value(term, client, nested=True, on_missing=on_missing) is None + lookup_plugin._display.warning.assert_called_once_with( + f"Skipping, Successfully retrieved secret but there exists no key {missing_key} in the secret" + ) + + def test_get_secret__missing_secret(self, lookup_plugin): + client = MagicMock() + client.get_secret_value = MagicMock() + client.get_secret_value.side_effect = _raise_boto_clienterror("UnexpecteError", "unable to retrieve Secret") + + with pytest.raises(AnsibleLookupError) as exc_info: + lookup_plugin.get_secret_value(MagicMock(), client) + assert ( + "Failed to retrieve secret: An error occurred (UnexpecteError) when calling the get_secret_value operation: unable to retrieve Secret" + == str(exc_info.value) + ) + + @pytest.mark.parametrize("on_denied", ["warn", "error"]) + def test_get_secret__on_denied(self, lookup_plugin, on_denied): + client = MagicMock() + client.get_secret_value = MagicMock() + client.get_secret_value.side_effect = _raise_boto_clienterror( + "AccessDeniedException", "Access denied to Secret" + ) + term = "ansible-test-secret-0123" + + if on_denied == "error": + with pytest.raises(AnsibleLookupError) as exc_info: + lookup_plugin.get_secret_value(term, client, on_denied=on_denied) + assert f"Failed to access secret {term} (AccessDenied)" == str(exc_info.value) + else: + lookup_plugin._display = MagicMock() + lookup_plugin._display.warning = MagicMock() + assert lookup_plugin.get_secret_value(term, client, on_denied=on_denied) is None + lookup_plugin._display.warning.assert_called_once_with(f"Skipping, access denied for secret {term}") + + @pytest.mark.parametrize("on_missing", ["warn", "error"]) + def test_get_secret__on_missing(self, lookup_plugin, on_missing): + client = MagicMock() + client.get_secret_value = MagicMock() + client.get_secret_value.side_effect = _raise_boto_clienterror("ResourceNotFoundException", "secret not found") + term = "ansible-test-secret-4561" + + if on_missing == "error": + with pytest.raises(AnsibleLookupError) as exc_info: + lookup_plugin.get_secret_value(term, client, on_missing=on_missing) + assert f"Failed to find secret {term} (ResourceNotFound)" == str(exc_info.value) + else: + lookup_plugin._display = MagicMock() + lookup_plugin._display.warning = MagicMock() + assert lookup_plugin.get_secret_value(term, client, on_missing=on_missing) is None + lookup_plugin._display.warning.assert_called_once_with(f"Skipping, did not find secret {term}") + + @pytest.mark.parametrize("on_deleted", ["warn", "error"]) + def test_get_secret__on_deleted(self, lookup_plugin, on_deleted): + client = MagicMock() + client.get_secret_value = MagicMock() + client.get_secret_value.side_effect = _raise_boto_clienterror( + "ResourceMarkedForDeletion", "marked for deletion" + ) + term = "ansible-test-secret-8790" + + if on_deleted == "error": + with pytest.raises(AnsibleLookupError) as exc_info: + lookup_plugin.get_secret_value(term, client, on_deleted=on_deleted) + assert f"Failed to find secret {term} (marked for deletion)" == str(exc_info.value) + else: + lookup_plugin._display = MagicMock() + lookup_plugin._display.warning = MagicMock() + assert lookup_plugin.get_secret_value(term, client, on_deleted=on_deleted) is None + lookup_plugin._display.warning.assert_called_once_with( + f"Skipping, did not find secret (marked for deletion) {term}" + ) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py index a7d1e0475..7a870163c 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/conftest.py @@ -1,16 +1,13 @@ # Copyright (c) 2017 Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - import json import pytest -from ansible.module_utils.six import string_types from ansible.module_utils._text import to_bytes from ansible.module_utils.common._collections_compat import MutableMapping +from ansible.module_utils.six import string_types @pytest.fixture @@ -18,14 +15,14 @@ def patch_ansible_module(request, mocker): if isinstance(request.param, string_types): args = request.param elif isinstance(request.param, MutableMapping): - if 'ANSIBLE_MODULE_ARGS' not in request.param: - request.param = {'ANSIBLE_MODULE_ARGS': request.param} - if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']: - request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp' - if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']: - request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False + if "ANSIBLE_MODULE_ARGS" not in request.param: + request.param = {"ANSIBLE_MODULE_ARGS": request.param} + if "_ansible_remote_tmp" not in request.param["ANSIBLE_MODULE_ARGS"]: + request.param["ANSIBLE_MODULE_ARGS"]["_ansible_remote_tmp"] = "/tmp" + if "_ansible_keep_remote_files" not in request.param["ANSIBLE_MODULE_ARGS"]: + request.param["ANSIBLE_MODULE_ARGS"]["_ansible_keep_remote_files"] = False args = json.dumps(request.param) else: - raise Exception('Malformed data to the patch_ansible_module pytest fixture') + raise Exception("Malformed data to the patch_ansible_module pytest fixture") - mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args)) + mocker.patch("ansible.module_utils.basic._ANSIBLE_ARGS", to_bytes(args)) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/test_check_is_instance.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/test_check_is_instance.py new file mode 100644 index 000000000..0afeab56a --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_eip/test_check_is_instance.py @@ -0,0 +1,65 @@ +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import pytest + +from ansible_collections.amazon.aws.plugins.modules import ec2_eip + +EXAMPLE_DATA = [ + ( + None, + True, + False, + ), + ( + None, + False, + False, + ), + ( + "", + True, + False, + ), + ( + "", + False, + False, + ), + ( + "i-123456789", + True, + True, + ), + ( + "i-123456789", + False, + True, + ), + ( + "eni-123456789", + True, + False, + ), + ( + "junk", + True, + False, + ), + ( + "junk", + False, + False, + ), +] + + +def test_check_is_instance_needs_in_vpc(): + with pytest.raises(ec2_eip.EipError): + ec2_eip.check_is_instance("eni-123456789", False) + + +@pytest.mark.parametrize("device,in_vpc,expected", EXAMPLE_DATA) +def test_check_is_instance(device, in_vpc, expected): + result = ec2_eip.check_is_instance(device, in_vpc) + assert result is expected diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py index e889b676a..a64c16961 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_build_run_instance_spec.py @@ -3,23 +3,21 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +from unittest.mock import sentinel import pytest -from ansible_collections.amazon.aws.tests.unit.compat.mock import sentinel import ansible_collections.amazon.aws.plugins.modules.ec2_instance as ec2_instance_module @pytest.fixture def params_object(): params = { - 'iam_instance_profile': None, - 'exact_count': None, - 'count': None, - 'launch_template': None, - 'instance_type': None, + "iam_instance_profile": None, + "exact_count": None, + "count": None, + "launch_template": None, + "instance_type": sentinel.INSTANCE_TYPE, } return params @@ -29,11 +27,13 @@ def ec2_instance(monkeypatch): # monkey patches various ec2_instance module functions, we'll separately test the operation of # these functions, we just care that it's passing the results into the right place in the # instance spec. - monkeypatch.setattr(ec2_instance_module, 'build_top_level_options', lambda params: {'TOP_LEVEL_OPTIONS': sentinel.TOP_LEVEL}) - monkeypatch.setattr(ec2_instance_module, 'build_network_spec', lambda params: sentinel.NETWORK_SPEC) - monkeypatch.setattr(ec2_instance_module, 'build_volume_spec', lambda params: sentinel.VOlUME_SPEC) - monkeypatch.setattr(ec2_instance_module, 'build_instance_tags', lambda params: sentinel.TAG_SPEC) - monkeypatch.setattr(ec2_instance_module, 'determine_iam_role', lambda params: sentinel.IAM_PROFILE_ARN) + monkeypatch.setattr( + ec2_instance_module, "build_top_level_options", lambda params: {"TOP_LEVEL_OPTIONS": sentinel.TOP_LEVEL} + ) + monkeypatch.setattr(ec2_instance_module, "build_network_spec", lambda params: sentinel.NETWORK_SPEC) + monkeypatch.setattr(ec2_instance_module, "build_volume_spec", lambda params: sentinel.VOlUME_SPEC) + monkeypatch.setattr(ec2_instance_module, "build_instance_tags", lambda params: sentinel.TAG_SPEC) + monkeypatch.setattr(ec2_instance_module, "determine_iam_role", lambda params: sentinel.IAM_PROFILE_ARN) return ec2_instance_module @@ -43,33 +43,37 @@ def _assert_defaults(instance_spec, to_skip=None): assert isinstance(instance_spec, dict) - if 'TagSpecifications' not in to_skip: - assert 'TagSpecifications' in instance_spec - assert instance_spec['TagSpecifications'] is sentinel.TAG_SPEC + if "TagSpecifications" not in to_skip: + assert "TagSpecifications" in instance_spec + assert instance_spec["TagSpecifications"] is sentinel.TAG_SPEC - if 'NetworkInterfaces' not in to_skip: - assert 'NetworkInterfaces' in instance_spec - assert instance_spec['NetworkInterfaces'] is sentinel.NETWORK_SPEC + if "NetworkInterfaces" not in to_skip: + assert "NetworkInterfaces" in instance_spec + assert instance_spec["NetworkInterfaces"] is sentinel.NETWORK_SPEC - if 'BlockDeviceMappings' not in to_skip: - assert 'BlockDeviceMappings' in instance_spec - assert instance_spec['BlockDeviceMappings'] is sentinel.VOlUME_SPEC + if "BlockDeviceMappings" not in to_skip: + assert "BlockDeviceMappings" in instance_spec + assert instance_spec["BlockDeviceMappings"] is sentinel.VOlUME_SPEC - if 'IamInstanceProfile' not in to_skip: + if "IamInstanceProfile" not in to_skip: # By default, this shouldn't be returned - assert 'IamInstanceProfile' not in instance_spec + assert "IamInstanceProfile" not in instance_spec - if 'MinCount' not in to_skip: - assert 'MinCount' in instance_spec - instance_spec['MinCount'] == 1 + if "MinCount" not in to_skip: + assert "MinCount" in instance_spec + instance_spec["MinCount"] == 1 - if 'MaxCount' not in to_skip: - assert 'MaxCount' in instance_spec - instance_spec['MaxCount'] == 1 + if "MaxCount" not in to_skip: + assert "MaxCount" in instance_spec + instance_spec["MaxCount"] == 1 - if 'TOP_LEVEL_OPTIONS' not in to_skip: - assert 'TOP_LEVEL_OPTIONS' in instance_spec - assert instance_spec['TOP_LEVEL_OPTIONS'] is sentinel.TOP_LEVEL + if "TOP_LEVEL_OPTIONS" not in to_skip: + assert "TOP_LEVEL_OPTIONS" in instance_spec + assert instance_spec["TOP_LEVEL_OPTIONS"] is sentinel.TOP_LEVEL + + if "InstanceType" not in to_skip: + assert "InstanceType" in instance_spec + instance_spec["InstanceType"] == sentinel.INSTANCE_TYPE def test_build_run_instance_spec_defaults(params_object, ec2_instance): @@ -77,50 +81,72 @@ def test_build_run_instance_spec_defaults(params_object, ec2_instance): _assert_defaults(instance_spec) +def test_build_run_instance_spec_type_required(params_object, ec2_instance): + params_object["instance_type"] = None + params_object["launch_template"] = None + # Test that we throw an Ec2InstanceAWSError if passed neither + with pytest.raises(ec2_instance.Ec2InstanceAWSError): + instance_spec = ec2_instance.build_run_instance_spec(params_object) + + # Test that instance_type can be None if launch_template is set + params_object["launch_template"] = sentinel.LAUNCH_TEMPLATE + instance_spec = ec2_instance.build_run_instance_spec(params_object) + _assert_defaults(instance_spec, ["InstanceType"]) + assert "InstanceType" not in instance_spec + + def test_build_run_instance_spec_tagging(params_object, ec2_instance, monkeypatch): # build_instance_tags can return None, RunInstance doesn't like this - monkeypatch.setattr(ec2_instance_module, 'build_instance_tags', lambda params: None) + monkeypatch.setattr(ec2_instance_module, "build_instance_tags", lambda params: None) instance_spec = ec2_instance.build_run_instance_spec(params_object) - _assert_defaults(instance_spec, ['TagSpecifications']) - assert 'TagSpecifications' not in instance_spec + _assert_defaults(instance_spec, ["TagSpecifications"]) + assert "TagSpecifications" not in instance_spec # if someone *explicitly* passes {} (rather than not setting it), then [] can be returned - monkeypatch.setattr(ec2_instance_module, 'build_instance_tags', lambda params: []) + monkeypatch.setattr(ec2_instance_module, "build_instance_tags", lambda params: []) instance_spec = ec2_instance.build_run_instance_spec(params_object) - _assert_defaults(instance_spec, ['TagSpecifications']) - assert 'TagSpecifications' in instance_spec - assert instance_spec['TagSpecifications'] == [] + _assert_defaults(instance_spec, ["TagSpecifications"]) + assert "TagSpecifications" in instance_spec + assert instance_spec["TagSpecifications"] == [] def test_build_run_instance_spec_instance_profile(params_object, ec2_instance): - params_object['iam_instance_profile'] = sentinel.INSTANCE_PROFILE_NAME + params_object["iam_instance_profile"] = sentinel.INSTANCE_PROFILE_NAME instance_spec = ec2_instance.build_run_instance_spec(params_object) - _assert_defaults(instance_spec, ['IamInstanceProfile']) - assert 'IamInstanceProfile' in instance_spec - assert instance_spec['IamInstanceProfile'] == {'Arn': sentinel.IAM_PROFILE_ARN} + _assert_defaults(instance_spec, ["IamInstanceProfile"]) + assert "IamInstanceProfile" in instance_spec + assert instance_spec["IamInstanceProfile"] == {"Arn": sentinel.IAM_PROFILE_ARN} def test_build_run_instance_spec_count(params_object, ec2_instance): # When someone passes 'count', that number of instances will be *launched* - params_object['count'] = sentinel.COUNT + params_object["count"] = sentinel.COUNT instance_spec = ec2_instance.build_run_instance_spec(params_object) - _assert_defaults(instance_spec, ['MaxCount', 'MinCount']) - assert 'MaxCount' in instance_spec - assert 'MinCount' in instance_spec - assert instance_spec['MaxCount'] == sentinel.COUNT - assert instance_spec['MinCount'] == sentinel.COUNT + _assert_defaults(instance_spec, ["MaxCount", "MinCount"]) + assert "MaxCount" in instance_spec + assert "MinCount" in instance_spec + assert instance_spec["MaxCount"] == sentinel.COUNT + assert instance_spec["MinCount"] == sentinel.COUNT def test_build_run_instance_spec_exact_count(params_object, ec2_instance): # The "exact_count" logic relies on enforce_count doing the math to figure out how many # instances to start/stop. The enforce_count call is responsible for ensuring that 'to_launch' # is set and is a positive integer. - params_object['exact_count'] = sentinel.EXACT_COUNT - params_object['to_launch'] = sentinel.TO_LAUNCH + params_object["exact_count"] = 42 + params_object["to_launch"] = sentinel.TO_LAUNCH instance_spec = ec2_instance.build_run_instance_spec(params_object) - _assert_defaults(instance_spec, ['MaxCount', 'MinCount']) - assert 'MaxCount' in instance_spec - assert 'MinCount' in instance_spec - assert instance_spec['MaxCount'] == sentinel.TO_LAUNCH - assert instance_spec['MinCount'] == sentinel.TO_LAUNCH + _assert_defaults(instance_spec, ["MaxCount", "MinCount"]) + assert "MaxCount" in instance_spec + assert "MinCount" in instance_spec + assert instance_spec["MaxCount"] == 42 + assert instance_spec["MinCount"] == 42 + + instance_spec = ec2_instance.build_run_instance_spec(params_object, 7) + + _assert_defaults(instance_spec, ["MaxCount", "MinCount"]) + assert "MaxCount" in instance_spec + assert "MinCount" in instance_spec + assert instance_spec["MaxCount"] == 35 + assert instance_spec["MinCount"] == 35 diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py index cdde74c97..7645d5559 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_instance/test_determine_iam_role.py @@ -3,16 +3,14 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +import sys +from unittest.mock import MagicMock +from unittest.mock import sentinel import pytest -import sys -from ansible_collections.amazon.aws.tests.unit.compat.mock import MagicMock -from ansible_collections.amazon.aws.tests.unit.compat.mock import sentinel -import ansible_collections.amazon.aws.plugins.modules.ec2_instance as ec2_instance_module import ansible_collections.amazon.aws.plugins.module_utils.arn as utils_arn +import ansible_collections.amazon.aws.plugins.modules.ec2_instance as ec2_instance_module from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 try: @@ -20,24 +18,29 @@ try: except ImportError: pass -pytest.mark.skipif(not HAS_BOTO3, reason="test_determine_iam_role.py requires the python modules 'boto3' and 'botocore'") +pytest.mark.skipif( + not HAS_BOTO3, reason="test_determine_iam_role.py requires the python modules 'boto3' and 'botocore'" +) -def _client_error(code='GenericError'): +def _client_error(code="GenericError"): return botocore.exceptions.ClientError( - {'Error': {'Code': code, 'Message': 'Something went wrong'}, - 'ResponseMetadata': {'RequestId': '01234567-89ab-cdef-0123-456789abcdef'}}, - 'some_called_method') + { + "Error": {"Code": code, "Message": "Something went wrong"}, + "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"}, + }, + "some_called_method", + ) @pytest.fixture def params_object(): params = { - 'instance_role': None, - 'exact_count': None, - 'count': None, - 'launch_template': None, - 'instance_type': None, + "instance_role": None, + "exact_count": None, + "count": None, + "launch_template": None, + "instance_type": None, } return params @@ -49,8 +52,8 @@ class FailJsonException(Exception): @pytest.fixture def ec2_instance(monkeypatch): - monkeypatch.setattr(ec2_instance_module, 'parse_aws_arn', lambda arn: None) - monkeypatch.setattr(ec2_instance_module, 'module', MagicMock()) + monkeypatch.setattr(ec2_instance_module, "validate_aws_arn", lambda arn, service, resource_type: None) + monkeypatch.setattr(ec2_instance_module, "module", MagicMock()) ec2_instance_module.module.fail_json.side_effect = FailJsonException() ec2_instance_module.module.fail_json_aws.side_effect = FailJsonException() return ec2_instance_module @@ -58,15 +61,15 @@ def ec2_instance(monkeypatch): def test_determine_iam_role_arn(params_object, ec2_instance, monkeypatch): # Revert the default monkey patch to make it simple to try passing a valid ARNs - monkeypatch.setattr(ec2_instance, 'parse_aws_arn', utils_arn.parse_aws_arn) + monkeypatch.setattr(ec2_instance, "validate_aws_arn", utils_arn.validate_aws_arn) # Simplest example, someone passes a valid instance profile ARN - arn = ec2_instance.determine_iam_role('arn:aws:iam::123456789012:instance-profile/myprofile') - assert arn == 'arn:aws:iam::123456789012:instance-profile/myprofile' + arn = ec2_instance.determine_iam_role("arn:aws:iam::123456789012:instance-profile/myprofile") + assert arn == "arn:aws:iam::123456789012:instance-profile/myprofile" def test_determine_iam_role_name(params_object, ec2_instance): - profile_description = {'InstanceProfile': {'Arn': sentinel.IAM_PROFILE_ARN}} + profile_description = {"InstanceProfile": {"Arn": sentinel.IAM_PROFILE_ARN}} iam_client = MagicMock(**{"get_instance_profile.return_value": profile_description}) ec2_instance_module.module.client.return_value = iam_client @@ -75,28 +78,28 @@ def test_determine_iam_role_name(params_object, ec2_instance): def test_determine_iam_role_missing(params_object, ec2_instance): - missing_exception = _client_error('NoSuchEntity') + missing_exception = _client_error("NoSuchEntity") iam_client = MagicMock(**{"get_instance_profile.side_effect": missing_exception}) ec2_instance_module.module.client.return_value = iam_client - with pytest.raises(FailJsonException) as exception: - arn = ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME) + with pytest.raises(FailJsonException): + ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME) assert ec2_instance_module.module.fail_json_aws.call_count == 1 assert ec2_instance_module.module.fail_json_aws.call_args.args[0] is missing_exception - assert 'Could not find' in ec2_instance_module.module.fail_json_aws.call_args.kwargs['msg'] + assert "Could not find" in ec2_instance_module.module.fail_json_aws.call_args.kwargs["msg"] -@pytest.mark.skipif(sys.version_info < (3, 8), reason='call_args behaviour changed in Python 3.8') +@pytest.mark.skipif(sys.version_info < (3, 8), reason="call_args behaviour changed in Python 3.8") def test_determine_iam_role_missing(params_object, ec2_instance): missing_exception = _client_error() iam_client = MagicMock(**{"get_instance_profile.side_effect": missing_exception}) ec2_instance_module.module.client.return_value = iam_client - with pytest.raises(FailJsonException) as exception: - arn = ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME) + with pytest.raises(FailJsonException): + ec2_instance.determine_iam_role(sentinel.IAM_PROFILE_NAME) assert ec2_instance_module.module.fail_json_aws.call_count == 1 assert ec2_instance_module.module.fail_json_aws.call_args.args[0] is missing_exception - assert 'An error occurred while searching' in ec2_instance_module.module.fail_json_aws.call_args.kwargs['msg'] - assert 'Please try supplying the full ARN' in ec2_instance_module.module.fail_json_aws.call_args.kwargs['msg'] + assert "An error occurred while searching" in ec2_instance_module.module.fail_json_aws.call_args.kwargs["msg"] + assert "Please try supplying the full ARN" in ec2_instance_module.module.fail_json_aws.call_args.kwargs["msg"] diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_expand_rules.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_expand_rules.py new file mode 100644 index 000000000..1abfd526c --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_expand_rules.py @@ -0,0 +1,240 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import sys +from unittest.mock import sentinel + +import pytest + +import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module + +PORT_EXPANSION = [ + ({"from_port": 83}, ({"from_port": 83, "to_port": None},)), + ({"to_port": 36}, ({"from_port": None, "to_port": 36},)), + ({"icmp_type": 90}, ({"from_port": 90, "to_port": None},)), + ({"icmp_type": 74, "icmp_code": 66}, ({"from_port": 74, "to_port": 66},)), + # Note: ports is explicitly a list of strings because we support "<port a>-<port b>" + ({"ports": ["1"]}, ({"from_port": 1, "to_port": 1},)), + ({"ports": ["41-85"]}, ({"from_port": 41, "to_port": 85},)), + ( + {"ports": ["63", "74"]}, + ( + {"from_port": 63, "to_port": 63}, + {"from_port": 74, "to_port": 74}, + ), + ), + ( + {"ports": ["97-30", "41-80"]}, + ( + {"from_port": 30, "to_port": 97}, + {"from_port": 41, "to_port": 80}, + ), + ), + ( + {"ports": ["95", "67-79"]}, + ( + {"from_port": 95, "to_port": 95}, + {"from_port": 67, "to_port": 79}, + ), + ), + # There are legitimate cases with no port info + ({}, ({},)), +] +PORTS_EXPANSION = [ + (["28"], [(28, 28)]), + (["80-83"], [(80, 83)]), + # We tolerate the order being backwards + (["83-80"], [(80, 83)]), + (["41", "1"], [(41, 41), (1, 1)]), + (["70", "39-0"], [(70, 70), (0, 39)]), + (["57-6", "31"], [(6, 57), (31, 31)]), + # https://github.com/ansible-collections/amazon.aws/pull/1241 + (["-1"], [(-1, -1)]), +] +SOURCE_EXPANSION = [ + ( + {"cidr_ip": ["192.0.2.0/24"]}, + ({"cidr_ip": "192.0.2.0/24"},), + ), + ( + {"cidr_ipv6": ["2001:db8::/32"]}, + ({"cidr_ipv6": "2001:db8::/32"},), + ), + ( + {"group_id": ["sg-123456789"]}, + ({"group_id": "sg-123456789"},), + ), + ( + {"group_name": ["MyExampleGroupName"]}, + ({"group_name": "MyExampleGroupName"},), + ), + ( + {"ip_prefix": ["pl-123456abcde123456"]}, + ({"ip_prefix": "pl-123456abcde123456"},), + ), + ( + {"cidr_ip": ["192.0.2.0/24", "198.51.100.0/24"]}, + ( + {"cidr_ip": "192.0.2.0/24"}, + {"cidr_ip": "198.51.100.0/24"}, + ), + ), + ( + {"cidr_ipv6": ["2001:db8::/32", "100::/64"]}, + ( + {"cidr_ipv6": "2001:db8::/32"}, + {"cidr_ipv6": "100::/64"}, + ), + ), + ( + {"group_id": ["sg-123456789", "sg-abcdef1234"]}, + ( + {"group_id": "sg-123456789"}, + {"group_id": "sg-abcdef1234"}, + ), + ), + ( + {"group_name": ["MyExampleGroupName", "AnotherExample"]}, + ( + {"group_name": "MyExampleGroupName"}, + {"group_name": "AnotherExample"}, + ), + ), + ( + {"ip_prefix": ["pl-123456abcde123456", "pl-abcdef12345abcdef"]}, + ({"ip_prefix": "pl-123456abcde123456"}, {"ip_prefix": "pl-abcdef12345abcdef"}), + ), + ( + { + "cidr_ip": ["192.0.2.0/24"], + "cidr_ipv6": ["2001:db8::/32"], + "group_id": ["sg-123456789"], + "group_name": ["MyExampleGroupName"], + "ip_prefix": ["pl-123456abcde123456"], + }, + ( + {"cidr_ip": "192.0.2.0/24"}, + {"cidr_ipv6": "2001:db8::/32"}, + {"group_id": "sg-123456789"}, + {"group_name": "MyExampleGroupName"}, + {"ip_prefix": "pl-123456abcde123456"}, + ), + ), + ( + { + "cidr_ip": ["192.0.2.0/24", "198.51.100.0/24"], + "cidr_ipv6": ["2001:db8::/32", "100::/64"], + "group_id": ["sg-123456789", "sg-abcdef1234"], + "group_name": ["MyExampleGroupName", "AnotherExample"], + "ip_prefix": ["pl-123456abcde123456", "pl-abcdef12345abcdef"], + }, + ( + {"cidr_ip": "192.0.2.0/24"}, + {"cidr_ip": "198.51.100.0/24"}, + {"cidr_ipv6": "2001:db8::/32"}, + {"cidr_ipv6": "100::/64"}, + {"group_id": "sg-123456789"}, + {"group_id": "sg-abcdef1234"}, + {"group_name": "MyExampleGroupName"}, + {"group_name": "AnotherExample"}, + {"ip_prefix": "pl-123456abcde123456"}, + {"ip_prefix": "pl-abcdef12345abcdef"}, + ), + ), +] + +RULE_EXPANSION = [ + ( + {"ports": ["24"], "cidr_ip": ["192.0.2.0/24"], "sentinel": sentinel.RULE_VALUE}, + [ + {"from_port": 24, "to_port": 24, "cidr_ip": "192.0.2.0/24", "sentinel": sentinel.RULE_VALUE}, + ], + ), + ( + {"ports": ["24", "50"], "cidr_ip": ["192.0.2.0/24", "198.51.100.0/24"], "sentinel": sentinel.RULE_VALUE}, + [ + {"from_port": 24, "to_port": 24, "cidr_ip": "192.0.2.0/24", "sentinel": sentinel.RULE_VALUE}, + {"from_port": 24, "to_port": 24, "cidr_ip": "198.51.100.0/24", "sentinel": sentinel.RULE_VALUE}, + {"from_port": 50, "to_port": 50, "cidr_ip": "192.0.2.0/24", "sentinel": sentinel.RULE_VALUE}, + {"from_port": 50, "to_port": 50, "cidr_ip": "198.51.100.0/24", "sentinel": sentinel.RULE_VALUE}, + ], + ), +] + + +@pytest.mark.parametrize("rule, expected", PORT_EXPANSION) +def test_expand_ports_from_rule(rule, expected): + assert ec2_security_group_module.expand_ports_from_rule(rule) == expected + + # We shouldn't care about extra values lurking in the rule definition + rule["junk"] = sentinel.EXTRA_JUNK + assert ec2_security_group_module.expand_ports_from_rule(rule) == expected + + +@pytest.mark.parametrize("rule, expected", SOURCE_EXPANSION) +def test_expand_sources_from_rule(rule, expected): + assert ec2_security_group_module.expand_sources_from_rule(rule) == expected + + # We shouldn't care about extra values lurking in the rule definition + rule["junk"] = sentinel.EXTRA_JUNK + assert ec2_security_group_module.expand_sources_from_rule(rule) == expected + + +@pytest.mark.parametrize("rule, expected", PORTS_EXPANSION) +def test_expand_ports_list(rule, expected): + assert ec2_security_group_module.expand_ports_list(rule) == expected + + +@pytest.mark.skipif( + sys.version_info < (3, 7), + reason="requires Python 3.7 or higher - sentinel doesn't behave well with deepcopy in Python 3.6", +) +@pytest.mark.parametrize("source_type", sorted(ec2_security_group_module.SOURCE_TYPES_ALL)) +def test_strip_rule_source(source_type): + rule = {source_type: sentinel.SOURCE_VALUE} + assert ec2_security_group_module._strip_rule(rule) == {} + assert rule == {source_type: sentinel.SOURCE_VALUE} + + rule = {source_type: sentinel.SOURCE_VALUE, "sentinel": sentinel.SENTINEL_VALUE} + assert ec2_security_group_module._strip_rule(rule) == {"sentinel": sentinel.SENTINEL_VALUE} + assert rule == {source_type: sentinel.SOURCE_VALUE, "sentinel": sentinel.SENTINEL_VALUE} + + +@pytest.mark.skipif( + sys.version_info < (3, 7), + reason="requires Python 3.7 or higher - sentinel doesn't behave well with deepcopy in Python 3.6", +) +@pytest.mark.parametrize("port_type", sorted(ec2_security_group_module.PORT_TYPES_ALL)) +def test_strip_rule_port(port_type): + rule = {port_type: sentinel.PORT_VALUE} + assert ec2_security_group_module._strip_rule(rule) == {} + assert rule == {port_type: sentinel.PORT_VALUE} + + rule = {port_type: sentinel.PORT_VALUE, "sentinel": sentinel.SENTINEL_VALUE} + assert ec2_security_group_module._strip_rule(rule) == {"sentinel": sentinel.SENTINEL_VALUE} + assert rule == {port_type: sentinel.PORT_VALUE, "sentinel": sentinel.SENTINEL_VALUE} + + +@pytest.mark.skipif( + sys.version_info < (3, 7), + reason="requires Python 3.7 or higher - sentinel doesn't behave well with deepcopy in Python 3.6", +) +@pytest.mark.parametrize("rule, expected", RULE_EXPANSION) +def test_rule_expand(rule, expected): + assert ec2_security_group_module.expand_rule(rule) == expected + + +########################################################## +# Examples where we explicitly expect to raise an exception + + +def test_expand_ports_list_bad(): + with pytest.raises(ec2_security_group_module.SecurityGroupError): + ec2_security_group_module.expand_ports_list(["junk"]) + + +def test_expand_sources_from_rule_bad(): + with pytest.raises(ec2_security_group_module.SecurityGroupError): + ec2_security_group_module.expand_sources_from_rule(dict()) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_formatting.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_formatting.py new file mode 100644 index 000000000..358512a00 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_formatting.py @@ -0,0 +1,239 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import sentinel + +import pytest + +import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module + +SORT_ORDER = [ + (dict(), dict()), + ( + dict(ip_permissions=[], ip_permissions_egress=[]), + dict(ip_permissions=[], ip_permissions_egress=[]), + ), + ( + dict( + ip_permissions=[ + dict( + ip_protocol="tcp", + ip_ranges=[], + ipv6_ranges=[ + dict(cidr_ipv6="2001:DB8:8000::/34"), + dict(cidr_ipv6="2001:DB8:4000::/34"), + ], + prefix_list_ids=[], + user_id_group_pairs=[], + ), + dict( + ip_protocol="-1", + ip_ranges=[ + dict(cidr_ip="198.51.100.0/24"), + dict(cidr_ip="192.0.2.0/24"), + ], + ipv6_ranges=[], + prefix_list_ids=[], + user_id_group_pairs=[], + ), + dict( + from_port="22", + ip_ranges=[], + ipv6_ranges=[], + prefix_list_ids=[], + to_port="22", + user_id_group_pairs=[ + dict(group_id="sg-3950599b", user_id="123456789012"), + dict(group_id="sg-fbfd1e3a", user_id="012345678901"), + dict(group_id="sg-00ec640f", user_id="012345678901"), + ], + ), + dict( + from_port=38, + ip_protocol="tcp", + ip_ranges=[], + ipv6_ranges=[], + prefix_list_ids=[ + dict(prefix_list_id="pl-2263adef"), + dict(prefix_list_id="pl-0a5fccee"), + dict(prefix_list_id="pl-65911ba9"), + ], + to_port=38, + user_id_group_pairs=[], + ), + ], + ip_permissions_egress=[ + dict( + ip_protocol="-1", + ip_ranges=[ + dict(cidr_ip="198.51.100.0/24"), + dict(cidr_ip="192.0.2.0/24"), + ], + ipv6_ranges=[], + prefix_list_ids=[], + user_id_group_pairs=[], + ), + dict( + from_port=443, + ip_protocol="tcp", + ip_ranges=[], + ipv6_ranges=[], + prefix_list_ids=[], + to_port=443, + user_id_group_pairs=[ + dict(group_id="sg-fbfd1e3a", user_id="012345678901"), + dict(group_id="sg-00ec640f", user_id="012345678901"), + ], + ), + ], + ), + dict( + ip_permissions=[ + dict( + ip_protocol="-1", + ip_ranges=[ + dict(cidr_ip="192.0.2.0/24"), + dict(cidr_ip="198.51.100.0/24"), + ], + ipv6_ranges=[], + prefix_list_ids=[], + user_id_group_pairs=[], + ), + dict( + ip_protocol="tcp", + ip_ranges=[], + ipv6_ranges=[ + dict(cidr_ipv6="2001:DB8:4000::/34"), + dict(cidr_ipv6="2001:DB8:8000::/34"), + ], + prefix_list_ids=[], + user_id_group_pairs=[], + ), + dict( + from_port=38, + ip_protocol="tcp", + ip_ranges=[], + ipv6_ranges=[], + prefix_list_ids=[ + dict(prefix_list_id="pl-0a5fccee"), + dict(prefix_list_id="pl-2263adef"), + dict(prefix_list_id="pl-65911ba9"), + ], + to_port=38, + user_id_group_pairs=[], + ), + dict( + from_port="22", + ip_ranges=[], + ipv6_ranges=[], + prefix_list_ids=[], + to_port="22", + user_id_group_pairs=[ + dict(group_id="sg-00ec640f", user_id="012345678901"), + dict(group_id="sg-3950599b", user_id="123456789012"), + dict(group_id="sg-fbfd1e3a", user_id="012345678901"), + ], + ), + ], + ip_permissions_egress=[ + dict( + ip_protocol="-1", + ip_ranges=[ + dict(cidr_ip="192.0.2.0/24"), + dict(cidr_ip="198.51.100.0/24"), + ], + ipv6_ranges=[], + prefix_list_ids=[], + user_id_group_pairs=[], + ), + dict( + from_port=443, + ip_protocol="tcp", + ip_ranges=[], + ipv6_ranges=[], + prefix_list_ids=[], + to_port=443, + user_id_group_pairs=[ + dict(group_id="sg-00ec640f", user_id="012345678901"), + dict(group_id="sg-fbfd1e3a", user_id="012345678901"), + ], + ), + ], + ), + ), +] + + +@pytest.mark.parametrize("group, expected", SORT_ORDER) +def test_sort_security_group(group, expected): + assert ec2_security_group_module.sort_security_group(group) == expected + + # We shouldn't care about extra values lurking in the security group definition + group["junk"] = sentinel.EXTRA_JUNK + expected["junk"] = sentinel.EXTRA_JUNK + assert ec2_security_group_module.sort_security_group(group) == expected + + +def test_get_rule_sort_key(): + # Random text, to try and ensure the content of the string doesn't affect the key returned + dict_to_sort = dict( + cidr_ip="MtY0d3Ps6ePsMM0zB18g", + cidr_ipv6="ffbCwK2xhCsy8cyXqHuz", + prefix_list_id="VXKCoW296XxIRiBrTUw8", + group_id="RZpolpZ5wYPPpbqVo1Db", + sentinel=sentinel.EXTRA_RULE_KEY, + ) + + # Walk through through the keys we use and check that they have the priority we expect + for key_name in ["cidr_ip", "cidr_ipv6", "prefix_list_id", "group_id"]: + assert ec2_security_group_module.get_rule_sort_key(dict_to_sort) == dict_to_sort[key_name] + # Remove the current key so that the next time round another key will have priority + dict_to_sort.pop(key_name) + + assert dict_to_sort == {"sentinel": sentinel.EXTRA_RULE_KEY} + assert ec2_security_group_module.get_rule_sort_key(dict_to_sort) is None + + +def test_get_ip_permissions_sort_key(): + dict_to_sort = dict( + ip_ranges=[ + dict(cidr_ip="198.51.100.0/24", original_index=0), + dict(cidr_ip="192.0.2.0/24", original_index=1), + dict(cidr_ip="203.0.113.0/24", original_index=2), + ], + ipv6_ranges=[ + dict(cidr_ipv6="2001:DB8:4000::/34", original_index=0), + dict(cidr_ipv6="2001:DB8:0000::/34", original_index=1), + dict(cidr_ipv6="2001:DB8:8000::/34", original_index=2), + ], + prefix_list_ids=[ + dict(prefix_list_id="pl-2263adef", original_index=0), + dict(prefix_list_id="pl-0a5fccee", original_index=1), + dict(prefix_list_id="pl-65911ba9", original_index=2), + ], + user_id_group_pairs=[ + dict(group_id="sg-3950599b", original_index=0), + dict(group_id="sg-fbfd1e3a", original_index=1), + dict(group_id="sg-00ec640f", original_index=2), + ], + sentinel=sentinel.EXTRA_RULE_KEY, + ) + + expected_keys = dict( + ip_ranges="ipv4:192.0.2.0/24", + ipv6_ranges="ipv6:2001:DB8:0000::/34", + prefix_list_ids="pl:pl-0a5fccee", + user_id_group_pairs="ugid:sg-00ec640f", + ) + + # Walk through through the keys we use and check that they have the priority we expect + for key_name in ["ip_ranges", "ipv6_ranges", "prefix_list_ids", "user_id_group_pairs"]: + sort_key = ec2_security_group_module.get_ip_permissions_sort_key(dict_to_sort) + assert sort_key == expected_keys[key_name] + # Remove the current key so that the next time round another key will have priority + dict_to_sort.pop(key_name) + + assert dict_to_sort == {"sentinel": sentinel.EXTRA_RULE_KEY} + assert ec2_security_group_module.get_ip_permissions_sort_key(dict_to_sort) is None diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py new file mode 100644 index 000000000..34fa8de1a --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py @@ -0,0 +1,99 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from copy import deepcopy +from unittest.mock import sentinel + +import pytest + +import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module + + +@pytest.fixture +def ec2_security_group(monkeypatch): + # monkey patches various ec2_security_group module functions, we'll separately test the operation of + # these functions, we just care that it's passing the results into the right place in the + # instance spec. + monkeypatch.setattr(ec2_security_group_module, "current_account_id", sentinel.CURRENT_ACCOUNT_ID) + return ec2_security_group_module + + +def test_target_from_rule_with_group_id_local_group(ec2_security_group): + groups = dict() + original_groups = deepcopy(groups) + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id="sg-123456789abcdef01"), + groups, + ) + assert groups == original_groups + assert rule_type == "group" + assert created is False + assert target[0] is sentinel.CURRENT_ACCOUNT_ID + assert target[1] == "sg-123456789abcdef01" + assert target[2] is None + + +def test_target_from_rule_with_group_id_peer_group(ec2_security_group): + groups = dict() + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id="123456789012/sg-123456789abcdef02/example-group-name"), + groups, + ) + assert rule_type == "group" + assert created is False + assert target[0] == "123456789012" + assert target[1] == "sg-123456789abcdef02" + assert target[2] is None + + assert sorted(groups.keys()) == ["example-group-name", "sg-123456789abcdef02"] + rule_by_id = groups["sg-123456789abcdef02"] + rule_by_name = groups["example-group-name"] + + assert rule_by_id is rule_by_name + assert rule_by_id["UserId"] == "123456789012" + assert rule_by_id["GroupId"] == "sg-123456789abcdef02" + assert rule_by_id["GroupName"] == "example-group-name" + + +def test_target_from_rule_with_group_id_elb(ec2_security_group): + groups = dict() + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id="amazon-elb/amazon-elb-sg"), + groups, + ) + assert rule_type == "group" + assert created is False + assert target[0] == "amazon-elb" + assert target[1] is None + assert target[2] == "amazon-elb-sg" + + assert "amazon-elb-sg" in groups.keys() + rule_by_name = groups["amazon-elb-sg"] + + assert rule_by_name["UserId"] == "amazon-elb" + assert rule_by_name["GroupId"] is None + assert rule_by_name["GroupName"] == "amazon-elb-sg" + + +def test_target_from_rule_with_group_id_elb_with_sg(ec2_security_group): + groups = dict() + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id="amazon-elb/sg-5a9c116a/amazon-elb-sg"), + groups, + ) + assert rule_type == "group" + assert created is False + assert target[0] == "amazon-elb" + assert target[1] is None + assert target[2] == "amazon-elb-sg" + + assert sorted(groups.keys()) == ["amazon-elb-sg", "sg-5a9c116a"] + rule_by_id = groups["sg-5a9c116a"] + rule_by_name = groups["amazon-elb-sg"] + + assert rule_by_id is rule_by_name + assert rule_by_id["UserId"] == "amazon-elb" + assert rule_by_id["GroupId"] == "sg-5a9c116a" + assert rule_by_id["GroupName"] == "amazon-elb-sg" diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py new file mode 100644 index 000000000..eb2de7596 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py @@ -0,0 +1,85 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import warnings +from unittest.mock import MagicMock +from unittest.mock import sentinel + +import pytest + +import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module + + +@pytest.fixture +def aws_module(): + aws_module = MagicMock() + aws_module.warn = warnings.warn + return aws_module + + +@pytest.fixture +def ec2_security_group(monkeypatch): + # monkey patches various ec2_security_group module functions, we'll separately test the operation of + # these functions, we just care that it's passing the results into the right place in the + # instance spec. + monkeypatch.setattr(ec2_security_group_module, "current_account_id", sentinel.CURRENT_ACCOUNT_ID) + return ec2_security_group_module + + +IPS_GOOD = [ + ( + "192.0.2.2", + "192.0.2.2", + ), + ( + "192.0.2.1/32", + "192.0.2.1/32", + ), + ( + "192.0.2.1/255.255.255.255", + "192.0.2.1/32", + ), + ( + "192.0.2.0/24", + "192.0.2.0/24", + ), + ( + "192.0.2.0/255.255.255.255", + "192.0.2.0/32", + ), + ( + "2001:db8::1/128", + "2001:db8::1/128", + ), + ( + "2001:db8::/32", + "2001:db8::/32", + ), + ("2001:db8:fe80:b897:8990:8a7c:99bf:323d/128", "2001:db8:fe80:b897:8990:8a7c:99bf:323d/128"), +] + +IPS_WARN = [ + ("192.0.2.1/24", "192.0.2.0/24", "One of your CIDR addresses"), + ("2001:DB8::1/32", "2001:DB8::/32", "One of your IPv6 CIDR addresses"), + ("2001:db8:fe80:b897:8990:8a7c:99bf:323d/64", "2001:db8:fe80:b897::/64", "One of your IPv6 CIDR addresses"), +] + + +@pytest.mark.parametrize("ip,expected", IPS_GOOD) +def test_validate_ip_no_warn(ec2_security_group, aws_module, ip, expected): + with warnings.catch_warnings(): + warnings.simplefilter("error") + result = ec2_security_group.validate_ip(aws_module, ip) + + assert result == expected + + +@pytest.mark.parametrize("ip,expected,warn_msg", IPS_WARN) +def test_validate_ip_warn(ec2_security_group, aws_module, ip, warn_msg, expected): + with pytest.warns(UserWarning, match=warn_msg) as recorded: + result = ec2_security_group.validate_ip(aws_module, ip) + + assert len(recorded) == 1 + assert result == expected diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_rule.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_rule.py new file mode 100644 index 000000000..9949c1b5c --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/ec2_security_group/test_validate_rule.py @@ -0,0 +1,100 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from copy import deepcopy + +import pytest + +import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module + +VALID_RULES = [ + dict( + proto="all", + ), + dict( + proto="tcp", + from_port="1", + to_port="65535", + ), + dict( + proto="icmpv6", + from_port="-1", + to_port="-1", + ), + dict( + proto="icmp", + from_port="-1", + to_port="-1", + ), + dict(proto="icmpv6", icmp_type="8", icmp_code="1"), + dict(proto="icmpv6", icmp_code="1"), + dict(proto="icmpv6", icmp_type="8"), + dict(proto="icmp", icmp_type="8", icmp_code="1"), + dict(proto="icmp", icmp_code="1"), + dict(proto="icmp", icmp_type="8"), +] + +INVALID_RULES = [ + ( + dict( + proto="tcp", + icmp_code="1", + ), + r"Specify proto: icmp or icmpv6", + ), + ( + dict( + proto="tcp", + icmp_type="8", + ), + r"Specify proto: icmp or icmpv6", + ), + ( + dict( + proto="tcp", + icmp_type="8", + icmp_code="1", + ), + r"Specify proto: icmp or icmpv6", + ), + ( + dict( + proto="all", + icmp_code="1", + ), + r"Specify proto: icmp or icmpv6", + ), + ( + dict( + proto="all", + icmp_type="8", + ), + r"Specify proto: icmp or icmpv6", + ), + ( + dict( + proto="all", + icmp_type="8", + icmp_code="1", + ), + r"Specify proto: icmp or icmpv6", + ), +] + + +@pytest.mark.parametrize("rule,error_msg", INVALID_RULES) +def test_validate_rule_invalid(rule, error_msg): + original_rule = deepcopy(rule) + with pytest.raises(ec2_security_group_module.SecurityGroupError, match=error_msg): + ec2_security_group_module.validate_rule(rule) + assert original_rule == rule + + +@pytest.mark.parametrize("rule", VALID_RULES) +def test_validate_rule_valid(rule): + original_rule = deepcopy(rule) + ec2_security_group_module.validate_rule(rule) + # validate_rule shouldn't change the rule + assert original_rule == rule diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/certs/__init__.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/certs/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/fixtures/certs/__init__.py diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_backup_restore_job_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_backup_restore_job_info.py new file mode 100644 index 000000000..51c495e30 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_backup_restore_job_info.py @@ -0,0 +1,146 @@ +# (c) 2022 Red Hat Inc. + +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest + +from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict + +from ansible_collections.amazon.aws.plugins.modules import backup_restore_job_info + +module_name = "ansible_collections.amazon.aws.plugins.modules.backup_restore_job_info" + + +@pytest.mark.parametrize( + "account_id, status, created_before, created_after, completed_before, completed_after,expected", + [ + ("", "", "", "", "", "", {}), + ("123456789012", "", "", "", "", "", {"ByAccountId": "123456789012"}), + ( + "123456789012", + "COMPLETED", + "", + "", + "", + "", + {"ByAccountId": "123456789012", "ByStatus": "COMPLETED"}, + ), + ], +) +def test_build_request_args( + account_id, status, created_before, created_after, completed_before, completed_after, expected +): + assert ( + backup_restore_job_info.build_request_args( + account_id, status, created_before, created_after, completed_before, completed_after + ) + == expected + ) + + +def test__describe_restore_job(): + connection = MagicMock() + module = MagicMock() + + restore_job_id = "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD" + restore_job_info = { + "AccountId": "123456789012", + "BackupSizeInBytes": "8589934592", + "CompletionDate": "2023-03-13T15:53:07.172000-07:00", + "CreatedResourceArn": "arn:aws:ec2:us-east-2:123456789012:instance/i-01234567ec51af3f", + "CreationDate": "2023-03-13T15:53:07.172000-07:00", + "IamRoleArn": "arn:aws:iam::123456789012:role/service-role/AWSBackupDefaultServiceRole", + "PercentDone": "0.00%", + "RecoveryPointArn": "arn:aws:ec2:us-east-2::image/ami-01234567ec51af3f", + "ResourceType": "EC2", + "RestoreJobId": "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD", + "Status": "COMPLETED", + } + + connection.describe_restore_job.return_value = restore_job_info + + result = backup_restore_job_info._describe_restore_job(connection, module, restore_job_id) + + assert result == [camel_dict_to_snake_dict(restore_job_info)] + connection.describe_restore_job.assert_called_with(RestoreJobId=restore_job_id) + connection.describe_restore_job.call_count == 1 + + +def test__list_restore_jobs(): + connection = MagicMock() + conn_paginator = MagicMock() + paginate = MagicMock() + + request_args = {"ByAccountId": "123456789012"} + + restore_job = { + "AccountId": "123456789012", + "BackupSizeInBytes": "8589934592", + "CompletionDate": "2023-03-13T15:53:07.172000-07:00", + "CreatedResourceArn": "arn:aws:ec2:us-east-2:123456789012:instance/i-01234567ec51af3f", + "CreationDate": "2023-03-13T15:53:07.172000-07:00", + "IamRoleArn": "arn:aws:iam::123456789012:role/service-role/AWSBackupDefaultServiceRole", + "PercentDone": "0.00%", + "RecoveryPointArn": "arn:aws:ec2:us-east-2::image/ami-01234567ec51af3f", + "ResourceType": "EC2", + "RestoreJobId": "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD", + "Status": "COMPLETED", + } + + connection.get_paginator.return_value = conn_paginator + conn_paginator.paginate.return_value = paginate + + paginate.build_full_result.return_value = {"RestoreJobs": [restore_job]} + + result = backup_restore_job_info._list_restore_jobs(connection=connection, **request_args) + + assert result == paginate.build_full_result.return_value + connection.get_paginator.assert_called_with("list_restore_jobs") + conn_paginator.paginate.assert_called_with(**request_args) + + +@patch(module_name + "._list_restore_jobs") +def test_list_restore_jobs(m__list_restore_jobs): + connection = MagicMock() + module = MagicMock() + + request_args = {"ByAccountId": "123456789012"} + + m__list_restore_jobs.return_value = { + "RestoreJobs": [ + { + "AccountId": "123456789012", + "BackupSizeInBytes": "8589934592", + "CompletionDate": "2023-03-13T15:53:07.172000-07:00", + "CreatedResourceArn": "arn:aws:ec2:us-east-2:123456789012:instance/i-01234567ec51af3f", + "CreationDate": "2023-03-13T15:53:07.172000-07:00", + "IamRoleArn": "arn:aws:iam::123456789012:role/service-role/AWSBackupDefaultServiceRole", + "PercentDone": "0.00%", + "RecoveryPointArn": "arn:aws:ec2:us-east-2::image/ami-01234567ec51af3f", + "ResourceType": "EC2", + "RestoreJobId": "52BEE289-xxxx-xxxx-xxxx-47DCAA2E7ACD", + "Status": "COMPLETED", + } + ] + } + + list_restore_jobs_result = backup_restore_job_info.list_restore_jobs(connection, module, request_args) + + assert m__list_restore_jobs.call_count == 1 + m__list_restore_jobs.assert_called_with(connection, **request_args) + assert len(list_restore_jobs_result) == 1 + + +@patch(module_name + ".AnsibleAWSModule") +def test_main_success(m_AnsibleAWSModule): + m_module = MagicMock() + m_AnsibleAWSModule.return_value = m_module + + backup_restore_job_info.main() + + m_module.client.assert_called_with("backup") + m_module.exit_json.assert_called_with(changed=False, restore_jobs=[{}]) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py index f46bc1113..fd0b7ca75 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_cloudformation.py @@ -3,21 +3,23 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - import pytest -# Magic... -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep, placeboify # pylint: disable=unused-import - from ansible_collections.amazon.aws.plugins.module_utils.botocore import boto_exception -from ansible_collections.amazon.aws.plugins.module_utils.modules import _RetryingBotoClientWrapper from ansible_collections.amazon.aws.plugins.module_utils.retries import AWSRetry - +from ansible_collections.amazon.aws.plugins.module_utils.retries import RetryingBotoClientWrapper from ansible_collections.amazon.aws.plugins.modules import cloudformation as cfn_module +# isort: off +# Magic... +# pylint: disable-next=unused-import +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep + +# pylint: disable-next=unused-import +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify + +# isort: on + basic_yaml_tpl = """ --- AWSTemplateFormatVersion: '2010-09-09' @@ -61,167 +63,153 @@ Resources: default_events_limit = 10 -class FakeModule(object): +class FakeModule: def __init__(self, **kwargs): self.params = kwargs def fail_json(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise Exception('FAIL') + raise Exception("FAIL") def fail_json_aws(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise Exception('FAIL') + raise Exception("FAIL") def exit_json(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise Exception('EXIT') + raise Exception("EXIT") def _create_wrapped_client(placeboify): - connection = placeboify.client('cloudformation') + connection = placeboify.client("cloudformation") retry_decorator = AWSRetry.jittered_backoff() - wrapped_conn = _RetryingBotoClientWrapper(connection, retry_decorator) + wrapped_conn = RetryingBotoClientWrapper(connection, retry_decorator) return wrapped_conn def test_invalid_template_json(placeboify): connection = _create_wrapped_client(placeboify) params = { - 'StackName': 'ansible-test-wrong-json', - 'TemplateBody': bad_json_tpl, + "StackName": "ansible-test-wrong-json", + "TemplateBody": bad_json_tpl, } m = FakeModule(disable_rollback=False) with pytest.raises(Exception) as exc_info: cfn_module.create_stack(m, params, connection, default_events_limit) - pytest.fail('Expected malformed JSON to have caused the call to fail') + pytest.fail("Expected malformed JSON to have caused the call to fail") - assert exc_info.match('FAIL') + assert exc_info.match("FAIL") assert "ValidationError" in boto_exception(m.exit_args[0]) def test_client_request_token_s3_stack(maybe_sleep, placeboify): connection = _create_wrapped_client(placeboify) params = { - 'StackName': 'ansible-test-client-request-token-yaml', - 'TemplateBody': basic_yaml_tpl, - 'ClientRequestToken': '3faf3fb5-b289-41fc-b940-44151828f6cf', + "StackName": "ansible-test-client-request-token-yaml", + "TemplateBody": basic_yaml_tpl, + "ClientRequestToken": "3faf3fb5-b289-41fc-b940-44151828f6cf", } m = FakeModule(disable_rollback=False) result = cfn_module.create_stack(m, params, connection, default_events_limit) - assert result['changed'] - assert len(result['events']) > 1 + assert result["changed"] + assert len(result["events"]) > 1 # require that the final recorded stack state was CREATE_COMPLETE # events are retrieved newest-first, so 0 is the latest - assert 'CREATE_COMPLETE' in result['events'][0] - connection.delete_stack(StackName='ansible-test-client-request-token-yaml') + assert "CREATE_COMPLETE" in result["events"][0] + connection.delete_stack(StackName="ansible-test-client-request-token-yaml") def test_basic_s3_stack(maybe_sleep, placeboify): connection = _create_wrapped_client(placeboify) - params = { - 'StackName': 'ansible-test-basic-yaml', - 'TemplateBody': basic_yaml_tpl - } + params = {"StackName": "ansible-test-basic-yaml", "TemplateBody": basic_yaml_tpl} m = FakeModule(disable_rollback=False) result = cfn_module.create_stack(m, params, connection, default_events_limit) - assert result['changed'] - assert len(result['events']) > 1 + assert result["changed"] + assert len(result["events"]) > 1 # require that the final recorded stack state was CREATE_COMPLETE # events are retrieved newest-first, so 0 is the latest - assert 'CREATE_COMPLETE' in result['events'][0] - connection.delete_stack(StackName='ansible-test-basic-yaml') + assert "CREATE_COMPLETE" in result["events"][0] + connection.delete_stack(StackName="ansible-test-basic-yaml") def test_delete_nonexistent_stack(maybe_sleep, placeboify): connection = _create_wrapped_client(placeboify) # module is only used if we threw an unexpected error module = None - result = cfn_module.stack_operation(module, connection, 'ansible-test-nonexist', 'DELETE', default_events_limit) - assert result['changed'] - assert 'Stack does not exist.' in result['log'] + result = cfn_module.stack_operation(module, connection, "ansible-test-nonexist", "DELETE", default_events_limit) + assert result["changed"] + assert "Stack does not exist." in result["log"] def test_get_nonexistent_stack(placeboify): connection = _create_wrapped_client(placeboify) # module is only used if we threw an unexpected error module = None - assert cfn_module.get_stack_facts(module, connection, 'ansible-test-nonexist') is None + assert cfn_module.get_stack_facts(module, connection, "ansible-test-nonexist") is None def test_missing_template_body(): m = FakeModule() with pytest.raises(Exception) as exc_info: - cfn_module.create_stack( - module=m, - stack_params={}, - cfn=None, - events_limit=default_events_limit - ) - pytest.fail('Expected module to have failed with no template') - - assert exc_info.match('FAIL') + cfn_module.create_stack(module=m, stack_params={}, cfn=None, events_limit=default_events_limit) + pytest.fail("Expected module to have failed with no template") + + assert exc_info.match("FAIL") assert not m.exit_args - assert "Either 'template', 'template_body' or 'template_url' is required when the stack does not exist." == m.exit_kwargs['msg'] + assert ( + "Either 'template', 'template_body' or 'template_url' is required when the stack does not exist." + == m.exit_kwargs["msg"] + ) def test_on_create_failure_delete(maybe_sleep, placeboify): m = FakeModule( - on_create_failure='DELETE', + on_create_failure="DELETE", disable_rollback=False, ) connection = _create_wrapped_client(placeboify) - params = { - 'StackName': 'ansible-test-on-create-failure-delete', - 'TemplateBody': failing_yaml_tpl - } + params = {"StackName": "ansible-test-on-create-failure-delete", "TemplateBody": failing_yaml_tpl} result = cfn_module.create_stack(m, params, connection, default_events_limit) - assert result['changed'] - assert result['failed'] - assert len(result['events']) > 1 + assert result["changed"] + assert result["failed"] + assert len(result["events"]) > 1 # require that the final recorded stack state was DELETE_COMPLETE # events are retrieved newest-first, so 0 is the latest - assert 'DELETE_COMPLETE' in result['events'][0] + assert "DELETE_COMPLETE" in result["events"][0] def test_on_create_failure_rollback(maybe_sleep, placeboify): m = FakeModule( - on_create_failure='ROLLBACK', + on_create_failure="ROLLBACK", disable_rollback=False, ) connection = _create_wrapped_client(placeboify) - params = { - 'StackName': 'ansible-test-on-create-failure-rollback', - 'TemplateBody': failing_yaml_tpl - } + params = {"StackName": "ansible-test-on-create-failure-rollback", "TemplateBody": failing_yaml_tpl} result = cfn_module.create_stack(m, params, connection, default_events_limit) - assert result['changed'] - assert result['failed'] - assert len(result['events']) > 1 + assert result["changed"] + assert result["failed"] + assert len(result["events"]) > 1 # require that the final recorded stack state was ROLLBACK_COMPLETE # events are retrieved newest-first, so 0 is the latest - assert 'ROLLBACK_COMPLETE' in result['events'][0] - connection.delete_stack(StackName=params['StackName']) + assert "ROLLBACK_COMPLETE" in result["events"][0] + connection.delete_stack(StackName=params["StackName"]) def test_on_create_failure_do_nothing(maybe_sleep, placeboify): m = FakeModule( - on_create_failure='DO_NOTHING', + on_create_failure="DO_NOTHING", disable_rollback=False, ) connection = _create_wrapped_client(placeboify) - params = { - 'StackName': 'ansible-test-on-create-failure-do-nothing', - 'TemplateBody': failing_yaml_tpl - } + params = {"StackName": "ansible-test-on-create-failure-do-nothing", "TemplateBody": failing_yaml_tpl} result = cfn_module.create_stack(m, params, connection, default_events_limit) - assert result['changed'] - assert result['failed'] - assert len(result['events']) > 1 + assert result["changed"] + assert result["failed"] + assert len(result["events"]) > 1 # require that the final recorded stack state was CREATE_FAILED # events are retrieved newest-first, so 0 is the latest - assert 'CREATE_FAILED' in result['events'][0] - connection.delete_stack(StackName=params['StackName']) + assert "CREATE_FAILED" in result["events"][0] + connection.delete_stack(StackName=params["StackName"]) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py index 5e8140d4a..b1e23451b 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami.py @@ -1,7 +1,9 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from unittest.mock import MagicMock, Mock, patch, call +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch import pytest @@ -28,17 +30,371 @@ def test_create_image_uefi_data(m_get_image_by_id): "uefi_data": "QU1aTlVFRkk9xcN0AAAAAHj5a7fZ9+3aT2gcVRgA8Ek3NipiPST0pCiCIlTJtj20FzENCcQa", } - ec2_ami.create_image(module, connection) + ec2_ami.CreateImage.do(module, connection, None) assert connection.register_image.call_count == 1 connection.register_image.assert_has_calls( [ call( aws_retry=True, - Description=None, Name="my-image", BootMode="uefi", TpmSupport="v2.0", - UefiData="QU1aTlVFRkk9xcN0AAAAAHj5a7fZ9+3aT2gcVRgA8Ek3NipiPST0pCiCIlTJtj20FzENCcQa" + UefiData="QU1aTlVFRkk9xcN0AAAAAHj5a7fZ9+3aT2gcVRgA8Ek3NipiPST0pCiCIlTJtj20FzENCcQa", ) ] ) + + +def test_get_block_device_mapping_virtual_name(): + image = {"block_device_mappings": [{"device_name": "/dev/sdc", "virtual_name": "ephemeral0"}]} + block_device = ec2_ami.get_block_device_mapping(image) + assert block_device == {"/dev/sdc": {"virtual_name": "ephemeral0"}} + + +def test_get_image_by_id_found(): + connection = MagicMock() + + connection.describe_images.return_value = {"Images": [{"ImageId": "ami-0c7a795306730b288"}]} + + image = ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288") + assert image["ImageId"] == "ami-0c7a795306730b288" + assert connection.describe_images.call_count == 1 + assert connection.describe_image_attribute.call_count == 2 + connection.describe_images.assert_has_calls( + [ + call( + aws_retry=True, + ImageIds=["ami-0c7a795306730b288"], + ) + ] + ) + + +def test_get_image_by_too_many(): + connection = MagicMock() + + connection.describe_images.return_value = { + "Images": [ + {"ImageId": "ami-0c7a795306730b288"}, + {"ImageId": "ami-0c7a795306730b288"}, + ] + } + + with pytest.raises(ec2_ami.Ec2AmiFailure): + ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288") + + +def test_get_image_missing(): + connection = MagicMock() + + connection.describe_images.return_value = {"Images": []} + + image = ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288") + assert image is None + assert connection.describe_images.call_count == 1 + connection.describe_images.assert_has_calls( + [ + call( + aws_retry=True, + ImageIds=["ami-0c7a795306730b288"], + ) + ] + ) + + +@patch( + module_name + ".get_image_by_id", +) +def test_create_image_minimal(m_get_image_by_id): + module = MagicMock() + connection = MagicMock() + + m_get_image_by_id.return_value = {"ImageId": "ami-0c7a795306730b288"} + module.params = { + "name": "my-image", + "instance_id": "i-123456789", + "image_id": "ami-0c7a795306730b288", + } + ec2_ami.CreateImage.do(module, connection, None) + assert connection.create_image.call_count == 1 + connection.create_image.assert_has_calls( + [ + call( + aws_retry=True, + InstanceId="i-123456789", + Name="my-image", + ) + ] + ) + + +def test_validate_params(): + module = MagicMock() + + ec2_ami.validate_params(module) + module.fail_json.assert_any_call("one of the following is required: name, image_id") + assert module.require_botocore_at_least.call_count == 0 + + module = MagicMock() + ec2_ami.validate_params(module, tpm_support=True) + assert module.require_botocore_at_least.call_count == 0 + + module = MagicMock() + ec2_ami.validate_params(module, tpm_support=True, boot_mode="legacy-bios") + assert module.require_botocore_at_least.call_count == 0 + module.fail_json.assert_any_call("To specify 'tpm_support', 'boot_mode' must be 'uefi'.") + + module = MagicMock() + ec2_ami.validate_params(module, state="present", name="bobby") + assert module.require_botocore_at_least.call_count == 0 + module.fail_json.assert_any_call( + "The parameters instance_id or device_mapping (register from EBS snapshot) are required for a new image." + ) + + +def test_rename_item_if_exists(): + dict_object = { + "Paris": True, + "London": {"Heathrow Airport": False}, + } + ec2_ami.rename_item_if_exists(dict_object, "Paris", "NewYork") + assert dict_object == {"London": {"Heathrow Airport": False}, "NewYork": True} + + dict_object = { + "Cities": {}, + "London": "bar", + } + + ec2_ami.rename_item_if_exists(dict_object, "London", "Abidjan", "Cities") + ec2_ami.rename_item_if_exists(dict_object, "Doesnt-exist", "Nowhere", "Cities") + assert dict_object == {"Cities": {"Abidjan": "bar"}} + + +def test_DeregisterImage_defer_purge_snapshots(): + image = {"BlockDeviceMappings": [{"Ebs": {"SnapshotId": "My_snapshot"}}, {}]} + func = ec2_ami.DeregisterImage.defer_purge_snapshots(image) + + connection = MagicMock() + assert list(func(connection)) == ["My_snapshot"] + connection.delete_snapshot.assert_called_with(aws_retry=True, SnapshotId="My_snapshot") + + +@patch(module_name + ".get_image_by_id") +@patch(module_name + ".time.sleep") +def test_DeregisterImage_timeout_success(m_sleep, m_get_image_by_id): + connection = MagicMock() + m_get_image_by_id.side_effect = [{"ImageId": "ami-0c7a795306730b288"}, None] + + ec2_ami.DeregisterImage.timeout(connection, "ami-0c7a795306730b288", 10) + assert m_sleep.call_count == 1 + + +@patch(module_name + ".get_image_by_id") +@patch(module_name + ".time.time") +@patch(module_name + ".time.sleep") +def test_DeregisterImage_timeout_failure(m_sleep, m_time, m_get_image_by_id): + connection = MagicMock() + m_time.side_effect = list(range(1, 30)) + m_get_image_by_id.return_value = {"ImageId": "ami-0c7a795306730b288"} + + with pytest.raises(ec2_ami.Ec2AmiFailure): + ec2_ami.DeregisterImage.timeout(connection, "ami-0c7a795306730b288", 10) + assert m_sleep.call_count == 9 + + +def test_UpdateImage_set_launch_permission_check_mode_no_change(): + connection = MagicMock() + image = {"ImageId": "ami-0c7a795306730b288", "LaunchPermissions": {}} + + changed = ec2_ami.UpdateImage.set_launch_permission(connection, image, launch_permissions={}, check_mode=True) + assert changed is False + assert connection.modify_image_attribute.call_count == 0 + + launch_permissions = {"user_ids": ["123456789012"], "group_names": ["foo", "bar"]} + image = { + "ImageId": "ami-0c7a795306730b288", + "LaunchPermissions": [ + {"UserId": "123456789012"}, + {"GroupName": "foo"}, + {"GroupName": "bar"}, + ], + } + + +def test_UpdateImage_set_launch_permission_check_mode_with_change(): + connection = MagicMock() + image = {"ImageId": "ami-0c7a795306730b288", "LaunchPermissions": {}} + launch_permissions = {"user_ids": ["123456789012"], "group_names": ["foo", "bar"]} + changed = ec2_ami.UpdateImage.set_launch_permission(connection, image, launch_permissions, check_mode=True) + assert changed is True + assert connection.modify_image_attribute.call_count == 0 + + +def test_UpdateImage_set_launch_permission_with_change(): + connection = MagicMock() + image = {"ImageId": "ami-0c7a795306730b288", "LaunchPermissions": {}} + launch_permissions = {"user_ids": ["123456789012"], "group_names": ["foo", "bar"]} + changed = ec2_ami.UpdateImage.set_launch_permission(connection, image, launch_permissions, check_mode=False) + assert changed is True + assert connection.modify_image_attribute.call_count == 1 + connection.modify_image_attribute.assert_called_with( + aws_retry=True, + ImageId="ami-0c7a795306730b288", + Attribute="launchPermission", + LaunchPermission={ + "Add": [{"Group": "bar"}, {"Group": "foo"}, {"UserId": "123456789012"}], + "Remove": [], + }, + ) + + +def test_UpdateImage_set_description(): + connection = MagicMock() + module = MagicMock() + module.check_mode = False + image = {"ImageId": "ami-0c7a795306730b288", "Description": "My description"} + changed = ec2_ami.UpdateImage.set_description(connection, module, image, "My description") + assert changed is False + + changed = ec2_ami.UpdateImage.set_description(connection, module, image, "New description") + assert changed is True + assert connection.modify_image_attribute.call_count == 1 + connection.modify_image_attribute.assert_called_with( + aws_retry=True, + ImageId="ami-0c7a795306730b288", + Attribute="Description", + Description={"Value": "New description"}, + ) + + +def test_UpdateImage_set_description_check_mode(): + connection = MagicMock() + module = MagicMock() + module.check_mode = True + image = {"ImageId": "ami-0c7a795306730b288", "Description": "My description"} + changed = ec2_ami.UpdateImage.set_description(connection, module, image, "My description") + assert changed is False + + changed = ec2_ami.UpdateImage.set_description(connection, module, image, "New description") + assert changed is True + assert connection.modify_image_attribute.call_count == 0 + + +def test_CreateImage_build_block_device_mapping(): + device_mapping = [ + { + "device_name": "/dev/xvda", + "volume_size": 8, + "snapshot_id": "snap-xxxxxxxx", + "delete_on_termination": True, + "volume_type": "gp2", + "no_device": False, + }, + { + "device_name": "/dev/xvdb", + "no_device": True, + }, + ] + result = ec2_ami.CreateImage.build_block_device_mapping(device_mapping) + assert result == [ + { + "Ebs": { + "DeleteOnTermination": True, + "SnapshotId": "snap-xxxxxxxx", + "VolumeSize": 8, + "VolumeType": "gp2", + }, + "DeviceName": "/dev/xvda", + }, + {"DeviceName": "/dev/xvdb", "Ebs": {}, "NoDevice": ""}, + ] + + +def test_CreateImage_do_check_mode_no_change(): + module = MagicMock() + + module.params = {"name": "my-image"} + connection = MagicMock() + connection.describe_images.return_value = { + "Images": [ + { + "InstanceId": "i-123456789", + "Name": "my-image", + } + ] + } + + ec2_ami.CreateImage.do_check_mode(module, connection, None) + module.exit_json.assert_called_with( + changed=False, + msg="Error registering image: AMI name is already in use by another AMI", + ) + + +def test_CreateImage_do_check_mode_with_change(): + module = MagicMock() + + module.params = {"name": "my-image"} + connection = MagicMock() + connection.describe_images.return_value = {"Images": []} + + ec2_ami.CreateImage.do_check_mode(module, connection, None) + module.exit_json.assert_called_with(changed=True, msg="Would have created a AMI if not in check mode.") + + +@patch(module_name + ".get_waiter") +def test_CreateImage_wait(m_get_waiter): + connection = MagicMock() + m_waiter = MagicMock() + m_get_waiter.return_value = m_waiter + + assert ec2_ami.CreateImage.wait(connection, wait_timeout=0, image_id=None) is None + + ec2_ami.CreateImage.wait(connection, wait_timeout=600, image_id="ami-0c7a795306730b288") + assert m_waiter.wait.call_count == 1 + m_waiter.wait.assert_called_with( + ImageIds=["ami-0c7a795306730b288"], + WaiterConfig={"Delay": 15, "MaxAttempts": 40}, + ) + + +@patch(module_name + ".add_ec2_tags") +@patch(module_name + ".get_image_by_id") +def test_CreateImage_set_tags(m_get_image_by_id, m_add_ec2_tags): + connection = MagicMock() + module = MagicMock() + + m_get_image_by_id.return_value = { + "ImageId": "ami-0c7a795306730b288", + "BlockDeviceMappings": [ + {"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": "50"}}, + { + "DeviceName": "/dev/sdm", + "Ebs": {"VolumeSize": "100", "SnapshotId": "snap-066877671789bd71b"}, + }, + {"DeviceName": "/dev/sda2"}, + ], + } + tags = {} + ec2_ami.CreateImage.set_tags(connection, module, tags, image_id="ami-0c7a795306730b288") + assert m_add_ec2_tags.call_count == 0 + + tags = {"metro": "LaSalle"} + ec2_ami.CreateImage.set_tags(connection, module, tags, image_id="ami-0c7a795306730b288") + assert m_add_ec2_tags.call_count == 3 + m_add_ec2_tags.assert_called_with(connection, module, "snap-066877671789bd71b", tags) + + +def test_CreateInage_set_launch_permissions(): + connection = MagicMock() + launch_permissions = {"user_ids": ["123456789012"], "group_names": ["foo", "bar"]} + image_id = "ami-0c7a795306730b288" + ec2_ami.CreateImage.set_launch_permissions(connection, launch_permissions, image_id) + + assert connection.modify_image_attribute.call_count == 1 + connection.modify_image_attribute.assert_called_with( + Attribute="LaunchPermission", + ImageId="ami-0c7a795306730b288", + LaunchPermission={"Add": [{"Group": "foo"}, {"Group": "bar"}, {"UserId": "123456789012"}]}, + aws_retry=True, + ) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami_info.py new file mode 100644 index 000000000..a5abc77af --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_ami_info.py @@ -0,0 +1,224 @@ +# (c) 2022 Red Hat Inc. + +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import ANY +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch + +import botocore.exceptions +import pytest + +from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict + +from ansible_collections.amazon.aws.plugins.modules import ec2_ami_info + +module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_ami_info" + + +@pytest.fixture +def ec2_client(): + return MagicMock() + + +@pytest.mark.parametrize( + "executable_users,filters,image_ids,owners,expected", + [ + ([], {}, [], [], {}), + ([], {}, ["ami-1234567890"], [], {"ImageIds": ["ami-1234567890"]}), + ([], {}, [], ["1234567890"], {"Filters": [{"Name": "owner-id", "Values": ["1234567890"]}]}), + ( + [], + {"owner-alias": "test_ami_owner"}, + [], + ["1234567890"], + { + "Filters": [ + {"Name": "owner-alias", "Values": ["test_ami_owner"]}, + {"Name": "owner-id", "Values": ["1234567890"]}, + ] + }, + ), + ([], {"is-public": True}, [], [], {"Filters": [{"Name": "is-public", "Values": ["true"]}]}), + (["self"], {}, [], [], {"ExecutableUsers": ["self"]}), + ([], {}, [], ["self"], {"Owners": ["self"]}), + ], +) +def test_build_request_args(executable_users, filters, image_ids, owners, expected): + assert ec2_ami_info.build_request_args(executable_users, filters, image_ids, owners) == expected + + +def test_get_images(ec2_client): + ec2_client.describe_images.return_value = { + "Images": [ + { + "Architecture": "x86_64", + "BlockDeviceMappings": [ + { + "DeviceName": "/dev/sda1", + "Ebs": { + "DeleteOnTermination": "True", + "Encrypted": "False", + "SnapshotId": "snap-0f00cba784af62428", + "VolumeSize": 10, + "VolumeType": "gp2", + }, + } + ], + "ImageId": "ami-1234567890", + "ImageLocation": "1234567890/test-ami-uefi-boot", + "ImageType": "machine", + "Name": "test-ami-uefi-boot", + "OwnerId": "1234567890", + "PlatformDetails": "Linux/UNIX", + } + ], + } + + request_args = {"ImageIds": ["ami-1234567890"]} + + get_images_result = ec2_ami_info.get_images(ec2_client, request_args) + + ec2_client.describe_images.call_count == 2 + ec2_client.describe_images.assert_called_with(aws_retry=True, **request_args) + assert get_images_result == ec2_client.describe_images.return_value + + +def test_get_image_attribute(): + ec2_client = MagicMock() + + ec2_client.describe_image_attribute.return_value = { + "ImageId": "ami-1234567890", + "LaunchPermissions": [{"UserId": "1234567890"}, {"UserId": "0987654321"}], + } + + image_id = "ami-1234567890" + + get_image_attribute_result = ec2_ami_info.get_image_attribute(ec2_client, image_id) + + ec2_client.describe_image_attribute.call_count == 1 + ec2_client.describe_image_attribute.assert_called_with( + aws_retry=True, Attribute="launchPermission", ImageId=image_id + ) + assert len(get_image_attribute_result["LaunchPermissions"]) == 2 + + +@patch(module_name + ".get_image_attribute") +@patch(module_name + ".get_images") +def test_list_ec2_images(m_get_images, m_get_image_attribute): + module = MagicMock() + + m_get_images.return_value = { + "Images": [ + { + "Architecture": "x86_64", + "BlockDeviceMappings": [ + { + "DeviceName": "/dev/sda1", + "Ebs": { + "DeleteOnTermination": "True", + "Encrypted": "False", + "SnapshotId": "snap-0f00cba784af62428", + "VolumeSize": 10, + "VolumeType": "gp2", + }, + } + ], + "ImageId": "ami-1234567890", + "ImageLocation": "1234567890/test-ami-uefi-boot", + "ImageType": "machine", + "Name": "test-ami-uefi-boot", + "OwnerId": "1234567890", + "OwnerAlias": "test_ami_owner", + "PlatformDetails": "Linux/UNIX", + }, + { + "Architecture": "x86_64", + "BlockDeviceMappings": [ + { + "DeviceName": "/dev/sda1", + "Ebs": { + "DeleteOnTermination": "True", + "Encrypted": "False", + "SnapshotId": "snap-0f00cba784af62428", + "VolumeSize": 10, + "VolumeType": "gp2", + }, + } + ], + "ImageId": "ami-1523498760", + "ImageLocation": "1523498760/test-ami-uefi-boot", + "ImageType": "machine", + "Name": "test-ami-uefi-boot", + "OwnerId": "1234567890", + "OwnerAlias": "test_ami_owner", + "PlatformDetails": "Linux/UNIX", + }, + ], + } + + m_get_image_attribute.return_value = { + "ImageId": "ami-1234567890", + "LaunchPermissions": [{"UserId": "1234567890"}, {"UserId": "0987654321"}], + } + + images = m_get_images.return_value["Images"] + images = [camel_dict_to_snake_dict(image) for image in images] + + request_args = { + "Filters": [ + {"Name": "owner-alias", "Values": ["test_ami_owner"]}, + {"Name": "owner-id", "Values": ["1234567890"]}, + ] + } + + # needed for `assert m_get_image_attribute.call_count == 2` + module.params = {"describe_image_attributes": True} + + list_ec2_images_result = ec2_ami_info.list_ec2_images(ec2_client, module, request_args) + + assert m_get_images.call_count == 1 + m_get_images.assert_called_with(ec2_client, request_args) + + assert m_get_image_attribute.call_count == 2 + m_get_image_attribute.assert_has_calls( + [call(ec2_client, images[0]["image_id"])], + [call(ec2_client, images[1]["image_id"])], + ) + + assert len(list_ec2_images_result) == 2 + assert list_ec2_images_result[0]["image_id"] == "ami-1234567890" + assert list_ec2_images_result[1]["image_id"] == "ami-1523498760" + + +@patch(module_name + ".AnsibleAWSModule") +def test_main_success(m_AnsibleAWSModule): + m_module = MagicMock() + m_AnsibleAWSModule.return_value = m_module + + ec2_ami_info.main() + + m_module.client.assert_called_with("ec2", retry_decorator=ANY) + m_module.exit_json.assert_called_with(images=[]) + + +def a_boto_exception(): + return botocore.exceptions.UnknownServiceError(service_name="Whoops", known_service_names="Oula") + + +def test_api_failure_get_images(ec2_client): + request_args = {} + ec2_client.describe_images.side_effect = a_boto_exception() + + with pytest.raises(ec2_ami_info.AmiInfoFailure): + ec2_ami_info.get_images(ec2_client, request_args) + + +def test_api_failure_get_image_attribute(ec2_client): + image_id = "ami-1234567890" + ec2_client.describe_image_attribute.side_effect = a_boto_exception() + + with pytest.raises(ec2_ami_info.AmiInfoFailure): + ec2_ami_info.get_image_attribute(ec2_client, image_id) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_eni_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_eni_info.py new file mode 100644 index 000000000..d6323601d --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_eni_info.py @@ -0,0 +1,108 @@ +# (c) 2022 Red Hat Inc. + +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch + +import pytest + +from ansible_collections.amazon.aws.plugins.modules import ec2_eni_info + +module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_eni_info" + + +@pytest.mark.parametrize( + "eni_id,filters,expected", [("", {}, {}), ("eni-1234567890", {}, {"NetworkInterfaceIds": ["eni-1234567890"]})] +) +def test_build_request_args(eni_id, filters, expected): + assert ec2_eni_info.build_request_args(eni_id, filters) == expected + + +def test_get_network_interfaces(): + connection = MagicMock() + module = MagicMock() + + connection.describe_network_interfaces.return_value = { + "NetworkInterfaces": [ + { + "AvailabilityZone": "us-east-2b", + "Description": "", + "NetworkInterfaceId": "eni-1234567890", + "PrivateIpAddresses": [{"Primary": "True", "PrivateIpAddress": "11.22.33.44"}], + "RequesterManaged": False, + "SourceDestCheck": True, + "Status": "available", + "SubnetId": "subnet-07d906b8358869bda", + "TagSet": [], + "VpcId": "vpc-0cb60952be96c9cd8", + } + ] + } + + request_args = {"NetworkInterfaceIds": ["eni-1234567890"]} + + network_interfaces_result = ec2_eni_info.get_network_interfaces(connection, module, request_args) + + connection.describe_network_interfaces.call_count == 1 + connection.describe_network_interfaces.assert_called_with(aws_retry=True, **request_args) + assert len(network_interfaces_result["NetworkInterfaces"]) == 1 + + +@patch(module_name + ".get_network_interfaces") +def test_list_eni(m_get_network_interfaces): + connection = MagicMock() + module = MagicMock() + + m_get_network_interfaces.return_value = { + "NetworkInterfaces": [ + { + "AvailabilityZone": "us-east-2b", + "Description": "", + "NetworkInterfaceId": "eni-1234567890", + "PrivateIpAddresses": [{"Primary": "True", "PrivateIpAddress": "11.22.33.44"}], + "RequesterManaged": False, + "SourceDestCheck": True, + "Status": "available", + "SubnetId": "subnet-07d906b8358869bda", + "TagSet": [], + "VpcId": "vpc-0cb60952be96c9cd8", + }, + { + "AvailabilityZone": "us-east-2b", + "Description": "", + "NetworkInterfaceId": "eni-0987654321", + "PrivateIpAddresses": [{"Primary": "True", "PrivateIpAddress": "11.22.33.44"}], + "RequesterManaged": False, + "SourceDestCheck": True, + "Status": "available", + "SubnetId": "subnet-07d906b8358869bda", + "TagSet": [ + {"Key": "Name", "Value": "my-test-eni-name"}, + ], + "VpcId": "vpc-0cb60952be96c9cd8", + }, + ] + } + + request_args = {"Filters": [{"Name": "owner-id", "Values": ["1234567890"]}]} + + camel_network_interfaces = ec2_eni_info.list_eni(connection, module, request_args) + + m_get_network_interfaces.call_count == 1 + m_get_network_interfaces.assert_has_calls( + [ + call(connection, module, request_args), + ] + ) + assert len(camel_network_interfaces) == 2 + + assert camel_network_interfaces[0]["id"] == "eni-1234567890" + assert camel_network_interfaces[0]["tags"] == {} + assert camel_network_interfaces[0].get("name") is None + + assert camel_network_interfaces[1]["id"] == "eni-0987654321" + assert camel_network_interfaces[1]["tags"] == {"Name": "my-test-eni-name"} + assert camel_network_interfaces[1]["name"] == "my-test-eni-name" diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_import_image.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_import_image.py new file mode 100644 index 000000000..6830fe358 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_import_image.py @@ -0,0 +1,224 @@ +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import ANY +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest + +from ansible_collections.amazon.aws.plugins.modules import ec2_import_image +from ansible_collections.amazon.aws.plugins.modules import ec2_import_image_info + +module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_import_image" +module_name_info = "ansible_collections.amazon.aws.plugins.modules.ec2_import_image_info" +utils = "ansible_collections.amazon.aws.plugins.module_utils.ec2" + +expected_result = { + "import_task_id": "import-ami-0c207d759080a3dff", + "progress": "19", + "snapshot_details": [ + { + "disk_image_size": 26843545600.0, + "format": "RAW", + "status": "active", + "user_bucket": {"s3_bucket": "clone-vm-s3-bucket", "s3_key": "clone-vm-s3-bucket/ubuntu-vm-clone.raw"}, + } + ], + "status": "active", + "status_message": "converting", + "tags": {"Name": "clone-vm-import-image"}, + "task_name": "clone-vm-import-image", +} + +describe_import_image_tasks = [ + { + "ImportTaskId": "import-ami-0c207d759080a3dff", + "Progress": "19", + "SnapshotDetails": [ + { + "DiskImageSize": 26843545600.0, + "Format": "RAW", + "Status": "active", + "UserBucket": {"S3Bucket": "clone-vm-s3-bucket", "S3Key": "clone-vm-s3-bucket/ubuntu-vm-clone.raw"}, + } + ], + "Status": "active", + "StatusMessage": "converting", + "Tags": [{"Key": "Name", "Value": "clone-vm-import-image"}], + } +] + + +@pytest.fixture +def paginate(): + # Create a MagicMock for the paginate object + paginate_mock = MagicMock() + + return paginate_mock + + +@pytest.fixture +def conn_paginator(paginate): + conn_paginator_mock = MagicMock() + conn_paginator_mock.paginate.return_value = paginate + return conn_paginator_mock + + +@pytest.fixture +def client(conn_paginator): + client_mock = MagicMock() + + # Configure the client.get_paginator to return the conn_paginator + client_mock.get_paginator.return_value = conn_paginator + + return client_mock + + +@pytest.fixture +def module(): + # Create a MagicMock for the module object + module_mock = MagicMock() + module_mock.params = { + "task_name": "clone-vm-import-image", + "disk_containers": [ + { + "format": "raw", + "user_bucket": {"s3_bucket": "clone-vm-s3-bucket", "s3_key": "clone-vm-s3-bucket/ubuntu-vm-clone.raw"}, + } + ], + } + module_mock.check_mode = False + + return module_mock + + +@pytest.mark.parametrize( + "side_effects, expected_result", + [ + ( + [{"ImportImageTasks": []}, {"ImportImageTasks": describe_import_image_tasks}], + {"changed": True, "import_image": expected_result}, + ), + ( + [{"ImportImageTasks": describe_import_image_tasks}, {"ImportImageTasks": describe_import_image_tasks}], + { + "changed": False, + "msg": "An import task with the specified name already exists", + "import_image": expected_result, + }, + ), + ], +) +def test_present_no_check_mode(client, module, paginate, side_effects, expected_result): + paginate.build_full_result.side_effect = side_effects + module.exit_json.side_effect = SystemExit(1) + + with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate): + with pytest.raises(SystemExit): + ec2_import_image.present(client, module) + + module.exit_json.assert_called_with(**expected_result) + + +@pytest.mark.parametrize( + "side_effects, expected_result", + [ + ( + [{"ImportImageTasks": []}, {"ImportImageTasks": describe_import_image_tasks}], + {"changed": True, "msg": "Would have created the import task if not in check mode"}, + ), + ( + [{"ImportImageTasks": describe_import_image_tasks}, {"ImportImageTasks": describe_import_image_tasks}], + { + "changed": False, + "msg": "An import task with the specified name already exists", + "import_image": expected_result, + }, + ), + ], +) +def test_present_check_mode(client, module, paginate, side_effects, expected_result): + paginate.build_full_result.side_effect = side_effects + module.check_mode = True + module.exit_json.side_effect = SystemExit(1) + + with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate): + with pytest.raises(SystemExit): + ec2_import_image.present(client, module) + + module.exit_json.assert_called_with(**expected_result) + + +@pytest.mark.parametrize( + "side_effect, expected_result", + [ + ( + [ + {"ImportImageTasks": []}, + ], + { + "changed": False, + "msg": "The specified import task does not exist or it cannot be cancelled", + "import_image": {}, + }, + ), + ( + [ + {"ImportImageTasks": describe_import_image_tasks}, + ], + {"changed": True, "import_image": expected_result}, + ), + ], +) +def test_absent_no_check_mode(client, module, paginate, side_effect, expected_result): + paginate.build_full_result.side_effect = side_effect + module.exit_json.side_effect = SystemExit(1) + + with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate): + with pytest.raises(SystemExit): + ec2_import_image.absent(client, module) + + module.exit_json.assert_called_with(**expected_result) + + +@pytest.mark.parametrize( + "side_effect, expected_result", + [ + ( + [ + {"ImportImageTasks": []}, + ], + { + "changed": False, + "msg": "The specified import task does not exist or it cannot be cancelled", + "import_image": {}, + }, + ), + ( + [ + {"ImportImageTasks": describe_import_image_tasks}, + ], + {"changed": True, "import_image": expected_result}, + ), + ], +) +def test_present_check_mode(client, module, paginate, side_effect, expected_result): + paginate.build_full_result.side_effect = side_effect + module.exit_json.side_effect = SystemExit(1) + + with patch(utils + ".helper_describe_import_image_tasks", return_value=paginate): + with pytest.raises(SystemExit): + ec2_import_image.absent(client, module) + + module.exit_json.assert_called_with(**expected_result) + + +@patch(module_name_info + ".AnsibleAWSModule") +def test_main_success(m_AnsibleAWSModule): + m_module = MagicMock() + m_AnsibleAWSModule.return_value = m_module + + ec2_import_image_info.main() + + m_module.client.assert_called_with("ec2", retry_decorator=ANY) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py index 2660ced63..cbcf02588 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_key.py @@ -1,17 +1,17 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +import copy +import datetime +from unittest.mock import ANY from unittest.mock import MagicMock from unittest.mock import patch -from unittest.mock import call, ANY -import pytest import botocore -import datetime +import pytest from dateutil.tz import tzutc -from ansible.module_utils._text import to_bytes -from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code +from ansible.module_utils._text import to_bytes from ansible_collections.amazon.aws.plugins.modules import ec2_key @@ -19,47 +19,41 @@ module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_key" def raise_botocore_exception_clienterror(action): - params = { - 'Error': { - 'Code': 1, - 'Message': 'error creating key' - }, - 'ResponseMetadata': { - 'RequestId': '01234567-89ab-cdef-0123-456789abcdef' - } + "Error": {"Code": 1, "Message": "error creating key"}, + "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"}, } - if action == 'create_key_pair': - params['Error']['Message'] = 'error creating key' + if action == "create_key_pair": + params["Error"]["Message"] = "error creating key" - elif action == 'describe_key_pair': - params['Error']['Code'] = 'InvalidKeyPair.NotFound' - params['Error']['Message'] = 'The key pair does not exist' + elif action == "describe_key_pair": + params["Error"]["Code"] = "InvalidKeyPair.NotFound" + params["Error"]["Message"] = "The key pair does not exist" - elif action == 'import_key_pair': - params['Error']['Message'] = 'error importing key' + elif action == "import_key_pair": + params["Error"]["Message"] = "error importing key" - elif action == 'delete_key_pair': - params['Error']['Message'] = 'error deleting key' + elif action == "delete_key_pair": + params["Error"]["Message"] = "error deleting key" return botocore.exceptions.ClientError(params, action) def test__import_key_pair(): ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com" expected_params = { - 'KeyName': name, - 'PublicKeyMaterial': to_bytes(key_material), + "KeyName": name, + "PublicKeyMaterial": to_bytes(key_material), } ec2_client.import_key_pair.return_value = { - 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-012345678905a208d' + "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62", + "KeyName": "my_keypair", + "KeyPairId": "key-012345678905a208d", } result = ec2_key._import_key_pair(ec2_client, name, key_material) @@ -71,22 +65,21 @@ def test__import_key_pair(): def test_api_failure__import_key_pair(): ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com" expected_params = { - 'KeyName': name, - 'PublicKeyMaterial': to_bytes(key_material), + "KeyName": name, + "PublicKeyMaterial": to_bytes(key_material), } - ec2_client.import_key_pair.side_effect = raise_botocore_exception_clienterror('import_key_pair') + ec2_client.import_key_pair.side_effect = raise_botocore_exception_clienterror("import_key_pair") with pytest.raises(ec2_key.Ec2KeyFailure): ec2_key._import_key_pair(ec2_client, name, key_material) def test_extract_key_data_describe_key_pairs(): - key = { "CreateTime": datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()), "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", @@ -96,66 +89,61 @@ def test_extract_key_data_describe_key_pairs(): } key_type = "rsa" - + file_name = MagicMock() expected_result = { "name": "my_keypair", "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "id": "key-043046ef2a9a80b56", "tags": {}, - "type": "rsa" + "type": "rsa", } - result = ec2_key.extract_key_data(key, key_type) + result = ec2_key.extract_key_data(key, key_type, file_name) assert result == expected_result def test_extract_key_data_create_key_pair(): - key = { - 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-043046ef2a9a80b56' + "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", + "KeyName": "my_keypair", + "KeyPairId": "key-043046ef2a9a80b56", } key_type = "rsa" - + file_name = MagicMock() expected_result = { "name": "my_keypair", "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "id": "key-043046ef2a9a80b56", "tags": {}, - "type": "rsa" + "type": "rsa", } - result = ec2_key.extract_key_data(key, key_type) + result = ec2_key.extract_key_data(key, key_type, file_name) assert result == expected_result -@patch(module_name + '.delete_key_pair') -@patch(module_name + '._import_key_pair') -@patch(module_name + '.find_key_pair') +@patch(module_name + ".delete_key_pair") +@patch(module_name + "._import_key_pair") +@patch(module_name + ".find_key_pair") def test_get_key_fingerprint(m_find_key_pair, m_import_key_pair, m_delete_key_pair): - module = MagicMock() ec2_client = MagicMock() + file_name = MagicMock() m_find_key_pair.return_value = None m_import_key_pair.return_value = { - 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-043046ef2a9a80b56' + "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62", + "KeyName": "my_keypair", + "KeyPairId": "key-043046ef2a9a80b56", } - m_delete_key_pair.return_value = { - 'changed': True, - 'key': None, - 'msg': 'key deleted' - } + m_delete_key_pair.return_value = {"changed": True, "key": None, "msg": "key deleted"} - expected_result = 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62' + expected_result = "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62" key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com" @@ -169,17 +157,17 @@ def test_get_key_fingerprint(m_find_key_pair, m_import_key_pair, m_delete_key_pa def test_find_key_pair(): ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" ec2_client.describe_key_pairs.return_value = { - 'KeyPairs': [ + "KeyPairs": [ { - 'CreateTime': datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()), - 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-043046ef2a9a80b56', - 'KeyType': 'rsa', - 'Tags': [] + "CreateTime": datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()), + "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", + "KeyName": "my_keypair", + "KeyPairId": "key-043046ef2a9a80b56", + "KeyType": "rsa", + "Tags": [], } ], } @@ -192,7 +180,7 @@ def test_find_key_pair(): def test_api_failure_find_key_pair(): ec2_client = MagicMock() - name = 'non_existing_keypair' + name = "non_existing_keypair" ec2_client.describe_key_pairs.side_effect = botocore.exceptions.BotoCoreError @@ -202,9 +190,9 @@ def test_api_failure_find_key_pair(): def test_invalid_key_pair_find_key_pair(): ec2_client = MagicMock() - name = 'non_existing_keypair' + name = "non_existing_keypair" - ec2_client.describe_key_pairs.side_effect = raise_botocore_exception_clienterror('describe_key_pair') + ec2_client.describe_key_pairs.side_effect = raise_botocore_exception_clienterror("describe_key_pair") result = ec2_key.find_key_pair(ec2_client, name) @@ -213,11 +201,11 @@ def test_invalid_key_pair_find_key_pair(): def test__create_key_pair(): ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" tag_spec = None key_type = None - expected_params = {'KeyName': name} + expected_params = {"KeyName": name} ec2_client.create_key_pair.return_value = { "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62", @@ -239,33 +227,33 @@ def test__create_key_pair(): def test_api_failure__create_key_pair(): ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" tag_spec = None key_type = None - ec2_client.create_key_pair.side_effect = raise_botocore_exception_clienterror('create_key_pair') + ec2_client.create_key_pair.side_effect = raise_botocore_exception_clienterror("create_key_pair") with pytest.raises(ec2_key.Ec2KeyFailure): ec2_key._create_key_pair(ec2_client, name, tag_spec, key_type) -@patch(module_name + '.extract_key_data') -@patch(module_name + '._import_key_pair') +@patch(module_name + ".extract_key_data") +@patch(module_name + "._import_key_pair") def test_create_new_key_pair_key_material(m_import_key_pair, m_extract_key_data): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com" - key_type = 'rsa' + key_type = "rsa" tags = None - + file_name = MagicMock() module.check_mode = False m_import_key_pair.return_value = { - 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-012345678905a208d' + "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62", + "KeyName": "my_keypair", + "KeyPairId": "key-012345678905a208d", } m_extract_key_data.return_value = { @@ -273,35 +261,36 @@ def test_create_new_key_pair_key_material(m_import_key_pair, m_extract_key_data) "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "id": "key-043046ef2a9a80b56", "tags": {}, - "type": "rsa" + "type": "rsa", } - expected_result = {'changed': True, 'key': m_extract_key_data.return_value, 'msg': 'key pair created'} + expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair created"} - result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, module.check_mode) + result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, file_name, module.check_mode) assert result == expected_result assert m_import_key_pair.call_count == 1 assert m_extract_key_data.call_count == 1 -@patch(module_name + '.extract_key_data') -@patch(module_name + '._create_key_pair') +@patch(module_name + ".extract_key_data") +@patch(module_name + "._create_key_pair") def test_create_new_key_pair_no_key_material(m_create_key_pair, m_extract_key_data): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' - key_type = 'rsa' + name = "my_keypair" + key_type = "rsa" key_material = None tags = None - + file_name = MagicMock() + # TODO. file_name=sth module.check_mode = False m_create_key_pair.return_value = { - 'KeyFingerprint': 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-012345678905a208d' + "KeyFingerprint": "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62", + "KeyName": "my_keypair", + "KeyPairId": "key-012345678905a208d", } m_extract_key_data.return_value = { @@ -309,12 +298,12 @@ def test_create_new_key_pair_no_key_material(m_create_key_pair, m_extract_key_da "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "id": "key-043046ef2a9a80b56", "tags": {}, - "type": "rsa" + "type": "rsa", } - expected_result = {'changed': True, 'key': m_extract_key_data.return_value, 'msg': 'key pair created'} + expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair created"} - result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, module.check_mode) + result = ec2_key.create_new_key_pair(ec2_client, name, key_material, key_type, tags, file_name, module.check_mode) assert result == expected_result assert m_create_key_pair.call_count == 1 @@ -324,7 +313,7 @@ def test_create_new_key_pair_no_key_material(m_create_key_pair, m_extract_key_da def test__delete_key_pair(): ec2_client = MagicMock() - key_name = 'my_keypair' + key_name = "my_keypair" ec2_key._delete_key_pair(ec2_client, key_name) assert ec2_client.delete_key_pair.call_count == 1 @@ -333,23 +322,25 @@ def test__delete_key_pair(): def test_api_failure__delete_key_pair(): ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" - ec2_client.delete_key_pair.side_effect = raise_botocore_exception_clienterror('delete_key_pair') + ec2_client.delete_key_pair.side_effect = raise_botocore_exception_clienterror("delete_key_pair") with pytest.raises(ec2_key.Ec2KeyFailure): ec2_key._delete_key_pair(ec2_client, name) -@patch(module_name + '.extract_key_data') -@patch(module_name + '._import_key_pair') -@patch(module_name + '.delete_key_pair') -@patch(module_name + '.get_key_fingerprint') -def test_update_key_pair_by_key_material_update_needed(m_get_key_fingerprint, m_delete_key_pair, m__import_key_pair, m_extract_key_data): +@patch(module_name + ".extract_key_data") +@patch(module_name + "._import_key_pair") +@patch(module_name + ".delete_key_pair") +@patch(module_name + ".get_key_fingerprint") +def test_update_key_pair_by_key_material_update_needed( + m_get_key_fingerprint, m_delete_key_pair, m__import_key_pair, m_extract_key_data +): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" key_material = "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com" tag_spec = None key = { @@ -358,16 +349,15 @@ def test_update_key_pair_by_key_material_update_needed(m_get_key_fingerprint, m_ "KeyPairId": "key-043046ef2a9a80b56", "Tags": {}, } - module.check_mode = False - m_get_key_fingerprint.return_value = 'd7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62' + m_get_key_fingerprint.return_value = "d7:ff:a6:63:18:64:9c:57:a1:ee:ca:a4:ad:c2:81:62" m_delete_key_pair.return_value = None m__import_key_pair.return_value = { - 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-043046ef2a9a80b56', - 'Tags': {}, + "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", + "KeyName": "my_keypair", + "KeyPairId": "key-043046ef2a9a80b56", + "Tags": {}, } m_extract_key_data.return_value = { "name": "my_keypair", @@ -376,7 +366,7 @@ def test_update_key_pair_by_key_material_update_needed(m_get_key_fingerprint, m_ "tags": {}, } - expected_result = {'changed': True, 'key': m_extract_key_data.return_value, 'msg': "key pair updated"} + expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair updated"} result = ec2_key.update_key_pair_by_key_material(module.check_mode, ec2_client, name, key, key_material, tag_spec) @@ -407,7 +397,6 @@ def test_update_key_pair_by_key_material_key_exists(m_get_key_fingerprint, m_ext "KeyPairId": key_id, "Tags": {}, } - check_mode = False m_get_key_fingerprint.return_value = key_fingerprint m_extract_key_data.return_value = { @@ -434,31 +423,31 @@ def test_update_key_pair_by_key_type_update_needed(m_delete_key_pair, m__create_ module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' - key_type = 'rsa' + name = "my_keypair" + key_type = "rsa" tag_spec = None - + file_name = MagicMock() module.check_mode = False m_delete_key_pair.return_value = None m__create_key_pair.return_value = { - 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa', - 'Name': 'my_keypair', - 'Id': 'key-043046ef2a9a80b56', - 'Tags': {}, - 'Type': 'rsa' + "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", + "Name": "my_keypair", + "Id": "key-043046ef2a9a80b56", + "Tags": {}, + "Type": "rsa", } m_extract_key_data.return_value = { "name": "my_keypair", "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "id": "key-043046ef2a9a80b56", "tags": {}, - "type": "rsa" + "type": "rsa", } expected_result = {"changed": True, "key": m_extract_key_data.return_value, "msg": "key pair updated"} - result = ec2_key.update_key_pair_by_key_type(module.check_mode, ec2_client, name, key_type, tag_spec) + result = ec2_key.update_key_pair_by_key_type(module.check_mode, ec2_client, name, key_type, tag_spec, file_name) assert result == expected_result assert m_delete_key_pair.call_count == 1 @@ -466,30 +455,30 @@ def test_update_key_pair_by_key_type_update_needed(m_delete_key_pair, m__create_ assert m_extract_key_data.call_count == 1 m_delete_key_pair.assert_called_with(module.check_mode, ec2_client, name, finish_task=False) m__create_key_pair.assert_called_with(ec2_client, name, tag_spec, key_type) - m_extract_key_data.assert_called_with(m__create_key_pair.return_value, key_type) + m_extract_key_data.assert_called_with(m__create_key_pair.return_value, key_type, file_name) -@patch(module_name + '.update_key_pair_by_key_material') +@patch(module_name + ".update_key_pair_by_key_material") def test_handle_existing_key_pair_update_key_matrial_with_force(m_update_key_pair_by_key_material): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" key = { "KeyName": "my_keypair", "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "KeyPairId": "key-043046ef2a9a80b56", "Tags": {}, - "KeyType": "rsa" + "KeyType": "rsa", } module.params = { - 'key_material': "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com", - 'force': True, - 'key_type': 'rsa', - 'tags': None, - 'purge_tags': True, - 'tag_spec': None + "key_material": "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com", + "force": True, + "key_type": "rsa", + "tags": None, + "purge_tags": True, + "tag_spec": None, } key_data = { @@ -499,9 +488,9 @@ def test_handle_existing_key_pair_update_key_matrial_with_force(m_update_key_pai "tags": {}, } - m_update_key_pair_by_key_material.return_value = {'changed': True, 'key': key_data, 'msg': "key pair updated"} + m_update_key_pair_by_key_material.return_value = {"changed": True, "key": key_data, "msg": "key pair updated"} - expected_result = {'changed': True, 'key': key_data, 'msg': "key pair updated"} + expected_result = {"changed": True, "key": key_data, "msg": "key pair updated"} result = ec2_key.handle_existing_key_pair_update(module, ec2_client, name, key) @@ -509,27 +498,27 @@ def test_handle_existing_key_pair_update_key_matrial_with_force(m_update_key_pai assert m_update_key_pair_by_key_material.call_count == 1 -@patch(module_name + '.update_key_pair_by_key_type') +@patch(module_name + ".update_key_pair_by_key_type") def test_handle_existing_key_pair_update_key_type(m_update_key_pair_by_key_type): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" key = { "KeyName": "my_keypair", "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "KeyPairId": "key-043046ef2a9a80b56", "Tags": {}, - "KeyType": "ed25519" + "KeyType": "ed25519", } module.params = { - 'key_material': "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com", - 'force': False, - 'key_type': 'rsa', - 'tags': None, - 'purge_tags': True, - 'tag_spec': None + "key_material": "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com", + "force": False, + "key_type": "rsa", + "tags": None, + "purge_tags": True, + "tag_spec": None, } key_data = { @@ -539,9 +528,9 @@ def test_handle_existing_key_pair_update_key_type(m_update_key_pair_by_key_type) "tags": {}, } - m_update_key_pair_by_key_type.return_value = {'changed': True, 'key': key_data, 'msg': "key pair updated"} + m_update_key_pair_by_key_type.return_value = {"changed": True, "key": key_data, "msg": "key pair updated"} - expected_result = {'changed': True, 'key': key_data, 'msg': "key pair updated"} + expected_result = {"changed": True, "key": key_data, "msg": "key pair updated"} result = ec2_key.handle_existing_key_pair_update(module, ec2_client, name, key) @@ -549,27 +538,27 @@ def test_handle_existing_key_pair_update_key_type(m_update_key_pair_by_key_type) assert m_update_key_pair_by_key_type.call_count == 1 -@patch(module_name + '.extract_key_data') +@patch(module_name + ".extract_key_data") def test_handle_existing_key_pair_else(m_extract_key_data): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' + name = "my_keypair" key = { "KeyName": "my_keypair", "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "KeyPairId": "key-043046ef2a9a80b56", "Tags": {}, - "KeyType": "rsa" + "KeyType": "rsa", } module.params = { - 'key_material': "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com", - 'force': False, - 'key_type': 'rsa', - 'tags': None, - 'purge_tags': True, - 'tag_spec': None + "key_material": "ssh-rsa AAAAB3NzaC1yc2EAA email@example.com", + "force": False, + "key_type": "rsa", + "tags": None, + "purge_tags": True, + "tag_spec": None, } m_extract_key_data.return_value = { @@ -577,7 +566,7 @@ def test_handle_existing_key_pair_else(m_extract_key_data): "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", "id": "key-043046ef2a9a80b56", "tags": {}, - "type": "rsa" + "type": "rsa", } expected_result = {"changed": False, "key": m_extract_key_data.return_value, "msg": "key pair already exists"} @@ -588,55 +577,53 @@ def test_handle_existing_key_pair_else(m_extract_key_data): assert m_extract_key_data.call_count == 1 -@patch(module_name + '._delete_key_pair') -@patch(module_name + '.find_key_pair') -def test_delete_key_pair_key_exists(m_find_key_pair, m_delete_key_pair): +@patch(module_name + "._delete_key_pair") +@patch(module_name + ".find_key_pair") +def test_delete_key_pair_key_exists(m_find_key_pair, m_delete_key_pair, tmp_path): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' - + name = "my_keypair" + file_name = tmp_path / "private_key_data.pem" module.check_mode = False m_find_key_pair.return_value = { - 'KeyPairs': [ + "KeyPairs": [ { - 'CreateTime': datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()), - 'KeyFingerprint': '11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa', - 'KeyName': 'my_keypair', - 'KeyPairId': 'key-043046ef2a9a80b56', - 'KeyType': 'rsa', - 'Tags': [] + "CreateTime": datetime.datetime(2022, 9, 15, 20, 10, 15, tzinfo=tzutc()), + "KeyFingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", + "KeyName": "my_keypair", + "KeyPairId": "key-043046ef2a9a80b56", + "KeyType": "rsa", + "Tags": [], } ], } - expected_result = {'changed': True, 'key': None, 'msg': 'key deleted'} - result = ec2_key.delete_key_pair(module.check_mode, ec2_client, name) assert m_find_key_pair.call_count == 1 m_find_key_pair.assert_called_with(ec2_client, name) assert m_delete_key_pair.call_count == 1 m_delete_key_pair.assert_called_with(ec2_client, name) - assert result == expected_result + assert result == {"changed": True, "key": None, "msg": "key deleted"} -@patch(module_name + '._delete_key_pair') -@patch(module_name + '.find_key_pair') +@patch(module_name + "._delete_key_pair") +@patch(module_name + ".find_key_pair") def test_delete_key_pair_key_not_exist(m_find_key_pair, m_delete_key_pair): module = MagicMock() ec2_client = MagicMock() - name = 'my_keypair' - + name = "my_keypair" + file_name = "non_existing_file_path" module.check_mode = False m_find_key_pair.return_value = None - expected_result = {'key': None, 'msg': 'key did not exist'} + expected_result = {"key": None, "msg": "key did not exist"} - result = ec2_key.delete_key_pair(module.check_mode, ec2_client, name) + result = ec2_key.delete_key_pair(module.check_mode, ec2_client, name, file_name) assert m_find_key_pair.call_count == 1 m_find_key_pair.assert_called_with(ec2_client, name) @@ -644,6 +631,24 @@ def test_delete_key_pair_key_not_exist(m_find_key_pair, m_delete_key_pair): assert result == expected_result +def test__write_private_key(tmp_path): + key_data = { + "name": "my_keypair", + "fingerprint": "11:12:13:14:bb:26:85:b2:e8:39:27:bc:ee:aa:ff:ee:dd:cc:bb:aa", + "id": "key-043046ef2a9a80b56", + "tags": {}, + "type": "rsa", + "private_key": "ABCDEFGH", + } + file_name = tmp_path / "id_rsa_key" + saved_key_data = copy.deepcopy(key_data) + result = ec2_key._write_private_key(key_data, str(file_name)) + + assert "private_key" not in result.keys() + del saved_key_data["private_key"] + assert saved_key_data == result + + @patch(module_name + ".AnsibleAWSModule") def test_main_success(m_AnsibleAWSModule): m_module = MagicMock() diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_metadata_facts.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_metadata_facts.py new file mode 100644 index 000000000..23ba85003 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_metadata_facts.py @@ -0,0 +1,101 @@ +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import gzip +import io +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest + +from ansible_collections.amazon.aws.plugins.modules import ec2_metadata_facts + +module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_metadata_facts" + + +class FailJson(Exception): + pass + + +@pytest.fixture() +def ec2_instance(): + module = MagicMock() + return ec2_metadata_facts.Ec2Metadata(module) + + +@patch(module_name + ".fetch_url") +def test__fetch_401(m_fetch_url, ec2_instance): + ec2_instance.module.fail_json.side_effect = FailJson() + m_fetch_url.return_value = (None, {"status": 401, "msg": "Oops"}) + with pytest.raises(FailJson): + ec2_instance._fetch("http://169.254.169.254/latest/meta-data/") + + +@patch(module_name + ".fetch_url") +def test__fetch_200(m_fetch_url, ec2_instance): + m_fetch_url.return_value = (io.StringIO("my-value"), {"status": 200}) + assert ec2_instance._fetch("http://169.254.169.254/latest/meta-data/ami-id") == "my-value" + + m_fetch_url.return_value = (io.StringIO("1"), {"status": 200}) + assert ec2_instance._fetch("http://169.254.169.254/latest/meta-data/ami-id") == "1" + + +@patch(module_name + ".fetch_url") +def test_fetch(m_fetch_url, ec2_instance): + raw_list = "ami-id\n" + m_fetch_url.side_effect = [ + (io.StringIO(raw_list), {"status": 200}), + (io.StringIO("my-value"), {"status": 200}), + ] + ec2_instance.fetch("http://169.254.169.254/latest/meta-data/") + assert ec2_instance._data == {"http://169.254.169.254/latest/meta-data/ami-id": "my-value"} + + +@patch(module_name + ".fetch_url") +def test_fetch_recusive(m_fetch_url, ec2_instance): + raw_list = "whatever/\n" + m_fetch_url.side_effect = [ + (io.StringIO(raw_list), {"status": 200}), + (io.StringIO("my-key"), {"status": 200}), + (io.StringIO("my-value"), {"status": 200}), + ] + ec2_instance.fetch("http://169.254.169.254/latest/meta-data/") + assert ec2_instance._data == {"http://169.254.169.254/latest/meta-data/whatever/my-key": "my-value"} + + +@patch(module_name + ".fetch_url") +def test__fetch_user_data_compressed(m_fetch_url, ec2_instance): + user_data = b"""Content-Type: multipart/mixed; boundary="MIMEBOUNDARY" +MIME-Version: 1.0 + +--MIMEBOUNDARY +Content-Transfer-Encoding: 7bit +Content-Type: text/cloud-config +Mime-Version: 1.0 + +packages: ['httpie'] + +--MIMEBOUNDARY-- +""" + + m_fetch_url.return_value = (io.BytesIO(gzip.compress(user_data)), {"status": 200}) + assert ec2_instance._fetch("http://169.254.169.254/latest/user-data") == user_data.decode("utf-8") + + +@patch(module_name + ".fetch_url") +def test__fetch_user_data_plain(m_fetch_url, ec2_instance): + user_data = b"""Content-Type: multipart/mixed; boundary="MIMEBOUNDARY" +MIME-Version: 1.0 + +--MIMEBOUNDARY +Content-Transfer-Encoding: 7bit +Content-Type: text/cloud-config +Mime-Version: 1.0 + +packages: ['httpie'] + +--MIMEBOUNDARY-- +""" + + m_fetch_url.return_value = (io.BytesIO(user_data), {"status": 200}) + assert ec2_instance._fetch("http://169.254.169.254/latest/user-data") == user_data.decode("utf-8") diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py index 1ebbe86c6..c47122657 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_security_group.py @@ -1,83 +1,59 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - from ansible_collections.amazon.aws.plugins.modules import ec2_security_group as group_module def test_from_permission(): internal_http = { - 'FromPort': 80, - 'IpProtocol': 'tcp', - 'IpRanges': [ - { - 'CidrIp': '10.0.0.0/8', - 'Description': 'Foo Bar Baz' - }, + "FromPort": 80, + "IpProtocol": "tcp", + "IpRanges": [ + {"CidrIp": "10.0.0.0/8", "Description": "Foo Bar Baz"}, ], - 'Ipv6Ranges': [ - {'CidrIpv6': 'fe80::94cc:8aff:fef6:9cc/64'}, + "Ipv6Ranges": [ + {"CidrIpv6": "fe80::94cc:8aff:fef6:9cc/64"}, ], - 'PrefixListIds': [], - 'ToPort': 80, - 'UserIdGroupPairs': [], + "PrefixListIds": [], + "ToPort": 80, + "UserIdGroupPairs": [], } perms = list(group_module.rule_from_group_permission(internal_http)) assert len(perms) == 2 - assert perms[0].target == '10.0.0.0/8' - assert perms[0].target_type == 'ipv4' - assert perms[0].description == 'Foo Bar Baz' - assert perms[1].target == 'fe80::94cc:8aff:fef6:9cc/64' + assert perms[0].target == "10.0.0.0/8" + assert perms[0].target_type == "ipv4" + assert perms[0].description == "Foo Bar Baz" + assert perms[1].target == "fe80::94cc:8aff:fef6:9cc/64" global_egress = { - 'IpProtocol': '-1', - 'IpRanges': [{'CidrIp': '0.0.0.0/0'}], - 'Ipv6Ranges': [], - 'PrefixListIds': [], - 'UserIdGroupPairs': [] + "IpProtocol": "-1", + "IpRanges": [{"CidrIp": "0.0.0.0/0"}], + "Ipv6Ranges": [], + "PrefixListIds": [], + "UserIdGroupPairs": [], } perms = list(group_module.rule_from_group_permission(global_egress)) assert len(perms) == 1 - assert perms[0].target == '0.0.0.0/0' + assert perms[0].target == "0.0.0.0/0" assert perms[0].port_range == (None, None) internal_prefix_http = { - 'FromPort': 80, - 'IpProtocol': 'tcp', - 'PrefixListIds': [ - {'PrefixListId': 'p-1234'} - ], - 'ToPort': 80, - 'UserIdGroupPairs': [], + "FromPort": 80, + "IpProtocol": "tcp", + "PrefixListIds": [{"PrefixListId": "p-1234"}], + "ToPort": 80, + "UserIdGroupPairs": [], } perms = list(group_module.rule_from_group_permission(internal_prefix_http)) assert len(perms) == 1 - assert perms[0].target == 'p-1234' + assert perms[0].target == "p-1234" def test_rule_to_permission(): tests = [ - group_module.Rule((22, 22), 'udp', 'sg-1234567890', 'group', None), - group_module.Rule((1, 65535), 'tcp', '0.0.0.0/0', 'ipv4', "All TCP from everywhere"), - group_module.Rule((443, 443), 'tcp', 'ip-123456', 'ip_prefix', "Traffic to privatelink IPs"), - group_module.Rule((443, 443), 'tcp', 'feed:dead:::beef/64', 'ipv6', None), + group_module.Rule((22, 22), "udp", "sg-1234567890", "group", None), + group_module.Rule((1, 65535), "tcp", "0.0.0.0/0", "ipv4", "All TCP from everywhere"), + group_module.Rule((443, 443), "tcp", "ip-123456", "ip_prefix", "Traffic to privatelink IPs"), + group_module.Rule((443, 443), "tcp", "feed:dead:::beef/64", "ipv6", None), ] for test in tests: perm = group_module.to_permission(test) - assert perm['FromPort'], perm['ToPort'] == test.port_range - assert perm['IpProtocol'] == test.protocol - - -def test_validate_ip(): - class Warner(object): - def warn(self, msg): - return - ips = [ - ('10.1.1.1/24', '10.1.1.0/24'), - ('192.168.56.101/16', '192.168.0.0/16'), - # Don't modify IPv6 CIDRs, AWS supports /128 and device ranges - ('fc00:8fe0:fe80:b897:8990:8a7c:99bf:323d/128', 'fc00:8fe0:fe80:b897:8990:8a7c:99bf:323d/128'), - ] - - for ip, net in ips: - assert group_module.validate_ip(Warner(), ip) == net + assert perm["FromPort"], perm["ToPort"] == test.port_range + assert perm["IpProtocol"] == test.protocol diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_snapshot_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_snapshot_info.py new file mode 100644 index 000000000..34767d38a --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_snapshot_info.py @@ -0,0 +1,128 @@ +# (c) 2022 Red Hat Inc. + +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import ANY +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch + +import pytest + +from ansible_collections.amazon.aws.plugins.modules import ec2_snapshot_info + +module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_snapshot_info" + + +@pytest.mark.parametrize( + "snapshot_ids,owner_ids,restorable_by_user_ids,filters,max_results,next_token_id,expected", + [([], [], [], {}, None, None, {})], +) +def test_build_request_args( + snapshot_ids, owner_ids, restorable_by_user_ids, filters, max_results, next_token_id, expected +): + assert ( + ec2_snapshot_info.build_request_args( + snapshot_ids, owner_ids, restorable_by_user_ids, filters, max_results, next_token_id + ) + == expected + ) + + +def test_get_snapshots(): + module = MagicMock() + connection = MagicMock() + + connection.describe_snapshots.return_value = { + "Snapshots": [ + { + "Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890", + "Encrypted": False, + "OwnerId": "123456789000", + "Progress": "100%", + "SnapshotId": "snap-0f00cba1234567890", + "StartTime": "2021-09-30T01:04:49.724000+00:00", + "State": "completed", + "StorageTier": "standard", + "Tags": [ + {"Key": "TagKey", "Value": "TagValue"}, + ], + "VolumeId": "vol-0ae6c5e1234567890", + "VolumeSize": 10, + }, + { + "Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890", + "Encrypted": False, + "OwnerId": "123456789000", + "Progress": "100%", + "SnapshotId": "snap-0f00cba1234567890", + "StartTime": "2021-09-30T01:04:49.724000+00:00", + "State": "completed", + "StorageTier": "standard", + "Tags": [ + {"Key": "TagKey", "Value": "TagValue"}, + ], + "VolumeId": "vol-0ae6c5e1234567890", + "VolumeSize": 10, + }, + ] + } + + request_args = {"SnapshotIds": ["snap-0f00cba1234567890"]} + + snapshot_info = ec2_snapshot_info.get_snapshots(connection, module, request_args) + + assert connection.describe_snapshots.call_count == 1 + connection.describe_snapshots.assert_called_with(aws_retry=True, SnapshotIds=["snap-0f00cba1234567890"]) + assert len(snapshot_info["Snapshots"]) == 2 + + +@patch(module_name + ".build_request_args") +@patch(module_name + ".get_snapshots") +def test_list_ec2_snapshots(m_get_snapshots, m_build_request_args): + module = MagicMock() + connection = MagicMock() + + m_get_snapshots.return_value = { + "Snapshots": [ + { + "Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890", + "Encrypted": False, + "OwnerId": "123456789000", + "Progress": "100%", + "SnapshotId": "snap-0f00cba1234567890", + "StartTime": "2021-09-30T01:04:49.724000+00:00", + "State": "completed", + "StorageTier": "standard", + "Tags": [ + {"Key": "TagKey", "Value": "TagValue"}, + ], + "VolumeId": "vol-0ae6c5e1234567890", + "VolumeSize": 10, + } + ] + } + + m_build_request_args.return_value = {"SnapshotIds": ["snap-0f00cba1234567890"]} + + request_args = ec2_snapshot_info.build_request_args() + + ec2_snapshot_info.list_ec2_snapshots(connection, module, request_args) + + assert m_get_snapshots.call_count == 1 + m_get_snapshots.assert_has_calls( + [ + call(connection, module, m_build_request_args.return_value), + ] + ) + + +@patch(module_name + ".AnsibleAWSModule") +def test_main_success(m_AnsibleAWSModule): + m_module = MagicMock() + m_AnsibleAWSModule.return_value = m_module + + ec2_snapshot_info.main() + + m_module.client.assert_called_with("ec2", retry_decorator=ANY) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py index 73726590f..27517115e 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_ec2_vpc_dhcp_option.py @@ -3,66 +3,71 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -# Magic... Incorrectly identified by pylint as unused -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import -from ansible_collections.amazon.aws.tests.unit.compat.mock import patch +from unittest.mock import patch from ansible_collections.amazon.aws.plugins.modules import ec2_vpc_dhcp_option as dhcp_module from ansible_collections.amazon.aws.tests.unit.plugins.modules.utils import ModuleTestCase -test_module_params = {'domain_name': 'us-west-2.compute.internal', - 'dns_servers': ['AmazonProvidedDNS'], - 'ntp_servers': ['10.10.2.3', '10.10.4.5'], - 'netbios_name_servers': ['10.20.2.3', '10.20.4.5'], - 'netbios_node_type': 2} - -test_create_config = [{'Key': 'domain-name', 'Values': [{'Value': 'us-west-2.compute.internal'}]}, - {'Key': 'domain-name-servers', 'Values': [{'Value': 'AmazonProvidedDNS'}]}, - {'Key': 'ntp-servers', 'Values': [{'Value': '10.10.2.3'}, {'Value': '10.10.4.5'}]}, - {'Key': 'netbios-name-servers', 'Values': [{'Value': '10.20.2.3'}, {'Value': '10.20.4.5'}]}, - {'Key': 'netbios-node-type', 'Values': 2}] - - -test_create_option_set = [{'Key': 'domain-name', 'Values': ['us-west-2.compute.internal']}, - {'Key': 'domain-name-servers', 'Values': ['AmazonProvidedDNS']}, - {'Key': 'ntp-servers', 'Values': ['10.10.2.3', '10.10.4.5']}, - {'Key': 'netbios-name-servers', 'Values': ['10.20.2.3', '10.20.4.5']}, - {'Key': 'netbios-node-type', 'Values': ['2']}] - -test_normalize_config = {'domain-name': ['us-west-2.compute.internal'], - 'domain-name-servers': ['AmazonProvidedDNS'], - 'ntp-servers': ['10.10.2.3', '10.10.4.5'], - 'netbios-name-servers': ['10.20.2.3', '10.20.4.5'], - 'netbios-node-type': '2' - } - - -class FakeModule(object): +# Magic... Incorrectly identified by pylint as unused +# pylint: disable-next=unused-import +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify + +test_module_params = { + "domain_name": "us-west-2.compute.internal", + "dns_servers": ["AmazonProvidedDNS"], + "ntp_servers": ["10.10.2.3", "10.10.4.5"], + "netbios_name_servers": ["10.20.2.3", "10.20.4.5"], + "netbios_node_type": 2, +} + +test_create_config = [ + {"Key": "domain-name", "Values": [{"Value": "us-west-2.compute.internal"}]}, + {"Key": "domain-name-servers", "Values": [{"Value": "AmazonProvidedDNS"}]}, + {"Key": "ntp-servers", "Values": [{"Value": "10.10.2.3"}, {"Value": "10.10.4.5"}]}, + {"Key": "netbios-name-servers", "Values": [{"Value": "10.20.2.3"}, {"Value": "10.20.4.5"}]}, + {"Key": "netbios-node-type", "Values": 2}, +] + + +test_create_option_set = [ + {"Key": "domain-name", "Values": ["us-west-2.compute.internal"]}, + {"Key": "domain-name-servers", "Values": ["AmazonProvidedDNS"]}, + {"Key": "ntp-servers", "Values": ["10.10.2.3", "10.10.4.5"]}, + {"Key": "netbios-name-servers", "Values": ["10.20.2.3", "10.20.4.5"]}, + {"Key": "netbios-node-type", "Values": ["2"]}, +] + +test_normalize_config = { + "domain-name": ["us-west-2.compute.internal"], + "domain-name-servers": ["AmazonProvidedDNS"], + "ntp-servers": ["10.10.2.3", "10.10.4.5"], + "netbios-name-servers": ["10.20.2.3", "10.20.4.5"], + "netbios-node-type": "2", +} + + +class FakeModule: def __init__(self, **kwargs): self.params = kwargs def fail_json(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise Exception('FAIL') + raise Exception("FAIL") def fail_json_aws(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise Exception('FAIL') + raise Exception("FAIL") def exit_json(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise Exception('EXIT') + raise Exception("EXIT") -@patch.object(dhcp_module.AnsibleAWSModule, 'client') +@patch.object(dhcp_module.AnsibleAWSModule, "client") class TestDhcpModule(ModuleTestCase): - def test_normalize_config(self, client_mock): result = dhcp_module.normalize_ec2_vpc_dhcp_config(test_create_config) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py index 5a53e2ddb..b2d8e0b50 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_kms_key.py @@ -4,12 +4,11 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -import pytest +from unittest.mock import MagicMock +from unittest.mock import patch -from unittest.mock import MagicMock, call, patch from ansible_collections.amazon.aws.plugins.modules import kms_key - module_name = "ansible_collections.amazon.aws.plugins.modules.kms_key" key_details = { "KeyMetadata": { @@ -59,7 +58,6 @@ key_details = { @patch(module_name + ".get_kms_metadata_with_backoff") def test_fetch_key_metadata(m_get_kms_metadata_with_backoff): - module = MagicMock() kms_client = MagicMock() @@ -69,14 +67,8 @@ def test_fetch_key_metadata(m_get_kms_metadata_with_backoff): def test_validate_params(): - module = MagicMock() - module.params = { - "state": "present", - "multi_region": True - } + module.params = {"state": "present", "multi_region": True} result = kms_key.validate_params(module, key_details["KeyMetadata"]) - module.fail_json.assert_called_with( - msg="You cannot change the multi-region property on an existing key." - ) + module.fail_json.assert_called_with(msg="You cannot change the multi-region property on an existing key.") diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py index 451a61766..cd3032ef7 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer.py @@ -4,12 +4,12 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch import pytest -from unittest.mock import MagicMock, call, patch from ansible_collections.amazon.aws.plugins.modules import lambda_layer @@ -19,155 +19,120 @@ def raise_lambdalayer_exception(e=None, m=None): return lambda_layer.LambdaLayerFailure(exc=e, msg=m) -mod_list_layer = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer.list_layer_versions' -mod_create_layer = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer.create_layer_version' -mod_delete_layer = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer.delete_layer_version' +mod_list_layer = "ansible_collections.amazon.aws.plugins.modules.lambda_layer.list_layer_versions" +mod_create_layer = "ansible_collections.amazon.aws.plugins.modules.lambda_layer.create_layer_version" +mod_delete_layer = "ansible_collections.amazon.aws.plugins.modules.lambda_layer.delete_layer_version" @pytest.mark.parametrize( - "params,api_result,calls,ansible_result", + "params,api_result,calls,_ansible_result", [ + ({"name": "testlayer", "version": 4}, [], [], {"changed": False, "layer_versions": []}), ( - { - "name": "testlayer", - "version": 4 - }, - [], - [], - {"changed": False, "layer_versions": []} - ), - ( - { - "name": "testlayer", - "version": 4 - }, + {"name": "testlayer", "version": 4}, [ { - 'compatible_runtimes': ["python3.7"], - 'created_date': "2022-09-29T10:31:35.977+0000", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", + "compatible_runtimes": ["python3.7"], + "created_date": "2022-09-29T10:31:35.977+0000", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", "license_info": "MIT", - 'version': 2, - 'compatible_architectures': [ - 'arm64' - ] + "version": 2, + "compatible_architectures": ["arm64"], }, { "created_date": "2022-09-29T10:31:26.341+0000", "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1", - "version": 1 - } + "version": 1, + }, ], [], - {"changed": False, "layer_versions": []} + {"changed": False, "layer_versions": []}, ), ( - { - "name": "testlayer", - "version": 2 - }, + {"name": "testlayer", "version": 2}, [ { - 'compatible_runtimes': ["python3.7"], - 'created_date': "2022-09-29T10:31:35.977+0000", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", + "compatible_runtimes": ["python3.7"], + "created_date": "2022-09-29T10:31:35.977+0000", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", "license_info": "MIT", - 'version': 2, - 'compatible_architectures': [ - 'arm64' - ] + "version": 2, + "compatible_architectures": ["arm64"], }, { "created_date": "2022-09-29T10:31:26.341+0000", "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1", - "version": 1 - } - ], - [ - call(LayerName='testlayer', VersionNumber=2) + "version": 1, + }, ], + [call(LayerName="testlayer", VersionNumber=2)], { "changed": True, "layer_versions": [ { - 'compatible_runtimes': ["python3.7"], - 'created_date': "2022-09-29T10:31:35.977+0000", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", + "compatible_runtimes": ["python3.7"], + "created_date": "2022-09-29T10:31:35.977+0000", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", "license_info": "MIT", - 'version': 2, - 'compatible_architectures': [ - 'arm64' - ] + "version": 2, + "compatible_architectures": ["arm64"], } - ] - } + ], + }, ), ( - { - "name": "testlayer", - "version": -1 - }, + {"name": "testlayer", "version": -1}, [ { - 'compatible_runtimes': ["python3.7"], - 'created_date': "2022-09-29T10:31:35.977+0000", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", + "compatible_runtimes": ["python3.7"], + "created_date": "2022-09-29T10:31:35.977+0000", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", "license_info": "MIT", - 'version': 2, - 'compatible_architectures': [ - 'arm64' - ] + "version": 2, + "compatible_architectures": ["arm64"], }, { "created_date": "2022-09-29T10:31:26.341+0000", "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1", - "version": 1 - } - ], - [ - call(LayerName='testlayer', VersionNumber=2), - call(LayerName='testlayer', VersionNumber=1) + "version": 1, + }, ], + [call(LayerName="testlayer", VersionNumber=2), call(LayerName="testlayer", VersionNumber=1)], { "changed": True, "layer_versions": [ { - 'compatible_runtimes': ["python3.7"], - 'created_date': "2022-09-29T10:31:35.977+0000", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", + "compatible_runtimes": ["python3.7"], + "created_date": "2022-09-29T10:31:35.977+0000", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", "license_info": "MIT", - 'version': 2, - 'compatible_architectures': [ - 'arm64' - ] + "version": 2, + "compatible_architectures": ["arm64"], }, { "created_date": "2022-09-29T10:31:26.341+0000", "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1", - "version": 1 - } - ] - } - ) - ] + "version": 1, + }, + ], + }, + ), + ], ) @patch(mod_list_layer) -def test_delete_layer(m_list_layer, params, api_result, calls, ansible_result): - +def test_delete_layer(m_list_layer, params, api_result, calls, _ansible_result): lambda_client = MagicMock() lambda_client.delete_layer_version.return_value = None m_list_layer.return_value = api_result result = lambda_layer.delete_layer_version(lambda_client, params) - assert result == ansible_result + assert result == _ansible_result - m_list_layer.assert_called_once_with( - lambda_client, params.get("name") - ) + m_list_layer.assert_called_once_with(lambda_client, params.get("name")) if not calls: lambda_client.delete_layer_version.assert_not_called() @@ -177,62 +142,54 @@ def test_delete_layer(m_list_layer, params, api_result, calls, ansible_result): @patch(mod_list_layer) def test_delete_layer_check_mode(m_list_layer): - lambda_client = MagicMock() lambda_client.delete_layer_version.return_value = None m_list_layer.return_value = [ { - 'compatible_runtimes': ["python3.7"], - 'created_date': "2022-09-29T10:31:35.977+0000", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", + "compatible_runtimes": ["python3.7"], + "created_date": "2022-09-29T10:31:35.977+0000", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", "license_info": "MIT", - 'version': 2, - 'compatible_architectures': [ - 'arm64' - ] + "version": 2, + "compatible_architectures": ["arm64"], }, { "created_date": "2022-09-29T10:31:26.341+0000", "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1", - "version": 1 - } + "version": 1, + }, ] params = {"name": "testlayer", "version": -1} result = lambda_layer.delete_layer_version(lambda_client, params, check_mode=True) - ansible_result = { + _ansible_result = { "changed": True, "layer_versions": [ { - 'compatible_runtimes': ["python3.7"], - 'created_date': "2022-09-29T10:31:35.977+0000", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", + "compatible_runtimes": ["python3.7"], + "created_date": "2022-09-29T10:31:35.977+0000", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:2", "license_info": "MIT", - 'version': 2, - 'compatible_architectures': [ - 'arm64' - ] + "version": 2, + "compatible_architectures": ["arm64"], }, { "created_date": "2022-09-29T10:31:26.341+0000", "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1", - "version": 1 - } - ] + "version": 1, + }, + ], } - assert result == ansible_result + assert result == _ansible_result - m_list_layer.assert_called_once_with( - lambda_client, params.get("name") - ) + m_list_layer.assert_called_once_with(lambda_client, params.get("name")) lambda_client.delete_layer_version.assert_not_called() @patch(mod_list_layer) def test_delete_layer_failure(m_list_layer): - lambda_client = MagicMock() lambda_client.delete_layer_version.side_effect = raise_lambdalayer_exception() @@ -241,7 +198,7 @@ def test_delete_layer_failure(m_list_layer): "created_date": "2022-09-29T10:31:26.341+0000", "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:testlayer:1", - "version": 1 + "version": 1, } ] params = {"name": "testlayer", "version": 1} @@ -249,42 +206,38 @@ def test_delete_layer_failure(m_list_layer): lambda_layer.delete_layer_version(lambda_client, params) -@pytest.mark.parametrize( - "b_s3content", - [ - (True), - (False) - ] -) +@pytest.mark.parametrize("b_s3content", [(True), (False)]) @patch(mod_list_layer) def test_create_layer(m_list_layer, b_s3content, tmp_path): params = { "name": "testlayer", "description": "ansible units testing sample layer", "content": {}, - "license_info": "MIT" + "license_info": "MIT", } lambda_client = MagicMock() lambda_client.publish_layer_version.return_value = { - 'CompatibleRuntimes': [ - 'python3.6', - 'python3.7', + "CompatibleRuntimes": [ + "python3.6", + "python3.7", ], - 'Content': { - 'CodeSha256': 'tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=', - 'CodeSize': 169, - 'Location': 'https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb', + "Content": { + "CodeSha256": "tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=", + "CodeSize": 169, + "Location": ( + "https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb" + ), }, - 'CreatedDate': '2018-11-14T23:03:52.894+0000', - 'Description': "ansible units testing sample layer", - 'LayerArn': 'arn:aws:lambda:us-west-2:123456789012:layer:my-layer', - 'LayerVersionArn': 'arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1', - 'LicenseInfo': 'MIT', - 'Version': 1, - 'ResponseMetadata': { - 'http_header': 'true', + "CreatedDate": "2018-11-14T23:03:52.894+0000", + "Description": "ansible units testing sample layer", + "LayerArn": "arn:aws:lambda:us-west-2:123456789012:layer:my-layer", + "LayerVersionArn": "arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1", + "LicenseInfo": "MIT", + "Version": 1, + "ResponseMetadata": { + "http_header": "true", }, } @@ -292,33 +245,25 @@ def test_create_layer(m_list_layer, b_s3content, tmp_path): "changed": True, "layer_versions": [ { - 'compatible_runtimes': ['python3.6', 'python3.7'], - 'content': { - 'code_sha256': 'tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=', - 'code_size': 169, - 'location': 'https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb' + "compatible_runtimes": ["python3.6", "python3.7"], + "content": { + "code_sha256": "tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=", + "code_size": 169, + "location": "https://awslambda-us-west-2-layers.s3.us-west-2.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb", }, - 'created_date': '2018-11-14T23:03:52.894+0000', - 'description': 'ansible units testing sample layer', - 'layer_arn': 'arn:aws:lambda:us-west-2:123456789012:layer:my-layer', - 'layer_version_arn': 'arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1', - 'license_info': 'MIT', - 'version': 1 + "created_date": "2018-11-14T23:03:52.894+0000", + "description": "ansible units testing sample layer", + "layer_arn": "arn:aws:lambda:us-west-2:123456789012:layer:my-layer", + "layer_version_arn": "arn:aws:lambda:us-west-2:123456789012:layer:testlayer:1", + "license_info": "MIT", + "version": 1, } - ] + ], } if b_s3content: - params["content"] = { - "s3_bucket": "mybucket", - "s3_key": "mybucket-key", - "s3_object_version": "v1" - } - content_arg = { - "S3Bucket": "mybucket", - "S3Key": "mybucket-key", - "S3ObjectVersion": "v1" - } + params["content"] = {"s3_bucket": "mybucket", "s3_key": "mybucket-key", "s3_object_version": "v1"} + content_arg = {"S3Bucket": "mybucket", "S3Key": "mybucket-key", "S3ObjectVersion": "v1"} else: binary_data = b"simple lambda layer content" test_dir = tmp_path / "lambda_layer" @@ -350,12 +295,8 @@ def test_create_layer_check_mode(m_list_layer): params = { "name": "testlayer", "description": "ansible units testing sample layer", - "content": { - "s3_bucket": "mybucket", - "s3_key": "mybucket-key", - "s3_object_version": "v1" - }, - "license_info": "MIT" + "content": {"s3_bucket": "mybucket", "s3_key": "mybucket-key", "s3_object_version": "v1"}, + "license_info": "MIT", } lambda_client = MagicMock() @@ -371,19 +312,9 @@ def test_create_layer_failure(): params = { "name": "testlayer", "description": "ansible units testing sample layer", - "content": { - "s3_bucket": "mybucket", - "s3_key": "mybucket-key", - "s3_object_version": "v1" - }, - "compatible_runtimes": [ - "nodejs", - "python3.9" - ], - "compatible_architectures": [ - 'x86_64', - 'arm64' - ] + "content": {"s3_bucket": "mybucket", "s3_key": "mybucket-key", "s3_object_version": "v1"}, + "compatible_runtimes": ["nodejs", "python3.9"], + "compatible_architectures": ["x86_64", "arm64"], } lambda_client = MagicMock() lambda_client.publish_layer_version.side_effect = raise_lambdalayer_exception() @@ -399,14 +330,8 @@ def test_create_layer_using_unexisting_file(): "content": { "zip_file": "this_file_does_not_exist", }, - "compatible_runtimes": [ - "nodejs", - "python3.9" - ], - "compatible_architectures": [ - 'x86_64', - 'arm64' - ] + "compatible_runtimes": ["nodejs", "python3.9"], + "compatible_architectures": ["x86_64", "arm64"], } lambda_client = MagicMock() @@ -421,28 +346,15 @@ def test_create_layer_using_unexisting_file(): @pytest.mark.parametrize( "params,failure", [ - ( - {"name": "test-layer"}, - False - ), - ( - {"name": "test-layer", "state": "absent"}, - False - ), - ( - {"name": "test-layer"}, - True - ), - ( - {"name": "test-layer", "state": "absent"}, - True - ), - ] + ({"name": "test-layer"}, False), + ({"name": "test-layer", "state": "absent"}, False), + ({"name": "test-layer"}, True), + ({"name": "test-layer", "state": "absent"}, True), + ], ) @patch(mod_create_layer) @patch(mod_delete_layer) def test_execute_module(m_delete_layer, m_create_layer, params, failure): - module = MagicMock() module.params = params module.check_mode = False @@ -462,9 +374,7 @@ def test_execute_module(m_delete_layer, m_create_layer, params, failure): module.exit_json.assert_called_with(**result) module.fail_json_aws.assert_not_called() - m_create_layer.assert_called_with( - lambda_client, params, module.check_mode - ) + m_create_layer.assert_called_with(lambda_client, params, module.check_mode) m_delete_layer.assert_not_called() elif state == "absent": @@ -474,9 +384,7 @@ def test_execute_module(m_delete_layer, m_create_layer, params, failure): module.exit_json.assert_called_with(**result) module.fail_json_aws.assert_not_called() - m_delete_layer.assert_called_with( - lambda_client, params, module.check_mode - ) + m_delete_layer.assert_called_with(lambda_client, params, module.check_mode) m_create_layer.assert_not_called() else: exc = "lambdalayer_execute_module_exception" @@ -488,6 +396,4 @@ def test_execute_module(m_delete_layer, m_create_layer, params, failure): lambda_layer.execute_module(module, lambda_client) module.exit_json.assert_not_called() - module.fail_json_aws.assert_called_with( - exc, msg=msg - ) + module.fail_json_aws.assert_called_with(exc, msg=msg) diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py index 25a1f15ac..201625401 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_lambda_layer_info.py @@ -4,104 +4,85 @@ # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch import pytest from botocore.exceptions import BotoCoreError -from unittest.mock import MagicMock, call, patch from ansible_collections.amazon.aws.plugins.modules import lambda_layer_info - -mod__list_layer_versions = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layer_versions' -mod__list_layers = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layers' -mod_list_layer_versions = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layer_versions' -mod_list_layers = 'ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layers' +mod__list_layer_versions = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layer_versions" +mod__list_layers = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info._list_layers" +mod_list_layer_versions = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layer_versions" +mod_list_layers = "ansible_collections.amazon.aws.plugins.modules.lambda_layer_info.list_layers" list_layers_paginate_result = { - 'NextMarker': '002', - 'Layers': [ + "NextMarker": "002", + "Layers": [ { - 'LayerName': "test-layer-01", - 'LayerArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01", - 'LatestMatchingVersion': { - 'LayerVersionArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1", - 'Version': 1, - 'Description': "lambda layer created for unit tests", - 'CreatedDate': "2022-09-29T10:31:26.341+0000", - 'CompatibleRuntimes': [ - 'nodejs', - 'nodejs4.3', - 'nodejs6.10' - ], - 'LicenseInfo': 'MIT', - 'CompatibleArchitectures': [ - 'arm64' - ] - } + "LayerName": "test-layer-01", + "LayerArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01", + "LatestMatchingVersion": { + "LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1", + "Version": 1, + "Description": "lambda layer created for unit tests", + "CreatedDate": "2022-09-29T10:31:26.341+0000", + "CompatibleRuntimes": ["nodejs", "nodejs4.3", "nodejs6.10"], + "LicenseInfo": "MIT", + "CompatibleArchitectures": ["arm64"], + }, }, { - 'LayerName': "test-layer-02", - 'LayerArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02", - 'LatestMatchingVersion': { - 'LayerVersionArn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1", - 'Version': 1, - 'CreatedDate': "2022-09-29T10:31:26.341+0000", - 'CompatibleArchitectures': [ - 'arm64' - ] - } + "LayerName": "test-layer-02", + "LayerArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02", + "LatestMatchingVersion": { + "LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1", + "Version": 1, + "CreatedDate": "2022-09-29T10:31:26.341+0000", + "CompatibleArchitectures": ["arm64"], + }, }, ], - 'ResponseMetadata': { - 'http': 'true', + "ResponseMetadata": { + "http": "true", }, } list_layers_result = [ { - 'layer_name': "test-layer-01", - 'layer_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1", - 'version': 1, - 'description': "lambda layer created for unit tests", - 'created_date': "2022-09-29T10:31:26.341+0000", - 'compatible_runtimes': [ - 'nodejs', - 'nodejs4.3', - 'nodejs6.10' - ], - 'license_info': 'MIT', - 'compatible_architectures': [ - 'arm64' - ] + "layer_name": "test-layer-01", + "layer_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-01:1", + "version": 1, + "description": "lambda layer created for unit tests", + "created_date": "2022-09-29T10:31:26.341+0000", + "compatible_runtimes": ["nodejs", "nodejs4.3", "nodejs6.10"], + "license_info": "MIT", + "compatible_architectures": ["arm64"], }, { - 'layer_name': "test-layer-02", - 'layer_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02", - 'layer_version_arn': "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1", - 'version': 1, - 'created_date': "2022-09-29T10:31:26.341+0000", - 'compatible_architectures': [ - 'arm64' - ] - } + "layer_name": "test-layer-02", + "layer_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02", + "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:test-layer-02:1", + "version": 1, + "created_date": "2022-09-29T10:31:26.341+0000", + "compatible_architectures": ["arm64"], + }, ] list_layers_versions_paginate_result = { - 'LayerVersions': [ + "LayerVersions": [ { - 'CompatibleRuntimes': ["python3.7"], - 'CreatedDate': "2022-09-29T10:31:35.977+0000", - 'LayerVersionArn': "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:2", + "CompatibleRuntimes": ["python3.7"], + "CreatedDate": "2022-09-29T10:31:35.977+0000", + "LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:2", "LicenseInfo": "MIT", - 'Version': 2, - 'CompatibleArchitectures': [ - 'arm64' - ] + "Version": 2, + "CompatibleArchitectures": ["arm64"], }, { "CompatibleRuntimes": ["python3.7"], @@ -109,13 +90,13 @@ list_layers_versions_paginate_result = { "Description": "lambda layer first version", "LayerVersionArn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:1", "LicenseInfo": "GPL-3.0-only", - "Version": 1 - } + "Version": 1, + }, ], - 'ResponseMetadata': { - 'http': 'true', + "ResponseMetadata": { + "http": "true", }, - 'NextMarker': '001', + "NextMarker": "001", } @@ -126,9 +107,7 @@ list_layers_versions_result = [ "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:2", "license_info": "MIT", "version": 2, - 'compatible_architectures': [ - 'arm64' - ] + "compatible_architectures": ["arm64"], }, { "compatible_runtimes": ["python3.7"], @@ -136,8 +115,8 @@ list_layers_versions_result = [ "description": "lambda layer first version", "layer_version_arn": "arn:aws:lambda:eu-west-2:123456789012:layer:layer-01:1", "license_info": "GPL-3.0-only", - "version": 1 - } + "version": 1, + }, ] @@ -145,14 +124,8 @@ list_layers_versions_result = [ "params,call_args", [ ( - { - "compatible_runtime": "nodejs", - "compatible_architecture": "arm64" - }, - { - "CompatibleRuntime": "nodejs", - "CompatibleArchitecture": "arm64" - } + {"compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, + {"CompatibleRuntime": "nodejs", "CompatibleArchitecture": "arm64"}, ), ( { @@ -160,34 +133,20 @@ list_layers_versions_result = [ }, { "CompatibleRuntime": "nodejs", - } - ), - ( - { - "compatible_architecture": "arm64" }, - { - "CompatibleArchitecture": "arm64" - } ), - ( - {}, {} - ) - ] + ({"compatible_architecture": "arm64"}, {"CompatibleArchitecture": "arm64"}), + ({}, {}), + ], ) @patch(mod__list_layers) def test_list_layers_with_latest_version(m__list_layers, params, call_args): - lambda_client = MagicMock() m__list_layers.return_value = list_layers_paginate_result layers = lambda_layer_info.list_layers(lambda_client, **params) - m__list_layers.assert_has_calls( - [ - call(lambda_client, **call_args) - ] - ) + m__list_layers.assert_has_calls([call(lambda_client, **call_args)]) assert layers == list_layers_result @@ -195,16 +154,8 @@ def test_list_layers_with_latest_version(m__list_layers, params, call_args): "params,call_args", [ ( - { - "name": "layer-01", - "compatible_runtime": "nodejs", - "compatible_architecture": "arm64" - }, - { - "LayerName": "layer-01", - "CompatibleRuntime": "nodejs", - "CompatibleArchitecture": "arm64" - } + {"name": "layer-01", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, + {"LayerName": "layer-01", "CompatibleRuntime": "nodejs", "CompatibleArchitecture": "arm64"}, ), ( { @@ -214,36 +165,23 @@ def test_list_layers_with_latest_version(m__list_layers, params, call_args): { "LayerName": "layer-01", "CompatibleRuntime": "nodejs", - } - ), - ( - { - "name": "layer-01", - "compatible_architecture": "arm64" }, - { - "LayerName": "layer-01", - "CompatibleArchitecture": "arm64" - } ), ( - {"name": "layer-01"}, {"LayerName": "layer-01"} - ) - ] + {"name": "layer-01", "compatible_architecture": "arm64"}, + {"LayerName": "layer-01", "CompatibleArchitecture": "arm64"}, + ), + ({"name": "layer-01"}, {"LayerName": "layer-01"}), + ], ) @patch(mod__list_layer_versions) def test_list_layer_versions(m__list_layer_versions, params, call_args): - lambda_client = MagicMock() m__list_layer_versions.return_value = list_layers_versions_paginate_result layers = lambda_layer_info.list_layer_versions(lambda_client, **params) - m__list_layer_versions.assert_has_calls( - [ - call(lambda_client, **call_args) - ] - ) + m__list_layer_versions.assert_has_calls([call(lambda_client, **call_args)]) assert layers == list_layers_versions_result @@ -251,28 +189,69 @@ def raise_botocore_exception(): return BotoCoreError(error="failed", operation="list_layers") +def test_get_layer_version_success(): + aws_layer_version = { + "CompatibleRuntimes": ["python3.8"], + "Content": { + "CodeSha256": "vqxKx6nTW31obVcB4MYaTWv5H3fBQTn2PHklL9+mF9E=", + "CodeSize": 9492621, + "Location": "https://test.s3.us-east-1.amazonaws.com/snapshots/123456789012/test-79b29d149e06?versionId=nmEKA3ZgiP7hce3J", + }, + "CreatedDate": "2022-12-05T10:47:32.379+0000", + "Description": "Python units test layer", + "LayerArn": "arn:aws:lambda:us-east-1:123456789012:layer:test", + "LayerVersionArn": "arn:aws:lambda:us-east-1:123456789012:layer:test:2", + "LicenseInfo": "GPL-3.0-only", + "Version": 2, + "ResponseMetadata": {"some-metadata": "some-result"}, + } + + ansible_layer_version = { + "compatible_runtimes": ["python3.8"], + "content": { + "code_sha256": "vqxKx6nTW31obVcB4MYaTWv5H3fBQTn2PHklL9+mF9E=", + "code_size": 9492621, + "location": "https://test.s3.us-east-1.amazonaws.com/snapshots/123456789012/test-79b29d149e06?versionId=nmEKA3ZgiP7hce3J", + }, + "created_date": "2022-12-05T10:47:32.379+0000", + "description": "Python units test layer", + "layer_arn": "arn:aws:lambda:us-east-1:123456789012:layer:test", + "layer_version_arn": "arn:aws:lambda:us-east-1:123456789012:layer:test:2", + "license_info": "GPL-3.0-only", + "version": 2, + } + + lambda_client = MagicMock() + lambda_client.get_layer_version.return_value = aws_layer_version + + layer_name = "test" + layer_version = 2 + + assert [ansible_layer_version] == lambda_layer_info.get_layer_version(lambda_client, layer_name, layer_version) + lambda_client.get_layer_version.assert_called_once_with(LayerName=layer_name, VersionNumber=layer_version) + + +def test_get_layer_version_failure(): + lambda_client = MagicMock() + lambda_client.get_layer_version.side_effect = raise_botocore_exception() + + layer_name = MagicMock() + layer_version = MagicMock() + + with pytest.raises(lambda_layer_info.LambdaLayerInfoFailure): + lambda_layer_info.get_layer_version(lambda_client, layer_name, layer_version) + + @pytest.mark.parametrize( "params", [ - ( - { - "name": "test-layer", - "compatible_runtime": "nodejs", - "compatible_architecture": "arm64" - } - ), - ( - { - "compatible_runtime": "nodejs", - "compatible_architecture": "arm64" - } - ) - ] + ({"name": "test-layer", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"}), + ({"compatible_runtime": "nodejs", "compatible_architecture": "arm64"}), + ], ) @patch(mod__list_layers) @patch(mod__list_layer_versions) def test_list_layers_with_failure(m__list_layer_versions, m__list_layers, params): - lambda_client = MagicMock() if "name" in params: @@ -293,35 +272,14 @@ def raise_layer_info_exception(exc, msg): @pytest.mark.parametrize( "params,failure", [ - ( - { - "name": "test-layer", - "compatible_runtime": "nodejs", - "compatible_architecture": "arm64" - }, - False - ), - ( - { - "compatible_runtime": "nodejs", - "compatible_architecture": "arm64" - }, - False - ), - ( - { - "name": "test-layer", - "compatible_runtime": "nodejs", - "compatible_architecture": "arm64" - }, - True - ) - ] + ({"name": "test-layer", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, False), + ({"compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, False), + ({"name": "test-layer", "compatible_runtime": "nodejs", "compatible_architecture": "arm64"}, True), + ], ) @patch(mod_list_layers) @patch(mod_list_layer_versions) def test_execute_module(m_list_layer_versions, m_list_layers, params, failure): - lambda_client = MagicMock() module = MagicMock() @@ -351,8 +309,6 @@ def test_execute_module(m_list_layer_versions, m_list_layers, params, failure): with pytest.raises(SystemExit): lambda_layer_info.execute_module(module, lambda_client) - module.exit_json.assert_called_with( - changed=False, layers_versions=result - ) + module.exit_json.assert_called_with(changed=False, layers_versions=result) method_called.assert_called_with(lambda_client, **params) method_not_called.list_layers.assert_not_called() diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_rds_instance_info.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_rds_instance_info.py new file mode 100644 index 000000000..8db20f1a0 --- /dev/null +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_rds_instance_info.py @@ -0,0 +1,121 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from unittest.mock import ANY +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch + +import botocore.exceptions +import pytest + +from ansible_collections.amazon.aws.plugins.modules import rds_instance_info + +mod_name = "ansible_collections.amazon.aws.plugins.modules.rds_instance_info" + + +def a_boto_exception(): + return botocore.exceptions.UnknownServiceError(service_name="Whoops", known_service_names="Oula") + + +@patch(mod_name + "._describe_db_instances") +@patch(mod_name + ".get_instance_tags") +def test_instance_info_one_instance(m_get_instance_tags, m_describe_db_instances): + conn = MagicMock() + instance_name = "my-instance" + m_get_instance_tags.return_value = [] + m_describe_db_instances.return_value = [ + { + "DBInstanceIdentifier": instance_name, + "DBInstanceArn": "arn:aws:rds:us-east-2:123456789012:og:" + instance_name, + } + ] + rds_instance_info.instance_info(conn, instance_name, filters={}) + + m_describe_db_instances.assert_called_with(conn, DBInstanceIdentifier=instance_name) + m_get_instance_tags.assert_called_with(conn, arn="arn:aws:rds:us-east-2:123456789012:og:" + instance_name) + + +@patch(mod_name + "._describe_db_instances") +@patch(mod_name + ".get_instance_tags") +def test_instance_info_all_instances(m_get_instance_tags, m_describe_db_instances): + conn = MagicMock() + m_get_instance_tags.return_value = [] + m_describe_db_instances.return_value = [ + { + "DBInstanceIdentifier": "first-instance", + "DBInstanceArn": "arn:aws:rds:us-east-2:123456789012:og:first-instance", + }, + { + "DBInstanceIdentifier": "second-instance", + "DBInstanceArn": "arn:aws:rds:us-east-2:123456789012:og:second-instance", + }, + ] + rds_instance_info.instance_info(conn, instance_name=None, filters={"engine": "postgres"}) + + m_describe_db_instances.assert_called_with(conn, Filters=[{"Name": "engine", "Values": ["postgres"]}]) + assert m_get_instance_tags.call_count == 2 + m_get_instance_tags.assert_has_calls( + [ + call(conn, arn="arn:aws:rds:us-east-2:123456789012:og:first-instance"), + call(conn, arn="arn:aws:rds:us-east-2:123456789012:og:second-instance"), + ] + ) + + +def test_get_instance_tags(): + conn = MagicMock() + conn.list_tags_for_resource.return_value = { + "TagList": [ + {"Key": "My-tag", "Value": "the-value$"}, + ], + "NextToken": "some-token", + } + + tags = rds_instance_info.get_instance_tags(conn, "arn:aws:rds:us-east-2:123456789012:og:second-instance") + conn.list_tags_for_resource.assert_called_with( + ResourceName="arn:aws:rds:us-east-2:123456789012:og:second-instance", + aws_retry=True, + ) + assert tags == {"My-tag": "the-value$"} + + +def test_api_failure_get_tag(): + conn = MagicMock() + conn.list_tags_for_resource.side_effect = a_boto_exception() + + with pytest.raises(rds_instance_info.RdsInstanceInfoFailure): + rds_instance_info.get_instance_tags(conn, "arn:blabla") + + +def test_api_failure_describe(): + conn = MagicMock() + conn.get_paginator.side_effect = a_boto_exception() + + with pytest.raises(rds_instance_info.RdsInstanceInfoFailure): + rds_instance_info.instance_info(conn, None, {}) + + +@patch(mod_name + ".AnsibleAWSModule") +def test_main_success(m_AnsibleAWSModule): + m_module = MagicMock() + m_AnsibleAWSModule.return_value = m_module + + rds_instance_info.main() + + m_module.client.assert_called_with("rds", retry_decorator=ANY) + m_module.exit_json.assert_called_with(changed=False, instances=[]) + + +@patch(mod_name + "._describe_db_instances") +@patch(mod_name + ".AnsibleAWSModule") +def test_main_failure(m_AnsibleAWSModule, m_describe_db_instances): + m_module = MagicMock() + m_AnsibleAWSModule.return_value = m_module + m_describe_db_instances.side_effect = a_boto_exception() + + rds_instance_info.main() + + m_module.client.assert_called_with("rds", retry_decorator=ANY) + m_module.fail_json_aws.assert_called_with(ANY, "Couldn't get instance information") diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py index b02513072..deeb1c4a0 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/test_s3_object.py @@ -1,29 +1,156 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +# (c) 2022 Red Hat Inc. -from ansible.module_utils.six.moves.urllib.parse import urlparse +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import MagicMock +from unittest.mock import patch + +import botocore.exceptions +import pytest from ansible_collections.amazon.aws.plugins.modules import s3_object +module_name = "ansible_collections.amazon.aws.plugins.modules.s3_object" +utils = "ansible_collections.amazon.aws.plugins.module_utils.ec2" + + +@patch(module_name + ".paginated_list") +def test_list_keys_success(m_paginated_list): + s3 = MagicMock() + + m_paginated_list.return_value = ["delete.txt"] + + assert ["delete.txt"] == s3_object.list_keys(s3, "a987e6b6026ab04e4717", "", "", 1000) + m_paginated_list.assert_called_once() + + +@patch(module_name + ".paginated_list") +def test_list_keys_failure(m_paginated_list): + s3 = MagicMock() + + m_paginated_list.side_effect = botocore.exceptions.BotoCoreError + + with pytest.raises(s3_object.S3ObjectFailure): + s3_object.list_keys(s3, "a987e6b6026ab04e4717", "", "", 1000) + + +@patch(module_name + ".delete_key") +def test_s3_object_do_delobj_success(m_delete_key): + module = MagicMock() + s3 = MagicMock() + var_dict = { + "object": "/usr/local/myfile.txt", + "bucket": "a987e6b6026ab04e4717", + } + s3_object.s3_object_do_delobj(module, s3, s3, var_dict) + assert m_delete_key.call_count == 1 + module.exit_json.assert_called_with(msg="Object deleted from bucket a987e6b6026ab04e4717.", changed=True) + + +@patch(module_name + ".delete_key") +def test_s3_object_do_delobj_failure_nobucket(m_delete_key): + module = MagicMock() + s3 = MagicMock() + + var_dict = {"object": "/usr/local/myfile.txt", "bucket": ""} + s3_object.s3_object_do_delobj(module, s3, s3, var_dict) + assert m_delete_key.call_count == 0 + module.fail_json.assert_called_with(msg="Bucket parameter is required.") + + +@patch(module_name + ".delete_key") +def test_s3_object_do_delobj_failure_noobj(m_delete_key): + module = MagicMock() + s3 = MagicMock() + var_dict = {"bucket": "a987e6b6026ab04e4717", "object": ""} + s3_object.s3_object_do_delobj(module, s3, s3, var_dict) + assert m_delete_key.call_count == 0 + module.fail_json.assert_called_with(msg="object parameter is required") + + +@patch(module_name + ".paginated_list") +@patch(module_name + ".list_keys") +def test_s3_object_do_list_success(m_paginated_list, m_list_keys): + module = MagicMock() + s3 = MagicMock() + + m_paginated_list.return_value = ["delete.txt"] + var_dict = { + "bucket": "a987e6b6026ab04e4717", + "prefix": "", + "marker": "", + "max_keys": 1000, + "bucketrtn": True, + } + + s3_object.s3_object_do_list(module, s3, s3, var_dict) + assert m_paginated_list.call_count == 1 + # assert m_list_keys.call_count == 1 + # module.exit_json.assert_called_with(msg="LIST operation complete", s3_keys=['delete.txt']) + + +@patch(utils + ".get_aws_connection_info") +def test_populate_params(m_get_aws_connection_info): + module = MagicMock() + m_get_aws_connection_info.return_value = ( + "us-east-1", + None, + { + "aws_access_key_id": "xxxx", + "aws_secret_access_key": "yyyy", + "aws_session_token": "zzzz", + "verify": True, + }, + ) -class TestUrlparse(): + module.params = { + "bucket": "4a6cfe3c17b798613fa77b462e402984", + "ceph": False, + "content": None, + "content_base64": None, + "copy_src": None, + "debug_botocore_endpoint_logs": True, + "dest": None, + "dualstack": False, + "encrypt": True, + "encryption_kms_key_id": None, + "encryption_mode": "AES256", + "endpoint_url": None, + "expiry": 600, + "headers": None, + "ignore_nonexistent_bucket": False, + "marker": "", + "max_keys": 1000, + "metadata": None, + "mode": "create", + "object": None, + "overwrite": "latest", + "permission": ["private"], + "prefix": "", + "profile": None, + "purge_tags": True, + "region": "us-east-1", + "retries": 0, + "sig_v4": True, + "src": None, + "tags": None, + "validate_bucket_name": False, + "validate_certs": True, + "version": None, + } + result = s3_object.populate_params(module) + for k, v in module.params.items(): + assert result[k] == v - def test_urlparse(self): - actual = urlparse("http://test.com/here") - assert actual.scheme == "http" - assert actual.netloc == "test.com" - assert actual.path == "/here" + module.params.update({"object": "example.txt", "mode": "get"}) + result = s3_object.populate_params(module) + assert result["object"] == "example.txt" - def test_is_fakes3(self): - actual = s3_object.is_fakes3("fakes3://bla.blubb") - assert actual is True + module.params.update({"object": "/example.txt", "mode": "get"}) + result = s3_object.populate_params(module) + assert result["object"] == "example.txt" - def test_get_s3_connection(self): - aws_connect_kwargs = dict(aws_access_key_id="access_key", - aws_secret_access_key="secret_key") - location = None - rgw = True - s3_url = "http://bla.blubb" - actual = s3_object.get_s3_connection(None, aws_connect_kwargs, location, rgw, s3_url) - assert "bla.blubb" in str(actual._endpoint) + module.params.update({"object": "example.txt", "mode": "delete"}) + result = s3_object.populate_params(module) + module.fail_json.assert_called_with(msg="Parameter object cannot be used with mode=delete") diff --git a/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py b/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py index 058a5b605..72b3b887e 100644 --- a/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py +++ b/ansible_collections/amazon/aws/tests/unit/plugins/modules/utils.py @@ -1,21 +1,18 @@ -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - import json +import unittest +from unittest.mock import patch -from ansible_collections.amazon.aws.tests.unit.compat import unittest -from ansible_collections.amazon.aws.tests.unit.compat.mock import patch from ansible.module_utils import basic from ansible.module_utils._text import to_bytes def set_module_args(args): - if '_ansible_remote_tmp' not in args: - args['_ansible_remote_tmp'] = '/tmp' - if '_ansible_keep_remote_files' not in args: - args['_ansible_keep_remote_files'] = False + if "_ansible_remote_tmp" not in args: + args["_ansible_remote_tmp"] = "/tmp" + if "_ansible_keep_remote_files" not in args: + args["_ansible_keep_remote_files"] = False - args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + args = json.dumps({"ANSIBLE_MODULE_ARGS": args}) basic._ANSIBLE_ARGS = to_bytes(args) @@ -28,22 +25,21 @@ class AnsibleFailJson(Exception): def exit_json(*args, **kwargs): - if 'changed' not in kwargs: - kwargs['changed'] = False + if "changed" not in kwargs: + kwargs["changed"] = False raise AnsibleExitJson(kwargs) def fail_json(*args, **kwargs): - kwargs['failed'] = True + kwargs["failed"] = True raise AnsibleFailJson(kwargs) class ModuleTestCase(unittest.TestCase): - def setUp(self): self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json) self.mock_module.start() - self.mock_sleep = patch('time.sleep') + self.mock_sleep = patch("time.sleep") self.mock_sleep.start() set_module_args({}) self.addCleanup(self.mock_module.stop) |