diff options
Diffstat (limited to 'ansible_collections/amazon/aws/tests/unit/module_utils')
5 files changed, 214 insertions, 70 deletions
diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py b/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py index 9f3e4194b..a5ce452fc 100644 --- a/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py +++ b/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py @@ -209,3 +209,71 @@ class TestIsBoto3ErrorCode: assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) assert issubclass(returned_exception, Exception) assert returned_exception.__name__ == "NeverEverRaisedException" + + def test_is_boto3_error_code_tuple__pass__client(self): + passed_exception = self._make_denied_exception() + returned_exception = is_boto3_error_code(("NotAccessDenied", "AccessDenied"), e=passed_exception) + assert isinstance(passed_exception, returned_exception) + assert issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ != "NeverEverRaisedException" + + returned_exception = is_boto3_error_code(("AccessDenied", "NotAccessDenied"), e=passed_exception) + assert isinstance(passed_exception, returned_exception) + assert issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ != "NeverEverRaisedException" + + def test_is_boto3_error_code_tuple__pass__unexpected(self): + passed_exception = self._make_unexpected_exception() + returned_exception = is_boto3_error_code(("NotAccessDenied", "AccessDenied"), e=passed_exception) + assert not isinstance(passed_exception, returned_exception) + assert not issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ == "NeverEverRaisedException" + + def test_is_boto3_error_code_tuple__pass__botocore(self): + passed_exception = self._make_botocore_exception() + returned_exception = is_boto3_error_code(("NotAccessDenied", "AccessDenied"), e=passed_exception) + assert not isinstance(passed_exception, returned_exception) + assert not issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ == "NeverEverRaisedException" + + def test_is_boto3_error_code_set__pass__client(self): + passed_exception = self._make_denied_exception() + returned_exception = is_boto3_error_code({"NotAccessDenied", "AccessDenied"}, e=passed_exception) + assert isinstance(passed_exception, returned_exception) + assert issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ != "NeverEverRaisedException" + + returned_exception = is_boto3_error_code({"AccessDenied", "NotAccessDenied"}, e=passed_exception) + assert isinstance(passed_exception, returned_exception) + assert issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ != "NeverEverRaisedException" + + def test_is_boto3_error_code_set__pass__unexpected(self): + passed_exception = self._make_unexpected_exception() + returned_exception = is_boto3_error_code({"NotAccessDenied", "AccessDenied"}, e=passed_exception) + assert not isinstance(passed_exception, returned_exception) + assert not issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ == "NeverEverRaisedException" + + def test_is_boto3_error_code_set__pass__botocore(self): + passed_exception = self._make_botocore_exception() + returned_exception = is_boto3_error_code({"NotAccessDenied", "AccessDenied"}, e=passed_exception) + assert not isinstance(passed_exception, returned_exception) + assert not issubclass(returned_exception, botocore.exceptions.ClientError) + assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError) + assert issubclass(returned_exception, Exception) + assert returned_exception.__name__ == "NeverEverRaisedException" diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py b/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py index 28090f993..0a6830311 100644 --- a/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py +++ b/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py @@ -451,10 +451,10 @@ class TestIamResourceToAnsibleDict: OUTPUT = { "arn": "arn:aws:iam::123456789012:role/ansible-test-76640355", "assume_role_policy_document": { - "statement": [ - {"action": "sts:AssumeRole", "effect": "Deny", "principal": {"service": "ec2.amazonaws.com"}} + "Statement": [ + {"Action": "sts:AssumeRole", "Effect": "Deny", "Principal": {"Service": "ec2.amazonaws.com"}} ], - "version": "2012-10-17", + "Version": "2012-10-17", }, "assume_role_policy_document_raw": { "Statement": [ diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py b/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py index c61de1391..688514f59 100644 --- a/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py +++ b/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py @@ -70,7 +70,7 @@ def test_region(monkeypatch, stdin): aws_module = utils_module.AnsibleAWSModule(argument_spec=dict()) assert aws_module.region is sentinel.RETURNED_REGION - assert get_aws_region.call_args == call(aws_module, True) + assert get_aws_region.call_args == call(aws_module) @pytest.mark.parametrize("stdin", [{}], indirect=["stdin"]) @@ -129,7 +129,7 @@ def test_client_no_wrapper(monkeypatch, stdin): aws_module = utils_module.AnsibleAWSModule(argument_spec=dict()) assert aws_module.client(sentinel.PARAM_SERVICE) is sentinel.BOTO3_CONN - assert get_aws_connection_info.call_args == call(aws_module, boto3=True) + assert get_aws_connection_info.call_args == call(aws_module) assert boto3_conn.call_args == call( aws_module, conn_type="client", @@ -153,7 +153,7 @@ def test_client_wrapper(monkeypatch, stdin): wrapped_conn = aws_module.client(sentinel.PARAM_SERVICE, sentinel.PARAM_WRAPPER) assert wrapped_conn.client is sentinel.BOTO3_CONN assert wrapped_conn.retry is sentinel.PARAM_WRAPPER - assert get_aws_connection_info.call_args == call(aws_module, boto3=True) + assert get_aws_connection_info.call_args == call(aws_module) assert boto3_conn.call_args == call( aws_module, conn_type="client", @@ -166,7 +166,7 @@ def test_client_wrapper(monkeypatch, stdin): wrapped_conn = aws_module.client(sentinel.PARAM_SERVICE, sentinel.PARAM_WRAPPER, region=sentinel.PARAM_REGION) assert wrapped_conn.client is sentinel.BOTO3_CONN assert wrapped_conn.retry is sentinel.PARAM_WRAPPER - assert get_aws_connection_info.call_args == call(aws_module, boto3=True) + assert get_aws_connection_info.call_args == call(aws_module) assert boto3_conn.call_args == call( aws_module, conn_type="client", @@ -188,7 +188,7 @@ def test_resource(monkeypatch, stdin): aws_module = utils_module.AnsibleAWSModule(argument_spec=dict()) assert aws_module.resource(sentinel.PARAM_SERVICE) is sentinel.BOTO3_CONN - assert get_aws_connection_info.call_args == call(aws_module, boto3=True) + assert get_aws_connection_info.call_args == call(aws_module) assert boto3_conn.call_args == call( aws_module, conn_type="resource", @@ -199,7 +199,7 @@ def test_resource(monkeypatch, stdin): # Check that we can override parameters assert aws_module.resource(sentinel.PARAM_SERVICE, region=sentinel.PARAM_REGION) is sentinel.BOTO3_CONN - assert get_aws_connection_info.call_args == call(aws_module, boto3=True) + assert get_aws_connection_info.call_args == call(aws_module) assert boto3_conn.call_args == call( aws_module, conn_type="resource", diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/policy/test_sort_json_policy_dict.py b/ansible_collections/amazon/aws/tests/unit/module_utils/policy/test_sort_json_policy_dict.py deleted file mode 100644 index 8829f332c..000000000 --- a/ansible_collections/amazon/aws/tests/unit/module_utils/policy/test_sort_json_policy_dict.py +++ /dev/null @@ -1,61 +0,0 @@ -# (c) 2022 Red Hat Inc. -# -# This file is part of Ansible -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from ansible_collections.amazon.aws.plugins.module_utils.policy import sort_json_policy_dict - - -def test_nothing_to_sort(): - simple_dict = {"key1": "a"} - nested_dict = {"key1": {"key2": "a"}} - very_nested_dict = {"key1": {"key2": {"key3": "a"}}} - assert sort_json_policy_dict(simple_dict) == simple_dict - assert sort_json_policy_dict(nested_dict) == nested_dict - assert sort_json_policy_dict(very_nested_dict) == very_nested_dict - - -def test_basic_sort(): - simple_dict = {"key1": [1, 2, 3, 4], "key2": [9, 8, 7, 6]} - sorted_dict = {"key1": [1, 2, 3, 4], "key2": [6, 7, 8, 9]} - assert sort_json_policy_dict(simple_dict) == sorted_dict - assert sort_json_policy_dict(sorted_dict) == sorted_dict - simple_dict = {"key1": ["a", "b", "c", "d"], "key2": ["z", "y", "x", "w"]} - sorted_dict = {"key1": ["a", "b", "c", "d"], "key2": ["w", "x", "y", "z"]} - assert sort_json_policy_dict(sorted_dict) == sorted_dict - - -def test_nested_list_sort(): - nested_dict = {"key1": {"key2": [9, 8, 7, 6]}} - sorted_dict = {"key1": {"key2": [6, 7, 8, 9]}} - assert sort_json_policy_dict(nested_dict) == sorted_dict - assert sort_json_policy_dict(sorted_dict) == sorted_dict - nested_dict = {"key1": {"key2": ["z", "y", "x", "w"]}} - sorted_dict = {"key1": {"key2": ["w", "x", "y", "z"]}} - assert sort_json_policy_dict(nested_dict) == sorted_dict - assert sort_json_policy_dict(sorted_dict) == sorted_dict - - -def test_nested_dict_list_sort(): - nested_dict = {"key1": {"key2": {"key3": [9, 8, 7, 6]}}} - sorted_dict = {"key1": {"key2": {"key3": [6, 7, 8, 9]}}} - assert sort_json_policy_dict(nested_dict) == sorted_dict - assert sort_json_policy_dict(sorted_dict) == sorted_dict - nested_dict = {"key1": {"key2": {"key3": ["z", "y", "x", "w"]}}} - sorted_dict = {"key1": {"key2": {"key3": ["w", "x", "y", "z"]}}} - assert sort_json_policy_dict(nested_dict) == sorted_dict - assert sort_json_policy_dict(sorted_dict) == sorted_dict - - -def test_list_of_dict_sort(): - nested_dict = {"key1": [{"key2": [4, 3, 2, 1]}, {"key3": [9, 8, 7, 6]}]} - sorted_dict = {"key1": [{"key2": [1, 2, 3, 4]}, {"key3": [6, 7, 8, 9]}]} - assert sort_json_policy_dict(nested_dict) == sorted_dict - assert sort_json_policy_dict(sorted_dict) == sorted_dict - - -def test_list_of_list_sort(): - nested_dict = {"key1": [[4, 3, 2, 1], [9, 8, 7, 6]]} - sorted_dict = {"key1": [[1, 2, 3, 4], [6, 7, 8, 9]]} - assert sort_json_policy_dict(nested_dict) == sorted_dict - assert sort_json_policy_dict(sorted_dict) == sorted_dict diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py b/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py index d7293f0ce..0d2f3c153 100644 --- a/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py +++ b/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py @@ -6,6 +6,8 @@ from unittest.mock import MagicMock +import pytest + from ansible_collections.amazon.aws.plugins.module_utils import elbv2 one_action = [ @@ -159,3 +161,138 @@ class TestElBV2Utils: actual_elb_attributes = self.elbv2obj.get_elb_attributes() # Assert we got the expected result assert actual_elb_attributes == expected_elb_attributes + + +class TestELBListeners: + DEFAULT_PORT = 80 + DEFAULT_PROTOCOL = "TCP" + + def createListener(self, **kwargs): + result = {"Port": self.DEFAULT_PORT, "Protocol": self.DEFAULT_PROTOCOL} + if kwargs.get("port"): + result["Port"] = kwargs.get("port") + if kwargs.get("protocol"): + result["Protocol"] = kwargs.get("protocol") + if kwargs.get("certificate_arn") and kwargs.get("protocol") in ("TLS", "HTTPS"): + result["Certificates"] = [{"CertificateArn": kwargs.get("certificate_arn")}] + if kwargs.get("sslPolicy") and kwargs.get("protocol") in ("TLS", "HTTPS"): + result["SslPolicy"] = kwargs.get("sslPolicy") + if kwargs.get("alpnPolicy") and kwargs.get("protocol") == "TLS": + result["AlpnPolicy"] = kwargs.get("alpnPolicy") + return result + + @pytest.mark.parametrize("current_protocol", ["TCP", "TLS", "UDP"]) + @pytest.mark.parametrize( + "current_alpn,new_alpn", + [ + (None, "None"), + (None, "HTTP1Only"), + ("HTTP1Only", "HTTP2Only"), + ("HTTP1Only", "HTTP1Only"), + ], + ) + def test__compare_listener_alpn_policy(self, current_protocol, current_alpn, new_alpn): + current_listener = self.createListener(protocol=current_protocol, alpnPolicy=[current_alpn]) + new_listener = self.createListener(protocol="TLS", alpnPolicy=[new_alpn]) + result = None + if current_protocol != "TLS": + result = {"Protocol": "TLS"} + if new_alpn and any((current_protocol != "TLS", not current_alpn, current_alpn and current_alpn != new_alpn)): + result = result or {} + result["AlpnPolicy"] = [new_alpn] + + assert result == elbv2.ELBListeners._compare_listener(current_listener, new_listener) + + @pytest.mark.parametrize( + "current_protocol,new_protocol", + [ + ("TCP", "TCP"), + ("TLS", "HTTPS"), + ("HTTPS", "HTTPS"), + ("TLS", "TLS"), + ("HTTPS", "TLS"), + ("HTTPS", "TCP"), + ("TLS", "TCP"), + ], + ) + @pytest.mark.parametrize( + "current_ssl,new_ssl", + [ + (None, "ELBSecurityPolicy-TLS-1-0-2015-04"), + ("ELBSecurityPolicy-TLS13-1-2-Ext2-2021-06", "ELBSecurityPolicy-TLS-1-0-2015-04"), + ("ELBSecurityPolicy-TLS-1-0-2015-04", None), + ("ELBSecurityPolicy-TLS-1-0-2015-04", "ELBSecurityPolicy-TLS-1-0-2015-04"), + ], + ) + def test__compare_listener_sslpolicy(self, current_protocol, new_protocol, current_ssl, new_ssl): + current_listener = self.createListener(protocol=current_protocol, sslPolicy=current_ssl) + + new_listener = self.createListener(protocol=new_protocol, sslPolicy=new_ssl) + + expected = None + if new_protocol != current_protocol: + expected = {"Protocol": new_protocol} + if new_protocol in ("HTTPS", "TLS") and new_ssl and new_ssl != current_ssl: + expected = expected or {} + expected["SslPolicy"] = new_ssl + assert expected == elbv2.ELBListeners._compare_listener(current_listener, new_listener) + + @pytest.mark.parametrize( + "current_protocol,new_protocol", + [ + ("TCP", "TCP"), + ("TLS", "HTTPS"), + ("HTTPS", "HTTPS"), + ("TLS", "TLS"), + ("HTTPS", "TLS"), + ("HTTPS", "TCP"), + ("TLS", "TCP"), + ], + ) + @pytest.mark.parametrize( + "current_certificate,new_certificate", + [ + (None, "arn:aws:iam::012345678901:server-certificate/ansible-test-1"), + ( + "arn:aws:iam::012345678901:server-certificate/ansible-test-1", + "arn:aws:iam::012345678901:server-certificate/ansible-test-2", + ), + ("arn:aws:iam::012345678901:server-certificate/ansible-test-1", None), + ( + "arn:aws:iam::012345678901:server-certificate/ansible-test-1", + "arn:aws:iam::012345678901:server-certificate/ansible-test-1", + ), + ], + ) + def test__compare_listener_certificates(self, current_protocol, new_protocol, current_certificate, new_certificate): + current_listener = self.createListener(protocol=current_protocol, certificate_arn=current_certificate) + + new_listener = self.createListener(protocol=new_protocol, certificate_arn=new_certificate) + + expected = None + if new_protocol != current_protocol: + expected = {"Protocol": new_protocol} + if new_protocol in ("HTTPS", "TLS") and new_certificate and new_certificate != current_certificate: + expected = expected or {} + expected["Certificates"] = [{"CertificateArn": new_certificate}] + assert expected == elbv2.ELBListeners._compare_listener(current_listener, new_listener) + + @pytest.mark.parametrize( + "are_equals", + [True, False], + ) + def test__compare_listener_port(self, are_equals): + current_listener = self.createListener() + new_port = MagicMock() if not are_equals else None + new_listener = self.createListener(port=new_port) + + result = elbv2.ELBListeners._compare_listener(current_listener, new_listener) + expected = None + if not are_equals: + expected = {"Port": new_port} + assert result == expected + + def test_ensure_listeners_alpn_policy(self): + listeners = [{"Port": self.DEFAULT_PORT, "AlpnPolicy": "HTTP2Optional"}] + expected = [{"Port": self.DEFAULT_PORT, "AlpnPolicy": ["HTTP2Optional"]}] + assert expected == elbv2.ELBListeners._ensure_listeners_alpn_policy(listeners) |