summaryrefslogtreecommitdiffstats
path: root/ansible_collections/amazon/aws/tests/unit/module_utils
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/amazon/aws/tests/unit/module_utils')
-rw-r--r--ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py68
-rw-r--r--ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py6
-rw-r--r--ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py12
-rw-r--r--ansible_collections/amazon/aws/tests/unit/module_utils/policy/test_sort_json_policy_dict.py61
-rw-r--r--ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py137
5 files changed, 214 insertions, 70 deletions
diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py b/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py
index 9f3e4194b..a5ce452fc 100644
--- a/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py
+++ b/ansible_collections/amazon/aws/tests/unit/module_utils/botocore/test_is_boto3_error_code.py
@@ -209,3 +209,71 @@ class TestIsBoto3ErrorCode:
assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
assert issubclass(returned_exception, Exception)
assert returned_exception.__name__ == "NeverEverRaisedException"
+
+ def test_is_boto3_error_code_tuple__pass__client(self):
+ passed_exception = self._make_denied_exception()
+ returned_exception = is_boto3_error_code(("NotAccessDenied", "AccessDenied"), e=passed_exception)
+ assert isinstance(passed_exception, returned_exception)
+ assert issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ != "NeverEverRaisedException"
+
+ returned_exception = is_boto3_error_code(("AccessDenied", "NotAccessDenied"), e=passed_exception)
+ assert isinstance(passed_exception, returned_exception)
+ assert issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ != "NeverEverRaisedException"
+
+ def test_is_boto3_error_code_tuple__pass__unexpected(self):
+ passed_exception = self._make_unexpected_exception()
+ returned_exception = is_boto3_error_code(("NotAccessDenied", "AccessDenied"), e=passed_exception)
+ assert not isinstance(passed_exception, returned_exception)
+ assert not issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ == "NeverEverRaisedException"
+
+ def test_is_boto3_error_code_tuple__pass__botocore(self):
+ passed_exception = self._make_botocore_exception()
+ returned_exception = is_boto3_error_code(("NotAccessDenied", "AccessDenied"), e=passed_exception)
+ assert not isinstance(passed_exception, returned_exception)
+ assert not issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ == "NeverEverRaisedException"
+
+ def test_is_boto3_error_code_set__pass__client(self):
+ passed_exception = self._make_denied_exception()
+ returned_exception = is_boto3_error_code({"NotAccessDenied", "AccessDenied"}, e=passed_exception)
+ assert isinstance(passed_exception, returned_exception)
+ assert issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ != "NeverEverRaisedException"
+
+ returned_exception = is_boto3_error_code({"AccessDenied", "NotAccessDenied"}, e=passed_exception)
+ assert isinstance(passed_exception, returned_exception)
+ assert issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ != "NeverEverRaisedException"
+
+ def test_is_boto3_error_code_set__pass__unexpected(self):
+ passed_exception = self._make_unexpected_exception()
+ returned_exception = is_boto3_error_code({"NotAccessDenied", "AccessDenied"}, e=passed_exception)
+ assert not isinstance(passed_exception, returned_exception)
+ assert not issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ == "NeverEverRaisedException"
+
+ def test_is_boto3_error_code_set__pass__botocore(self):
+ passed_exception = self._make_botocore_exception()
+ returned_exception = is_boto3_error_code({"NotAccessDenied", "AccessDenied"}, e=passed_exception)
+ assert not isinstance(passed_exception, returned_exception)
+ assert not issubclass(returned_exception, botocore.exceptions.ClientError)
+ assert not issubclass(returned_exception, botocore.exceptions.BotoCoreError)
+ assert issubclass(returned_exception, Exception)
+ assert returned_exception.__name__ == "NeverEverRaisedException"
diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py b/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py
index 28090f993..0a6830311 100644
--- a/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py
+++ b/ansible_collections/amazon/aws/tests/unit/module_utils/iam/test_iam_resource_transforms.py
@@ -451,10 +451,10 @@ class TestIamResourceToAnsibleDict:
OUTPUT = {
"arn": "arn:aws:iam::123456789012:role/ansible-test-76640355",
"assume_role_policy_document": {
- "statement": [
- {"action": "sts:AssumeRole", "effect": "Deny", "principal": {"service": "ec2.amazonaws.com"}}
+ "Statement": [
+ {"Action": "sts:AssumeRole", "Effect": "Deny", "Principal": {"Service": "ec2.amazonaws.com"}}
],
- "version": "2012-10-17",
+ "Version": "2012-10-17",
},
"assume_role_policy_document_raw": {
"Statement": [
diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py b/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py
index c61de1391..688514f59 100644
--- a/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py
+++ b/ansible_collections/amazon/aws/tests/unit/module_utils/modules/ansible_aws_module/test_passthrough.py
@@ -70,7 +70,7 @@ def test_region(monkeypatch, stdin):
aws_module = utils_module.AnsibleAWSModule(argument_spec=dict())
assert aws_module.region is sentinel.RETURNED_REGION
- assert get_aws_region.call_args == call(aws_module, True)
+ assert get_aws_region.call_args == call(aws_module)
@pytest.mark.parametrize("stdin", [{}], indirect=["stdin"])
@@ -129,7 +129,7 @@ def test_client_no_wrapper(monkeypatch, stdin):
aws_module = utils_module.AnsibleAWSModule(argument_spec=dict())
assert aws_module.client(sentinel.PARAM_SERVICE) is sentinel.BOTO3_CONN
- assert get_aws_connection_info.call_args == call(aws_module, boto3=True)
+ assert get_aws_connection_info.call_args == call(aws_module)
assert boto3_conn.call_args == call(
aws_module,
conn_type="client",
@@ -153,7 +153,7 @@ def test_client_wrapper(monkeypatch, stdin):
wrapped_conn = aws_module.client(sentinel.PARAM_SERVICE, sentinel.PARAM_WRAPPER)
assert wrapped_conn.client is sentinel.BOTO3_CONN
assert wrapped_conn.retry is sentinel.PARAM_WRAPPER
- assert get_aws_connection_info.call_args == call(aws_module, boto3=True)
+ assert get_aws_connection_info.call_args == call(aws_module)
assert boto3_conn.call_args == call(
aws_module,
conn_type="client",
@@ -166,7 +166,7 @@ def test_client_wrapper(monkeypatch, stdin):
wrapped_conn = aws_module.client(sentinel.PARAM_SERVICE, sentinel.PARAM_WRAPPER, region=sentinel.PARAM_REGION)
assert wrapped_conn.client is sentinel.BOTO3_CONN
assert wrapped_conn.retry is sentinel.PARAM_WRAPPER
- assert get_aws_connection_info.call_args == call(aws_module, boto3=True)
+ assert get_aws_connection_info.call_args == call(aws_module)
assert boto3_conn.call_args == call(
aws_module,
conn_type="client",
@@ -188,7 +188,7 @@ def test_resource(monkeypatch, stdin):
aws_module = utils_module.AnsibleAWSModule(argument_spec=dict())
assert aws_module.resource(sentinel.PARAM_SERVICE) is sentinel.BOTO3_CONN
- assert get_aws_connection_info.call_args == call(aws_module, boto3=True)
+ assert get_aws_connection_info.call_args == call(aws_module)
assert boto3_conn.call_args == call(
aws_module,
conn_type="resource",
@@ -199,7 +199,7 @@ def test_resource(monkeypatch, stdin):
# Check that we can override parameters
assert aws_module.resource(sentinel.PARAM_SERVICE, region=sentinel.PARAM_REGION) is sentinel.BOTO3_CONN
- assert get_aws_connection_info.call_args == call(aws_module, boto3=True)
+ assert get_aws_connection_info.call_args == call(aws_module)
assert boto3_conn.call_args == call(
aws_module,
conn_type="resource",
diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/policy/test_sort_json_policy_dict.py b/ansible_collections/amazon/aws/tests/unit/module_utils/policy/test_sort_json_policy_dict.py
deleted file mode 100644
index 8829f332c..000000000
--- a/ansible_collections/amazon/aws/tests/unit/module_utils/policy/test_sort_json_policy_dict.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# (c) 2022 Red Hat Inc.
-#
-# This file is part of Ansible
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from ansible_collections.amazon.aws.plugins.module_utils.policy import sort_json_policy_dict
-
-
-def test_nothing_to_sort():
- simple_dict = {"key1": "a"}
- nested_dict = {"key1": {"key2": "a"}}
- very_nested_dict = {"key1": {"key2": {"key3": "a"}}}
- assert sort_json_policy_dict(simple_dict) == simple_dict
- assert sort_json_policy_dict(nested_dict) == nested_dict
- assert sort_json_policy_dict(very_nested_dict) == very_nested_dict
-
-
-def test_basic_sort():
- simple_dict = {"key1": [1, 2, 3, 4], "key2": [9, 8, 7, 6]}
- sorted_dict = {"key1": [1, 2, 3, 4], "key2": [6, 7, 8, 9]}
- assert sort_json_policy_dict(simple_dict) == sorted_dict
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
- simple_dict = {"key1": ["a", "b", "c", "d"], "key2": ["z", "y", "x", "w"]}
- sorted_dict = {"key1": ["a", "b", "c", "d"], "key2": ["w", "x", "y", "z"]}
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
-
-
-def test_nested_list_sort():
- nested_dict = {"key1": {"key2": [9, 8, 7, 6]}}
- sorted_dict = {"key1": {"key2": [6, 7, 8, 9]}}
- assert sort_json_policy_dict(nested_dict) == sorted_dict
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
- nested_dict = {"key1": {"key2": ["z", "y", "x", "w"]}}
- sorted_dict = {"key1": {"key2": ["w", "x", "y", "z"]}}
- assert sort_json_policy_dict(nested_dict) == sorted_dict
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
-
-
-def test_nested_dict_list_sort():
- nested_dict = {"key1": {"key2": {"key3": [9, 8, 7, 6]}}}
- sorted_dict = {"key1": {"key2": {"key3": [6, 7, 8, 9]}}}
- assert sort_json_policy_dict(nested_dict) == sorted_dict
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
- nested_dict = {"key1": {"key2": {"key3": ["z", "y", "x", "w"]}}}
- sorted_dict = {"key1": {"key2": {"key3": ["w", "x", "y", "z"]}}}
- assert sort_json_policy_dict(nested_dict) == sorted_dict
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
-
-
-def test_list_of_dict_sort():
- nested_dict = {"key1": [{"key2": [4, 3, 2, 1]}, {"key3": [9, 8, 7, 6]}]}
- sorted_dict = {"key1": [{"key2": [1, 2, 3, 4]}, {"key3": [6, 7, 8, 9]}]}
- assert sort_json_policy_dict(nested_dict) == sorted_dict
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
-
-
-def test_list_of_list_sort():
- nested_dict = {"key1": [[4, 3, 2, 1], [9, 8, 7, 6]]}
- sorted_dict = {"key1": [[1, 2, 3, 4], [6, 7, 8, 9]]}
- assert sort_json_policy_dict(nested_dict) == sorted_dict
- assert sort_json_policy_dict(sorted_dict) == sorted_dict
diff --git a/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py b/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py
index d7293f0ce..0d2f3c153 100644
--- a/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py
+++ b/ansible_collections/amazon/aws/tests/unit/module_utils/test_elbv2.py
@@ -6,6 +6,8 @@
from unittest.mock import MagicMock
+import pytest
+
from ansible_collections.amazon.aws.plugins.module_utils import elbv2
one_action = [
@@ -159,3 +161,138 @@ class TestElBV2Utils:
actual_elb_attributes = self.elbv2obj.get_elb_attributes()
# Assert we got the expected result
assert actual_elb_attributes == expected_elb_attributes
+
+
+class TestELBListeners:
+ DEFAULT_PORT = 80
+ DEFAULT_PROTOCOL = "TCP"
+
+ def createListener(self, **kwargs):
+ result = {"Port": self.DEFAULT_PORT, "Protocol": self.DEFAULT_PROTOCOL}
+ if kwargs.get("port"):
+ result["Port"] = kwargs.get("port")
+ if kwargs.get("protocol"):
+ result["Protocol"] = kwargs.get("protocol")
+ if kwargs.get("certificate_arn") and kwargs.get("protocol") in ("TLS", "HTTPS"):
+ result["Certificates"] = [{"CertificateArn": kwargs.get("certificate_arn")}]
+ if kwargs.get("sslPolicy") and kwargs.get("protocol") in ("TLS", "HTTPS"):
+ result["SslPolicy"] = kwargs.get("sslPolicy")
+ if kwargs.get("alpnPolicy") and kwargs.get("protocol") == "TLS":
+ result["AlpnPolicy"] = kwargs.get("alpnPolicy")
+ return result
+
+ @pytest.mark.parametrize("current_protocol", ["TCP", "TLS", "UDP"])
+ @pytest.mark.parametrize(
+ "current_alpn,new_alpn",
+ [
+ (None, "None"),
+ (None, "HTTP1Only"),
+ ("HTTP1Only", "HTTP2Only"),
+ ("HTTP1Only", "HTTP1Only"),
+ ],
+ )
+ def test__compare_listener_alpn_policy(self, current_protocol, current_alpn, new_alpn):
+ current_listener = self.createListener(protocol=current_protocol, alpnPolicy=[current_alpn])
+ new_listener = self.createListener(protocol="TLS", alpnPolicy=[new_alpn])
+ result = None
+ if current_protocol != "TLS":
+ result = {"Protocol": "TLS"}
+ if new_alpn and any((current_protocol != "TLS", not current_alpn, current_alpn and current_alpn != new_alpn)):
+ result = result or {}
+ result["AlpnPolicy"] = [new_alpn]
+
+ assert result == elbv2.ELBListeners._compare_listener(current_listener, new_listener)
+
+ @pytest.mark.parametrize(
+ "current_protocol,new_protocol",
+ [
+ ("TCP", "TCP"),
+ ("TLS", "HTTPS"),
+ ("HTTPS", "HTTPS"),
+ ("TLS", "TLS"),
+ ("HTTPS", "TLS"),
+ ("HTTPS", "TCP"),
+ ("TLS", "TCP"),
+ ],
+ )
+ @pytest.mark.parametrize(
+ "current_ssl,new_ssl",
+ [
+ (None, "ELBSecurityPolicy-TLS-1-0-2015-04"),
+ ("ELBSecurityPolicy-TLS13-1-2-Ext2-2021-06", "ELBSecurityPolicy-TLS-1-0-2015-04"),
+ ("ELBSecurityPolicy-TLS-1-0-2015-04", None),
+ ("ELBSecurityPolicy-TLS-1-0-2015-04", "ELBSecurityPolicy-TLS-1-0-2015-04"),
+ ],
+ )
+ def test__compare_listener_sslpolicy(self, current_protocol, new_protocol, current_ssl, new_ssl):
+ current_listener = self.createListener(protocol=current_protocol, sslPolicy=current_ssl)
+
+ new_listener = self.createListener(protocol=new_protocol, sslPolicy=new_ssl)
+
+ expected = None
+ if new_protocol != current_protocol:
+ expected = {"Protocol": new_protocol}
+ if new_protocol in ("HTTPS", "TLS") and new_ssl and new_ssl != current_ssl:
+ expected = expected or {}
+ expected["SslPolicy"] = new_ssl
+ assert expected == elbv2.ELBListeners._compare_listener(current_listener, new_listener)
+
+ @pytest.mark.parametrize(
+ "current_protocol,new_protocol",
+ [
+ ("TCP", "TCP"),
+ ("TLS", "HTTPS"),
+ ("HTTPS", "HTTPS"),
+ ("TLS", "TLS"),
+ ("HTTPS", "TLS"),
+ ("HTTPS", "TCP"),
+ ("TLS", "TCP"),
+ ],
+ )
+ @pytest.mark.parametrize(
+ "current_certificate,new_certificate",
+ [
+ (None, "arn:aws:iam::012345678901:server-certificate/ansible-test-1"),
+ (
+ "arn:aws:iam::012345678901:server-certificate/ansible-test-1",
+ "arn:aws:iam::012345678901:server-certificate/ansible-test-2",
+ ),
+ ("arn:aws:iam::012345678901:server-certificate/ansible-test-1", None),
+ (
+ "arn:aws:iam::012345678901:server-certificate/ansible-test-1",
+ "arn:aws:iam::012345678901:server-certificate/ansible-test-1",
+ ),
+ ],
+ )
+ def test__compare_listener_certificates(self, current_protocol, new_protocol, current_certificate, new_certificate):
+ current_listener = self.createListener(protocol=current_protocol, certificate_arn=current_certificate)
+
+ new_listener = self.createListener(protocol=new_protocol, certificate_arn=new_certificate)
+
+ expected = None
+ if new_protocol != current_protocol:
+ expected = {"Protocol": new_protocol}
+ if new_protocol in ("HTTPS", "TLS") and new_certificate and new_certificate != current_certificate:
+ expected = expected or {}
+ expected["Certificates"] = [{"CertificateArn": new_certificate}]
+ assert expected == elbv2.ELBListeners._compare_listener(current_listener, new_listener)
+
+ @pytest.mark.parametrize(
+ "are_equals",
+ [True, False],
+ )
+ def test__compare_listener_port(self, are_equals):
+ current_listener = self.createListener()
+ new_port = MagicMock() if not are_equals else None
+ new_listener = self.createListener(port=new_port)
+
+ result = elbv2.ELBListeners._compare_listener(current_listener, new_listener)
+ expected = None
+ if not are_equals:
+ expected = {"Port": new_port}
+ assert result == expected
+
+ def test_ensure_listeners_alpn_policy(self):
+ listeners = [{"Port": self.DEFAULT_PORT, "AlpnPolicy": "HTTP2Optional"}]
+ expected = [{"Port": self.DEFAULT_PORT, "AlpnPolicy": ["HTTP2Optional"]}]
+ assert expected == elbv2.ELBListeners._ensure_listeners_alpn_policy(listeners)