summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/aws/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/community/aws/tests/unit')
-rw-r--r--ansible_collections/community/aws/tests/unit/compat/builtins.py9
-rw-r--r--ansible_collections/community/aws/tests/unit/compat/mock.py122
-rw-r--r--ansible_collections/community/aws/tests/unit/compat/unittest.py38
-rw-r--r--ansible_collections/community/aws/tests/unit/constraints.txt6
-rw-r--r--ansible_collections/community/aws/tests/unit/mock/loader.py17
-rw-r--r--ansible_collections/community/aws/tests/unit/mock/path.py5
-rw-r--r--ansible_collections/community/aws/tests/unit/mock/procenv.py19
-rw-r--r--ansible_collections/community/aws/tests/unit/mock/vault_helper.py16
-rw-r--r--ansible_collections/community/aws/tests/unit/mock/yaml_helper.py37
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py95
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/inventory/__init__.py0
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py638
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py20
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py97
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py43
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py196
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py80
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py61
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py91
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py227
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py261
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py59
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py30
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py109
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py37
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py240
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py117
-rw-r--r--ansible_collections/community/aws/tests/unit/plugins/modules/utils.py26
-rw-r--r--ansible_collections/community/aws/tests/unit/requirements.yml5
29 files changed, 1804 insertions, 897 deletions
diff --git a/ansible_collections/community/aws/tests/unit/compat/builtins.py b/ansible_collections/community/aws/tests/unit/compat/builtins.py
index 349d310e8..3df85be4f 100644
--- a/ansible_collections/community/aws/tests/unit/compat/builtins.py
+++ b/ansible_collections/community/aws/tests/unit/compat/builtins.py
@@ -16,7 +16,10 @@
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
#
@@ -28,6 +31,6 @@ __metaclass__ = type
try:
import __builtin__ # pylint: disable=unused-import
except ImportError:
- BUILTINS = 'builtins'
+ BUILTINS = "builtins"
else:
- BUILTINS = '__builtin__'
+ BUILTINS = "__builtin__"
diff --git a/ansible_collections/community/aws/tests/unit/compat/mock.py b/ansible_collections/community/aws/tests/unit/compat/mock.py
deleted file mode 100644
index 0972cd2e8..000000000
--- a/ansible_collections/community/aws/tests/unit/compat/mock.py
+++ /dev/null
@@ -1,122 +0,0 @@
-# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-'''
-Compat module for Python3.x's unittest.mock module
-'''
-import sys
-
-# Python 2.7
-
-# Note: Could use the pypi mock library on python3.x as well as python2.x. It
-# is the same as the python3 stdlib mock library
-
-try:
- # Allow wildcard import because we really do want to import all of mock's
- # symbols into this compat shim
- # pylint: disable=wildcard-import,unused-wildcard-import
- from unittest.mock import *
-except ImportError:
- # Python 2
- # pylint: disable=wildcard-import,unused-wildcard-import
- try:
- from mock import *
- except ImportError:
- print('You need the mock library installed on python2.x to run tests')
-
-
-# Prior to 3.4.4, mock_open cannot handle binary read_data
-if sys.version_info >= (3,) and sys.version_info < (3, 4, 4):
- file_spec = None
-
- def _iterate_read_data(read_data):
- # Helper for mock_open:
- # Retrieve lines from read_data via a generator so that separate calls to
- # readline, read, and readlines are properly interleaved
- sep = b'\n' if isinstance(read_data, bytes) else '\n'
- data_as_list = [l + sep for l in read_data.split(sep)]
-
- if data_as_list[-1] == sep:
- # If the last line ended in a newline, the list comprehension will have an
- # extra entry that's just a newline. Remove this.
- data_as_list = data_as_list[:-1]
- else:
- # If there wasn't an extra newline by itself, then the file being
- # emulated doesn't have a newline to end the last line remove the
- # newline that our naive format() added
- data_as_list[-1] = data_as_list[-1][:-1]
-
- for line in data_as_list:
- yield line
-
- def mock_open(mock=None, read_data=''):
- """
- A helper function to create a mock to replace the use of `open`. It works
- for `open` called directly or used as a context manager.
-
- The `mock` argument is the mock object to configure. If `None` (the
- default) then a `MagicMock` will be created for you, with the API limited
- to methods or attributes available on standard file handles.
-
- `read_data` is a string for the `read` methoddline`, and `readlines` of the
- file handle to return. This is an empty string by default.
- """
- def _readlines_side_effect(*args, **kwargs):
- if handle.readlines.return_value is not None:
- return handle.readlines.return_value
- return list(_data)
-
- def _read_side_effect(*args, **kwargs):
- if handle.read.return_value is not None:
- return handle.read.return_value
- return type(read_data)().join(_data)
-
- def _readline_side_effect():
- if handle.readline.return_value is not None:
- while True:
- yield handle.readline.return_value
- for line in _data:
- yield line
-
- global file_spec
- if file_spec is None:
- import _io
- file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
-
- if mock is None:
- mock = MagicMock(name='open', spec=open)
-
- handle = MagicMock(spec=file_spec)
- handle.__enter__.return_value = handle
-
- _data = _iterate_read_data(read_data)
-
- handle.write.return_value = None
- handle.read.return_value = None
- handle.readline.return_value = None
- handle.readlines.return_value = None
-
- handle.read.side_effect = _read_side_effect
- handle.readline.side_effect = _readline_side_effect()
- handle.readlines.side_effect = _readlines_side_effect
-
- mock.return_value = handle
- return mock
diff --git a/ansible_collections/community/aws/tests/unit/compat/unittest.py b/ansible_collections/community/aws/tests/unit/compat/unittest.py
deleted file mode 100644
index 98f08ad6a..000000000
--- a/ansible_collections/community/aws/tests/unit/compat/unittest.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-'''
-Compat module for Python2.7's unittest module
-'''
-
-import sys
-
-# Allow wildcard import because we really do want to import all of
-# unittests's symbols into this compat shim
-# pylint: disable=wildcard-import,unused-wildcard-import
-if sys.version_info < (2, 7):
- try:
- # Need unittest2 on python2.6
- from unittest2 import *
- except ImportError:
- print('You need unittest2 installed on python2.6.x to run tests')
-else:
- from unittest import *
diff --git a/ansible_collections/community/aws/tests/unit/constraints.txt b/ansible_collections/community/aws/tests/unit/constraints.txt
index cd546e7c2..5708323f1 100644
--- a/ansible_collections/community/aws/tests/unit/constraints.txt
+++ b/ansible_collections/community/aws/tests/unit/constraints.txt
@@ -1,7 +1,7 @@
# Specifically run tests against the oldest versions that we support
-boto3==1.18.0
-botocore==1.21.0
+botocore==1.29.0
+boto3==1.26.0
# AWS CLI has `botocore==` dependencies, provide the one that matches botocore
# to avoid needing to download over a years worth of awscli wheels.
-awscli==1.20.0
+awscli==1.27.0
diff --git a/ansible_collections/community/aws/tests/unit/mock/loader.py b/ansible_collections/community/aws/tests/unit/mock/loader.py
index 00a584127..339a1918c 100644
--- a/ansible_collections/community/aws/tests/unit/mock/loader.py
+++ b/ansible_collections/community/aws/tests/unit/mock/loader.py
@@ -16,21 +16,24 @@
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import os
from ansible.errors import AnsibleParserError
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils._text import to_text
from ansible.parsing.dataloader import DataLoader
-from ansible.module_utils._text import to_bytes, to_text
class DictDataLoader(DataLoader):
-
def __init__(self, file_mapping=None):
file_mapping = {} if file_mapping is None else file_mapping
- assert type(file_mapping) == dict
+ assert isinstance(file_mapping, dict)
super(DictDataLoader, self).__init__()
@@ -51,7 +54,7 @@ class DictDataLoader(DataLoader):
if file_name in self._file_mapping:
return (to_bytes(self._file_mapping[file_name]), False)
else:
- raise AnsibleParserError("file not found: %s" % file_name)
+ raise AnsibleParserError(f"file not found: {file_name}")
def path_exists(self, path):
path = to_text(path)
@@ -68,7 +71,7 @@ class DictDataLoader(DataLoader):
def list_directory(self, path):
ret = []
path = to_text(path)
- for x in (list(self._file_mapping.keys()) + self._known_directories):
+ for x in list(self._file_mapping.keys()) + self._known_directories:
if x.startswith(path):
if os.path.dirname(x) == path:
ret.append(os.path.basename(x))
@@ -86,7 +89,7 @@ class DictDataLoader(DataLoader):
self._known_directories = []
for path in self._file_mapping:
dirname = os.path.dirname(path)
- while dirname not in ('/', ''):
+ while dirname not in ("/", ""):
self._add_known_directory(dirname)
dirname = os.path.dirname(dirname)
diff --git a/ansible_collections/community/aws/tests/unit/mock/path.py b/ansible_collections/community/aws/tests/unit/mock/path.py
index 676b35ab8..8057e5a58 100644
--- a/ansible_collections/community/aws/tests/unit/mock/path.py
+++ b/ansible_collections/community/aws/tests/unit/mock/path.py
@@ -1,10 +1,7 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from unittest.mock import MagicMock
-from ansible_collections.community.aws.tests.unit.compat.mock import MagicMock
from ansible.utils.path import unfrackpath
-
mock_unfrackpath_noop = MagicMock(spec_set=unfrackpath, side_effect=lambda x, *args, **kwargs: x)
diff --git a/ansible_collections/community/aws/tests/unit/mock/procenv.py b/ansible_collections/community/aws/tests/unit/mock/procenv.py
index e516a9458..0d8547f50 100644
--- a/ansible_collections/community/aws/tests/unit/mock/procenv.py
+++ b/ansible_collections/community/aws/tests/unit/mock/procenv.py
@@ -16,22 +16,19 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import sys
import json
-
+import sys
+import unittest
from contextlib import contextmanager
-from io import BytesIO, StringIO
-from ansible_collections.community.aws.tests.unit.compat import unittest
-from ansible.module_utils.six import PY3
+from io import BytesIO
+from io import StringIO
+
from ansible.module_utils._text import to_bytes
+from ansible.module_utils.six import PY3
@contextmanager
-def swap_stdin_and_argv(stdin_data='', argv_data=tuple()):
+def swap_stdin_and_argv(stdin_data="", argv_data=tuple()):
"""
context manager that temporarily masks the test runner's values for stdin and argv
"""
@@ -77,7 +74,7 @@ def swap_stdout():
class ModuleTestCase(unittest.TestCase):
def setUp(self, module_args=None):
if module_args is None:
- module_args = {'_ansible_remote_tmp': '/tmp', '_ansible_keep_remote_files': False}
+ module_args = {"_ansible_remote_tmp": "/tmp", "_ansible_keep_remote_files": False}
args = json.dumps(dict(ANSIBLE_MODULE_ARGS=module_args))
diff --git a/ansible_collections/community/aws/tests/unit/mock/vault_helper.py b/ansible_collections/community/aws/tests/unit/mock/vault_helper.py
index b54629da4..c55228c88 100644
--- a/ansible_collections/community/aws/tests/unit/mock/vault_helper.py
+++ b/ansible_collections/community/aws/tests/unit/mock/vault_helper.py
@@ -1,27 +1,29 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
from ansible.module_utils._text import to_bytes
-
from ansible.parsing.vault import VaultSecret
class TextVaultSecret(VaultSecret):
- '''A secret piece of text. ie, a password. Tracks text encoding.
+ """A secret piece of text. ie, a password. Tracks text encoding.
The text encoding of the text may not be the default text encoding so
- we keep track of the encoding so we encode it to the same bytes.'''
+ we keep track of the encoding so we encode it to the same bytes."""
def __init__(self, text, encoding=None, errors=None, _bytes=None):
super(TextVaultSecret, self).__init__()
self.text = text
- self.encoding = encoding or 'utf-8'
+ self.encoding = encoding or "utf-8"
self._bytes = _bytes
- self.errors = errors or 'strict'
+ self.errors = errors or "strict"
@property
def bytes(self):
- '''The text encoded with encoding, unless we specifically set _bytes.'''
+ """The text encoded with encoding, unless we specifically set _bytes."""
return self._bytes or to_bytes(self.text, encoding=self.encoding, errors=self.errors)
diff --git a/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py b/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py
index a646c0241..8c99ef40f 100644
--- a/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py
+++ b/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py
@@ -1,18 +1,23 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import io
+
import yaml
from ansible.module_utils.six import PY3
-from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.parsing.yaml.dumper import AnsibleDumper
+from ansible.parsing.yaml.loader import AnsibleLoader
class YamlTestUtils(object):
"""Mixin class to combine with a unittest.TestCase subclass."""
+
def _loader(self, stream):
"""Vault related tests will want to override this.
@@ -45,8 +50,7 @@ class YamlTestUtils(object):
obj_2 = loader.get_data()
# dump the gen 2 objects directory to strings
- string_from_object_dump_2 = self._dump_string(obj_2,
- dumper=AnsibleDumper)
+ string_from_object_dump_2 = self._dump_string(obj_2, dumper=AnsibleDumper)
# The gen 1 and gen 2 yaml strings
self.assertEqual(string_from_object_dump, string_from_object_dump_2)
@@ -66,7 +70,7 @@ class YamlTestUtils(object):
self.assertEqual(string_from_object_dump, string_from_object_dump_3)
def _old_dump_load_cycle(self, obj):
- '''Dump the passed in object to yaml, load it back up, dump again, compare.'''
+ """Dump the passed in object to yaml, load it back up, dump again, compare."""
stream = io.StringIO()
yaml_string = self._dump_string(obj, dumper=AnsibleDumper)
@@ -111,16 +115,23 @@ class YamlTestUtils(object):
assert yaml_string == yaml_string_obj_from_stream
assert yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string
- assert (yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string == yaml_string_stream_obj_from_stream ==
- yaml_string_stream_obj_from_string)
+ assert (
+ yaml_string
+ == yaml_string_obj_from_stream
+ == yaml_string_obj_from_string
+ == yaml_string_stream_obj_from_stream
+ == yaml_string_stream_obj_from_string
+ )
assert obj == obj_from_stream
assert obj == obj_from_string
assert obj == yaml_string_obj_from_stream
assert obj == yaml_string_obj_from_string
assert obj == obj_from_stream == obj_from_string == yaml_string_obj_from_stream == yaml_string_obj_from_string
- return {'obj': obj,
- 'yaml_string': yaml_string,
- 'yaml_string_from_stream': yaml_string_from_stream,
- 'obj_from_stream': obj_from_stream,
- 'obj_from_string': obj_from_string,
- 'yaml_string_obj_from_string': yaml_string_obj_from_string}
+ return {
+ "obj": obj,
+ "yaml_string": yaml_string,
+ "yaml_string_from_stream": yaml_string_from_stream,
+ "obj_from_stream": obj_from_stream,
+ "obj_from_string": obj_from_string,
+ "yaml_string_obj_from_string": yaml_string_obj_from_string,
+ }
diff --git a/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py b/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py
index 579cafc16..d5fcb4b1e 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py
@@ -1,11 +1,11 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from io import StringIO
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
import pytest
-from ansible_collections.community.aws.tests.unit.compat.mock import patch, MagicMock
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import connection_loader
@@ -15,46 +15,45 @@ if not HAS_BOTO3:
pytestmark = pytest.mark.skip("test_data_pipeline.py requires the python modules 'boto3' and 'botocore'")
-class TestConnectionBaseClass():
-
- @patch('os.path.exists')
- @patch('subprocess.Popen')
- @patch('select.poll')
- @patch('boto3.client')
+class TestConnectionBaseClass:
+ @patch("os.path.exists")
+ @patch("subprocess.Popen")
+ @patch("select.poll")
+ @patch("boto3.client")
def test_plugins_connection_aws_ssm_start_session(self, boto_client, s_poll, s_popen, mock_ospe):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn.get_option = MagicMock()
- conn.get_option.side_effect = ['i1234', 'executable', 'abcd', 'i1234']
- conn.host = 'abc'
+ conn.get_option.side_effect = ["i1234", "executable", "abcd", "i1234"]
+ conn.host = "abc"
mock_ospe.return_value = True
boto3 = MagicMock()
- boto3.client('ssm').return_value = MagicMock()
+ boto3.client("ssm").return_value = MagicMock()
conn.start_session = MagicMock()
conn._session_id = MagicMock()
- conn._session_id.return_value = 's1'
+ conn._session_id.return_value = "s1"
s_popen.return_value.stdin.write = MagicMock()
s_poll.return_value = MagicMock()
s_poll.return_value.register = MagicMock()
s_popen.return_value.poll = MagicMock()
s_popen.return_value.poll.return_value = None
conn._stdin_readline = MagicMock()
- conn._stdin_readline.return_value = 'abc123'
- conn.SESSION_START = 'abc'
+ conn._stdin_readline.return_value = "abc123"
+ conn.SESSION_START = "abc"
conn.start_session()
- @patch('random.choice')
+ @patch("random.choice")
def test_plugins_connection_aws_ssm_exec_command(self, r_choice):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
- r_choice.side_effect = ['a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b']
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
+ r_choice.side_effect = ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"]
conn.MARK_LENGTH = 5
conn._session = MagicMock()
conn._session.stdin.write = MagicMock()
conn._wrap_command = MagicMock()
- conn._wrap_command.return_value = 'cmd1'
+ conn._wrap_command.return_value = "cmd1"
conn._flush_stderr = MagicMock()
conn._windows = MagicMock()
conn._windows.return_value = True
@@ -67,44 +66,44 @@ class TestConnectionBaseClass():
conn._session.stdout = MagicMock()
conn._session.stdout.readline = MagicMock()
conn._post_process = MagicMock()
- conn._post_process.return_value = 'test'
- conn._session.stdout.readline.side_effect = iter(['aaaaa\n', 'Hi\n', '0\n', 'bbbbb\n'])
+ conn._post_process.return_value = "test"
+ conn._session.stdout.readline.side_effect = iter(["aaaaa\n", "Hi\n", "0\n", "bbbbb\n"])
conn.get_option = MagicMock()
conn.get_option.return_value = 1
- returncode = 'a'
- stdout = 'b'
+ returncode = "a"
+ stdout = "b"
return (returncode, stdout, conn._flush_stderr)
def test_plugins_connection_aws_ssm_prepare_terminal(self):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn.is_windows = MagicMock()
conn.is_windows.return_value = True
def test_plugins_connection_aws_ssm_wrap_command(self):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn.is_windows = MagicMock()
conn.is_windows.return_value = True
- return 'windows1'
+ return "windows1"
def test_plugins_connection_aws_ssm_post_process(self):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn.is_windows = MagicMock()
conn.is_windows.return_value = True
conn.stdout = MagicMock()
returncode = 0
return returncode, conn.stdout
- @patch('subprocess.Popen')
+ @patch("subprocess.Popen")
def test_plugins_connection_aws_ssm_flush_stderr(self, s_popen):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn.poll_stderr = MagicMock()
conn.poll_stderr.register = MagicMock()
conn.stderr = None
@@ -121,37 +120,37 @@ class TestConnectionBaseClass():
# boto3.generate_presigned_url.return_value = MagicMock()
# return (boto3.generate_presigned_url.return_value)
- @patch('os.path.exists')
+ @patch("os.path.exists")
def test_plugins_connection_aws_ssm_put_file(self, mock_ospe):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn._connect = MagicMock()
conn._file_transport_command = MagicMock()
- conn._file_transport_command.return_value = (0, 'stdout', 'stderr')
- conn.put_file('/in/file', '/out/file')
+ conn._file_transport_command.return_value = (0, "stdout", "stderr")
+ conn.put_file("/in/file", "/out/file")
def test_plugins_connection_aws_ssm_fetch_file(self):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn._connect = MagicMock()
conn._file_transport_command = MagicMock()
- conn._file_transport_command.return_value = (0, 'stdout', 'stderr')
- conn.fetch_file('/in/file', '/out/file')
+ conn._file_transport_command.return_value = (0, "stdout", "stderr")
+ conn.fetch_file("/in/file", "/out/file")
- @patch('subprocess.check_output')
- @patch('boto3.client')
+ @patch("subprocess.check_output")
+ @patch("boto3.client")
def test_plugins_connection_file_transport_command(self, boto_client, s_check_output):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn.get_option = MagicMock()
- conn.get_option.side_effect = ['1', '2', '3', '4', '5']
+ conn.get_option.side_effect = ["1", "2", "3", "4", "5"]
conn._get_url = MagicMock()
- conn._get_url.side_effect = ['url1', 'url2']
+ conn._get_url.side_effect = ["url1", "url2"]
boto3 = MagicMock()
- boto3.client('s3').return_value = MagicMock()
+ boto3.client("s3").return_value = MagicMock()
conn.get_option.return_value = 1
get_command = MagicMock()
put_command = MagicMock()
@@ -161,11 +160,11 @@ class TestConnectionBaseClass():
conn.exec_command(put_command, in_data=None, sudoable=False)
conn.exec_command(get_command, in_data=None, sudoable=False)
- @patch('subprocess.check_output')
+ @patch("subprocess.check_output")
def test_plugins_connection_aws_ssm_close(self, s_check_output):
pc = PlayContext()
new_stdin = StringIO()
- conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin)
+ conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin)
conn.instance_id = "i-12345"
conn._session_id = True
conn.get_option = MagicMock()
@@ -174,8 +173,8 @@ class TestConnectionBaseClass():
conn._session.terminate = MagicMock()
conn._session.communicate = MagicMock()
conn._terminate_session = MagicMock()
- conn._terminate_session.return_value = ''
+ conn._terminate_session.return_value = ""
conn._session_id = MagicMock()
- conn._session_id.return_value = 'a'
+ conn._session_id.return_value = "a"
conn._client = MagicMock()
conn.close()
diff --git a/ansible_collections/community/aws/tests/unit/plugins/inventory/__init__.py b/ansible_collections/community/aws/tests/unit/plugins/inventory/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/community/aws/tests/unit/plugins/inventory/__init__.py
diff --git a/ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py b/ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py
new file mode 100644
index 000000000..8969b4a03
--- /dev/null
+++ b/ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py
@@ -0,0 +1,638 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2023 Ali AlKhalidi <@doteast>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+import copy
+import random
+import string
+from unittest.mock import MagicMock
+from unittest.mock import call
+from unittest.mock import patch
+
+import pytest
+
+try:
+ import botocore
+except ImportError:
+ # Handled by HAS_BOTO3
+ pass
+
+from ansible.errors import AnsibleError
+
+from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
+
+from ansible_collections.community.aws.plugins.inventory.aws_mq import InventoryModule
+from ansible_collections.community.aws.plugins.inventory.aws_mq import _add_details_to_hosts
+from ansible_collections.community.aws.plugins.inventory.aws_mq import _find_hosts_matching_statuses
+from ansible_collections.community.aws.plugins.inventory.aws_mq import _get_broker_host_tags
+
+if not HAS_BOTO3:
+ pytestmark = pytest.mark.skip("test_aws_mq.py requires the python modules 'boto3' and 'botocore'")
+
+
+def make_clienterror_exception(code="AccessDenied"):  # helper: fabricate a botocore ClientError with the given error code
+    return botocore.exceptions.ClientError(
+        {
+            "Error": {"Code": code, "Message": "User is not authorized to perform: xxx on resource: user yyyy"},
+            "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"},
+        },
+        "getXXX",  # operation name reported by the fake error
+    )
+
+
+@pytest.fixture()
+def inventory():  # fixture: InventoryModule with every collaborator replaced by a MagicMock
+    inventory = InventoryModule()
+    inventory.inventory = MagicMock()
+    inventory.inventory.set_variable = MagicMock()
+
+    inventory.all_clients = MagicMock()
+    inventory.get_option = MagicMock()
+
+    inventory._populate_host_vars = MagicMock()
+    inventory._set_composite_vars = MagicMock()
+    inventory._add_host_to_composed_groups = MagicMock()
+    inventory._add_host_to_keyed_groups = MagicMock()
+
+    inventory.get_cache_key = MagicMock()
+
+    inventory._cache = {}  # real dict so cache hits/misses can be asserted on
+
+    return inventory
+
+
+@pytest.fixture()
+def connection():  # fixture: stand-in for a boto3 MQ client
+    conn = MagicMock()
+    return conn
+
+
+@pytest.mark.parametrize(
+    "suffix,result",
+    [
+        ("aws_mq.yml", True),
+        ("aws_mq.yaml", True),
+        ("aws_MQ.yml", False),  # suffix matching is case-sensitive
+        ("AWS_mq.yaml", False),
+    ],
+)
+def test_inventory_verify_file_suffix(inventory, suffix, result, tmp_path):  # verify_file accepts only *aws_mq.yml/.yaml files
+    test_dir = tmp_path / "test_aws_mq"
+    test_dir.mkdir()
+    inventory_file = "inventory" + suffix
+    inventory_file = test_dir / inventory_file
+    inventory_file.write_text("my inventory")  # file must exist for verify_file to succeed
+    assert result == inventory.verify_file(str(inventory_file))
+
+
+def test_inventory_verify_file_with_missing_file(inventory):  # a well-named but nonexistent path must be rejected
+    inventory_file = "this_file_does_not_exist_aws_mq.yml"
+    assert not inventory.verify_file(inventory_file)
+
+
+def generate_random_string(with_digits=True, with_punctuation=True, length=16):  # helper: random ASCII string for names/keys
+    data = string.ascii_letters
+    if with_digits:
+        data += string.digits
+    if with_punctuation:
+        data += string.punctuation
+    return "".join([random.choice(data) for i in range(length)])
+
+
+@pytest.mark.parametrize(
+    "hosts,statuses,expected",
+    [
+        (
+            [
+                {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"},
+                {"host": "host2", "BrokerState": "RUNNING"},
+                {"host": "host3", "BrokerState": "REBOOT_IN_PROGRESS"},
+                {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"},
+                {"host": "host5", "BrokerState": "CREATION_FAILED"},
+                {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"},
+            ],
+            ["RUNNING"],  # single-status filter keeps only matching brokers
+            [{"host": "host2", "BrokerState": "RUNNING"}],
+        ),
+        (
+            [
+                {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"},
+                {"host": "host2", "BrokerState": "RUNNING"},
+                {"host": "host3", "BrokerState": "REBOOT_IN_PROGRESS"},
+                {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"},
+                {"host": "host5", "BrokerState": "CREATION_FAILED"},
+                {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"},
+            ],
+            ["all"],  # "all" is expected to match every broker state
+            [
+                {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"},
+                {"host": "host2", "BrokerState": "RUNNING"},
+                {"host": "host3", "BrokerState": "REBOOT_IN_PROGRESS"},
+                {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"},
+                {"host": "host5", "BrokerState": "CREATION_FAILED"},
+                {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"},
+            ],
+        ),
+        (
+            [
+                {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"},
+                {"host": "host2", "BrokerState": "RUNNING"},
+                {"host": "host3", "BrokerState": "CREATION_FAILED"},
+                {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"},
+                {"host": "host5", "BrokerState": "RUNNING"},
+                {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"},
+            ],
+            ["RUNNING"],
+            [  # multiple matches preserve input order
+                {"host": "host2", "BrokerState": "RUNNING"},
+                {"host": "host5", "BrokerState": "RUNNING"},
+            ],
+        ),
+    ],
+)
+def test_find_hosts_matching_statuses(hosts, statuses, expected):  # pure-function check of the status filter
+    assert expected == _find_hosts_matching_statuses(hosts, statuses)
+
+
+@pytest.mark.parametrize("hosts", ["", "host1", "host2,host3", "host2,host3,host1"])
+@patch("ansible_collections.community.aws.plugins.inventory.aws_mq._get_mq_hostname")
+def test_inventory_format_inventory(m_get_mq_hostname, inventory, hosts):  # _format_inventory builds _meta.hostvars + aws_mq group
+    hosts_vars = {
+        "host1": {"var10": "value10"},
+        "host2": {"var20": "value20", "var21": "value21"},
+        "host3": {"var30": "value30", "var31": "value31", "var32": "value32"},
+    }
+
+    m_get_mq_hostname.side_effect = lambda h: h["name"]  # hostname resolver just echoes the "name" key
+
+    class _inventory_host(object):  # minimal stand-in for an ansible inventory host object
+        def __init__(self, name, host_vars):
+            self.name = name
+            self.vars = host_vars
+
+    inventory.inventory = MagicMock()
+    inventory.inventory.get_host.side_effect = lambda x: _inventory_host(name=x, host_vars=hosts_vars.get(x))
+
+    hosts = [{"name": x} for x in hosts.split(",") if x]  # "" parametrization yields an empty host list
+    expected = {
+        "_meta": {"hostvars": {x["name"]: hosts_vars.get(x["name"]) for x in hosts}},
+        "aws_mq": {"hosts": [x["name"] for x in hosts]},
+    }
+
+    assert expected == inventory._format_inventory(hosts)
+    if hosts == []:
+        m_get_mq_hostname.assert_not_called()  # no hostname resolution for an empty inventory
+
+
+@pytest.mark.parametrize("length", range(0, 10, 2))
+def test_inventory_populate(inventory, length):  # _populate always creates the aws_mq group, adds hosts only when present
+    group = "aws_mq"
+    hosts = [f"host_{int(i)}" for i in range(length)]
+
+    inventory._add_hosts = MagicMock()
+    inventory._populate(hosts=hosts)
+
+    inventory.inventory.add_group.assert_called_with("aws_mq")  # group exists even with zero hosts
+
+    if len(hosts) == 0:
+        inventory.inventory._add_hosts.assert_not_called()  # NOTE(review): this asserts on a fresh mock attribute and always passes; likely meant inventory._add_hosts.assert_not_called()
+        inventory.inventory.add_child.assert_not_called()
+    else:
+        inventory._add_hosts.assert_called_with(hosts=hosts, group=group)
+        inventory.inventory.add_child.assert_called_with("all", group)
+
+
+def test_inventory_populate_from_cache(inventory):  # cached payload is replayed into groups, children and host vars
+    cache_data = {
+        "_meta": {
+            "hostvars": {
+                "broker_A": {"var10": "value10"},
+                "broker_B": {"var2": "value2"},
+                "broker_C": {"var3": ["value30", "value31", "value32"]},
+            }
+        },
+        "all": {"hosts": ["broker_A", "broker_D", "broker_B", "broker_C"]},
+        "aws_broker_group_A": {"hosts": ["broker_A", "broker_D"]},
+        "aws_broker_group_B": {"hosts": ["broker_B"]},
+        "aws_broker_group_C": {"hosts": ["broker_C"]},
+    }
+
+    inventory._populate_from_cache(cache_data)
+    inventory.inventory.add_group.assert_has_calls(
+        [
+            call("aws_broker_group_A"),
+            call("aws_broker_group_B"),
+            call("aws_broker_group_C"),
+        ],
+        any_order=True,
+    )
+    inventory.inventory.add_child.assert_has_calls(
+        [
+            call("all", "aws_broker_group_A"),
+            call("all", "aws_broker_group_B"),
+            call("all", "aws_broker_group_C"),
+        ],
+        any_order=True,
+    )
+
+    inventory._populate_host_vars.assert_has_calls(
+        [
+            call(["broker_A"], {"var10": "value10"}, "aws_broker_group_A"),
+            call(["broker_D"], {}, "aws_broker_group_A"),  # broker_D has no hostvars entry, so it gets an empty dict
+            call(["broker_B"], {"var2": "value2"}, "aws_broker_group_B"),
+            call(["broker_C"], {"var3": ["value30", "value31", "value32"]}, "aws_broker_group_C"),
+        ],
+        any_order=True,
+    )
+
+
+@pytest.mark.parametrize("detail", [{}, {"Tags": {"tag1": "value1", "tag2": "value2", "Tag3": "Value2"}}])
+def test_get_broker_host_tags(detail):  # Tags mapping is converted into a list of {Key, Value} dicts
+    expected_tags = [
+        {"Key": "tag1", "Value": "value1"},
+        {"Key": "tag2", "Value": "value2"},
+        {"Key": "Tag3", "Value": "Value2"},
+    ]
+
+    tags = _get_broker_host_tags(detail)
+
+    if not detail:
+        assert tags == []  # empty broker detail yields no tags
+    else:
+        assert tags == expected_tags
+
+
+@pytest.mark.parametrize("strict", [True, False])
+def test_add_details_to_hosts_with_no_hosts(connection, strict):  # no API call should be made for an empty host list
+    hosts = []
+
+    _add_details_to_hosts(connection, hosts, strict)
+    connection.describe_broker.assert_not_called()
+
+
+def test_add_details_to_hosts_with_failure_not_strict(connection):  # non-strict mode swallows the ClientError and leaves hosts untouched
+    hosts = [{"BrokerId": "1"}]
+
+    connection.describe_broker.side_effect = make_clienterror_exception()
+
+    _add_details_to_hosts(connection, hosts, strict=False)
+
+    assert hosts == [{"BrokerId": "1"}]
+
+
+def test_add_details_to_hosts_with_failure_strict(connection):  # strict mode escalates the ClientError to AnsibleError
+    hosts = [{"BrokerId": "1"}]
+
+    connection.describe_broker.side_effect = make_clienterror_exception()
+
+    with pytest.raises(AnsibleError):
+        _add_details_to_hosts(connection, hosts, strict=True)
+
+
+def test_add_details_to_hosts_with_hosts(connection):  # each host is enriched in place with its broker's tags
+    hosts = [{"BrokerId": "1"}, {"BrokerId": "2"}]
+    broker_hosts_tags = {
+        "1": {"Tags": {"tag10": "value10", "tag11": "value11"}},
+        "2": {"Tags": {"tag20": "value20", "tag21": "value21", "tag22": "value22"}},
+    }
+    connection.describe_broker.side_effect = lambda **kwargs: broker_hosts_tags.get(kwargs.get("BrokerId"))  # describe_broker keyed by BrokerId
+
+    _add_details_to_hosts(connection, hosts, strict=False)
+
+    assert hosts == [
+        {
+            "BrokerId": "1",
+            "Tags": [
+                {"Key": "tag10", "Value": "value10"},
+                {"Key": "tag11", "Value": "value11"},
+            ],
+        },
+        {
+            "BrokerId": "2",
+            "Tags": [
+                {"Key": "tag20", "Value": "value20"},
+                {"Key": "tag21", "Value": "value21"},
+                {"Key": "tag22", "Value": "value22"},
+            ],
+        },
+    ]
+
+
+ADD_DETAILS_TO_HOSTS = "ansible_collections.community.aws.plugins.inventory.aws_mq._add_details_to_hosts"
+
+
+@patch(ADD_DETAILS_TO_HOSTS)
+def test_get_broker_hosts(m_add_details_to_hosts, inventory, connection):  # happy path: summaries are returned and detail enrichment is invoked
+    broker = {
+        "BrokerArn": "arn:xxx:xxxx",
+        "BrokerId": "resource_id",
+        "BrokerName": "brk1",
+        "BrokerState": "RUNNING",
+        "EngineType": "RABBITMQ",
+        "DeploymentMode": "CLUSTER_MULTI_AZ",
+    }
+
+    conn_paginator = MagicMock()
+    paginate = MagicMock()
+
+    connection.get_paginator.return_value = conn_paginator
+    conn_paginator.paginate.return_value = paginate
+
+    paginate.build_full_result.side_effect = lambda **kwargs: {"BrokerSummaries": [broker]}
+
+    connection.describe_broker.return_value = {}
+    connection.list_brokers.return_value = {"BrokerSummaries": [broker]}
+
+    strict = False
+
+    result = inventory._get_broker_hosts(connection=connection, strict=strict)(paginate.build_full_result)  # _get_broker_hosts returns a callable wrapper that is applied to the paginated fetcher
+
+    assert result == [broker]
+
+    m_add_details_to_hosts.assert_called_with(connection, result, strict)
+
+
+@pytest.mark.parametrize("strict", [True, False])
+@patch(ADD_DETAILS_TO_HOSTS)
+def test_get_broker_hosts_with_access_denied(m_add_details_to_hosts, inventory, connection, strict):  # AccessDenied: raise when strict, empty result otherwise
+    conn_paginator = MagicMock()
+    paginate = MagicMock()
+
+    connection.get_paginator.return_value = conn_paginator
+    conn_paginator.paginate.return_value = paginate
+
+    paginate.build_full_result.side_effect = make_clienterror_exception()  # default code is AccessDenied
+
+    if strict:
+        with pytest.raises(AnsibleError):
+            inventory._get_broker_hosts(connection=connection, strict=strict)(paginate.build_full_result)
+    else:
+        assert inventory._get_broker_hosts(connection=connection, strict=strict)(paginate.build_full_result) == []
+
+    m_add_details_to_hosts.assert_not_called()  # enrichment is never reached on failure
+
+
+@patch(ADD_DETAILS_TO_HOSTS)
+def test_get_broker_hosts_with_client_error(m_add_details_to_hosts, inventory, connection):  # non-AccessDenied ClientError raises even with strict=False
+    conn_paginator = MagicMock()
+    paginate = MagicMock()
+
+    connection.get_paginator.return_value = conn_paginator
+    conn_paginator.paginate.return_value = paginate
+
+    paginate.build_full_result.side_effect = make_clienterror_exception(code="Unknown")
+
+    with pytest.raises(AnsibleError):
+        inventory._get_broker_hosts(connection=connection, strict=False)(paginate.build_full_result)
+
+    m_add_details_to_hosts.assert_not_called()
+
+
+FIND_HOSTS_MATCHING_STATUSES = (
+ "ansible_collections.community.aws.plugins.inventory.aws_mq._find_hosts_matching_statuses"
+)
+
+
+@pytest.mark.parametrize("regions", range(1, 5))
+@patch(FIND_HOSTS_MATCHING_STATUSES)
+def test_inventory_get_all_hosts(m_find_hosts, inventory, regions):  # hosts from all regional clients are merged, sorted, then status-filtered
+    params = {
+        "regions": [f"us-east-{int(i)}" for i in range(regions)],
+        "strict": random.choice((True, False)),
+        "statuses": [
+            random.choice(
+                [
+                    "RUNNING",
+                    "CREATION_IN_PROGRESS",
+                    "REBOOT_IN_PROGRESS",
+                    "DELETION_IN_PROGRESS",
+                    "CRITICAL_ACTION_REQUIRED",
+                ]
+            )
+            for i in range(3)
+        ],
+    }
+
+    connections = [MagicMock() for i in range(regions)]  # one fake client per region
+
+    inventory.all_clients.return_value = [(connections[i], f"us-east-{int(i)}") for i in range(regions)]
+
+    ids = list(reversed(range(regions)))  # reversed so the sort below is observable
+    broker_hosts = [{"BrokerName": f"broker_00{int(i)}"} for i in ids]
+
+    inventory._get_broker_hosts = MagicMock()
+    inventory._get_broker_hosts._boto3_paginate_wrapper = MagicMock()
+    inventory._get_broker_hosts._boto3_paginate_wrapper.side_effect = [[i] for i in broker_hosts]
+    inventory._get_broker_hosts.return_value = inventory._get_broker_hosts._boto3_paginate_wrapper
+
+    result = list(sorted(broker_hosts, key=lambda x: x["BrokerName"]))
+
+    m_find_hosts.return_value = result
+
+    assert result == inventory._get_all_hosts(**params)
+    inventory.all_clients.assert_called_with("mq")  # clients are requested for the MQ service
+    inventory._get_broker_hosts.assert_has_calls(
+        [call(connections[i], params["strict"]) for i in range(regions)], any_order=True
+    )
+
+    m_find_hosts.assert_called_with(result, params["statuses"])
+
+
+@pytest.mark.parametrize("hostvars_prefix", [True])
+@pytest.mark.parametrize("hostvars_suffix", [True])
+@patch("ansible_collections.community.aws.plugins.inventory.aws_mq._get_mq_hostname")
+def test_inventory_add_hosts(m_get_mq_hostname, inventory, hostvars_prefix, hostvars_suffix):  # _add_hosts: camelization, prefix/suffix var naming, group wiring
+    _options = {
+        "strict": random.choice((False, True)),
+        "compose": random.choice((False, True)),
+        "keyed_groups": "keyed_group_test_inventory_add_hosts",
+        "groups": ["all", "test_inventory_add_hosts"],
+    }
+
+    if hostvars_prefix:
+        _options["hostvars_prefix"] = f"prefix_{generate_random_string(length=8, with_punctuation=False)}"
+    if hostvars_suffix:
+        _options["hostvars_suffix"] = f"suffix_{generate_random_string(length=8, with_punctuation=False)}"
+
+    def _get_option_side_effect(x):  # route get_option lookups through the _options dict
+        return _options.get(x)
+
+    inventory.get_option.side_effect = _get_option_side_effect
+
+    m_get_mq_hostname.side_effect = lambda h: h["BrokerName"]
+
+    hosts = [
+        {
+            "BrokerName": "broker_i_001",
+            "Tags": [{"Key": "Name", "Value": "broker_001"}, {"Key": "RunningEngine", "Value": "ActiveMQ"}],
+            "availability_zone": "us-east-1a",
+        },
+        {
+            "BrokerName": "broker_i_002",
+            "Tags": [{"Key": "ClusterName", "Value": "test_cluster"}, {"Key": "RunningOS", "Value": "CoreOS"}],
+        },
+        {
+            "BrokerName": "test_cluster",
+            "Tags": [{"Key": "CluserVersionOrigin", "Value": "2.0"}, {"Key": "Provider", "Value": "RedHat"}],  # "CluserVersionOrigin" (sic) — kept consistent with camel_hosts below
+        },
+        {
+            "BrokerName": "another_cluster",
+            "Tags": [{"Key": "TestingPurpose", "Value": "Ansible"}],
+            "availability_zones": ["us-west-1a", "us-east-1b"],
+        },
+    ]
+
+    group = f"test_add_hosts_group_{generate_random_string(length=10, with_punctuation=False)}"
+    inventory._add_hosts(hosts, group)
+
+    m_get_mq_hostname.assert_has_calls([call(h) for h in hosts], any_order=True)
+
+    hosts_names = ["broker_i_001", "broker_i_002", "test_cluster", "another_cluster"]
+    inventory.inventory.add_host.assert_has_calls([call(name, group=group) for name in hosts_names], any_order=True)
+
+    camel_hosts = [  # expected snake_case view of hosts, with Tags folded into a dict
+        {
+            "broker_name": "broker_i_001",
+            "tags": {"Name": "broker_001", "RunningEngine": "ActiveMQ"},
+            "availability_zone": "us-east-1a",
+        },
+        {"broker_name": "broker_i_002", "tags": {"ClusterName": "test_cluster", "RunningOS": "CoreOS"}},
+        {"broker_name": "test_cluster", "tags": {"CluserVersionOrigin": "2.0", "Provider": "RedHat"}},
+        {
+            "broker_name": "another_cluster",
+            "tags": {"TestingPurpose": "Ansible"},
+            "availability_zones": ["us-west-1a", "us-east-1b"],
+        },
+    ]
+
+    set_variable_calls = []
+    for i in range(len(camel_hosts)):
+        for var, value in camel_hosts[i].items():
+            if hostvars_prefix:
+                var = _options["hostvars_prefix"] + var
+            if hostvars_suffix:
+                var += _options["hostvars_suffix"]
+            set_variable_calls.append(call(hosts_names[i], var, value))
+
+    inventory.get_option.assert_has_calls([call("hostvars_prefix"), call("hostvars_suffix")])
+    inventory.inventory.set_variable.assert_has_calls(set_variable_calls)
+
+    if hostvars_prefix or hostvars_suffix:
+        tmp = []  # rebuild camel_hosts with prefixed/suffixed keys added alongside the originals
+        for host in camel_hosts:
+            new_host = copy.deepcopy(host)
+            for key in host:
+                new_key = key
+                if hostvars_prefix:
+                    new_key = _options["hostvars_prefix"] + new_key
+                if hostvars_suffix:
+                    new_key += _options["hostvars_suffix"]
+                new_host[new_key] = host[key]
+            tmp.append(new_host)
+        camel_hosts = tmp
+
+    inventory._set_composite_vars.assert_has_calls(
+        [
+            call(_options["compose"], camel_hosts[i], hosts_names[i], strict=_options["strict"])
+            for i in range(len(camel_hosts))
+        ],
+        any_order=True,
+    )
+    inventory._add_host_to_composed_groups.assert_has_calls(
+        [
+            call(_options["groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"])
+            for i in range(len(camel_hosts))
+        ],
+        any_order=True,
+    )
+    inventory._add_host_to_keyed_groups.assert_has_calls(
+        [
+            call(_options["keyed_groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"])
+            for i in range(len(camel_hosts))
+        ],
+        any_order=True,
+    )
+
+
+BASE_INVENTORY_PARSE = "ansible_collections.community.aws.plugins.inventory.aws_mq.AWSInventoryBase.parse"
+
+
+@pytest.mark.parametrize("user_cache_directive", [True, False])
+@pytest.mark.parametrize("cache", [True, False])
+@pytest.mark.parametrize("cache_hit", [True, False])
+@patch(BASE_INVENTORY_PARSE)
+def test_inventory_parse(m_parse, inventory, user_cache_directive, cache, cache_hit, cache_hit):  # NOTE: exercises parse() cache-hit/miss matrix
+    inventory_data = MagicMock()
+    loader = MagicMock()
+    path = generate_random_string(with_punctuation=False, with_digits=False)
+
+    options = {}
+    options["regions"] = [f"us-east-{d}" for d in range(random.randint(1, 5))]
+    options["strict_permissions"] = random.choice((True, False))
+    options["statuses"] = generate_random_string(with_punctuation=False)
+
+    options["cache"] = user_cache_directive  # the user's cache option, distinct from the `cache` call argument
+
+    def get_option_side_effect(v):
+        return options.get(v)
+
+    inventory.get_option.side_effect = get_option_side_effect
+
+    cache_key = path + generate_random_string()
+    inventory.get_cache_key.return_value = cache_key
+
+    cache_key_value = generate_random_string()
+    if cache_hit:
+        inventory._cache[cache_key] = cache_key_value  # pre-seed the cache to simulate a hit
+
+    inventory._populate = MagicMock()
+    inventory._populate_from_cache = MagicMock()
+    inventory._get_all_hosts = MagicMock()
+    all_hosts = [
+        {"host": f"host_{int(random.randint(1, 1000))}"},
+        {"host": f"host_{int(random.randint(1, 1000))}"},
+        {"host": f"host_{int(random.randint(1, 1000))}"},
+        {"host": f"host_{int(random.randint(1, 1000))}"},
+    ]
+    inventory._get_all_hosts.return_value = all_hosts
+
+    format_cache_key_value = f"format_inventory_{all_hosts}"
+    inventory._format_inventory = MagicMock()
+    inventory._format_inventory.return_value = format_cache_key_value
+
+    inventory.parse(inventory_data, loader, path, cache)
+
+    m_parse.assert_called_with(inventory_data, loader, path, cache=cache)  # base-class parse must run first
+
+    if not cache or not user_cache_directive or (cache and user_cache_directive and not cache_hit):
+        inventory._get_all_hosts.assert_called_with(
+            options["regions"],
+            options["strict_permissions"],
+            options["statuses"],
+        )
+        inventory._populate.assert_called_with(all_hosts)
+        inventory._format_inventory.assert_called_with(all_hosts)
+    else:
+        inventory._get_all_hosts.assert_not_called()  # cache hit short-circuits the API fetch
+
+    if cache and user_cache_directive and cache_hit:
+        inventory._populate_from_cache.assert_called_with(cache_key_value)
+
+    if cache and user_cache_directive and not cache_hit or (not cache and user_cache_directive):
+        # validate that cache was populated
+        assert inventory._cache[cache_key] == format_cache_key_value
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py b/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py
index a7d1e0475..ba4a1adc3 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py
@@ -1,16 +1,14 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
import json
import pytest
-from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_bytes
from ansible.module_utils.common._collections_compat import MutableMapping
+from ansible.module_utils.six import string_types
@pytest.fixture
@@ -18,14 +16,14 @@ def patch_ansible_module(request, mocker):
if isinstance(request.param, string_types):
args = request.param
elif isinstance(request.param, MutableMapping):
- if 'ANSIBLE_MODULE_ARGS' not in request.param:
- request.param = {'ANSIBLE_MODULE_ARGS': request.param}
- if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']:
- request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp'
- if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']:
- request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False
+ if "ANSIBLE_MODULE_ARGS" not in request.param:
+ request.param = {"ANSIBLE_MODULE_ARGS": request.param}
+ if "_ansible_remote_tmp" not in request.param["ANSIBLE_MODULE_ARGS"]:
+ request.param["ANSIBLE_MODULE_ARGS"]["_ansible_remote_tmp"] = "/tmp"
+ if "_ansible_keep_remote_files" not in request.param["ANSIBLE_MODULE_ARGS"]:
+ request.param["ANSIBLE_MODULE_ARGS"]["_ansible_keep_remote_files"] = False
args = json.dumps(request.param)
else:
- raise Exception('Malformed data to the patch_ansible_module pytest fixture')
+ raise Exception("Malformed data to the patch_ansible_module pytest fixture")
- mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args))
+ mocker.patch("ansible.module_utils.basic._ANSIBLE_ARGS", to_bytes(args))
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py
index 726601fe8..608246217 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py
@@ -15,18 +15,21 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
from pprint import pprint
+from ansible.module_utils._text import to_text
+
from ansible_collections.community.aws.plugins.modules.acm_certificate import chain_compare
from ansible_collections.community.aws.plugins.modules.acm_certificate import pem_chain_split
-from ansible.module_utils._text import to_text
def test_chain_compare():
-
# The functions we're testing take module as an argument
# Just so they can call module.fail_json
# Let's just use None for the unit tests,
@@ -34,14 +37,14 @@ def test_chain_compare():
# And if they do, fail_json is not applicable
module = None
- fixture_suffix = 'tests/unit/plugins/modules/fixtures/certs'
+ fixture_suffix = "tests/unit/plugins/modules/fixtures/certs"
# Test chain split function on super simple (invalid) certs
- expected = ['aaa', 'bbb', 'ccc']
+ expected = ["aaa", "bbb", "ccc"]
- for fname in ['simple-chain-a.cert', 'simple-chain-b.cert']:
- path = fixture_suffix + '/' + fname
- with open(path, 'r') as f:
+ for fname in ["simple-chain-a.cert", "simple-chain-b.cert"]:
+ path = fixture_suffix + "/" + fname
+ with open(path, "r") as f:
pem = to_text(f.read())
actual = pem_chain_split(module, pem)
actual = [a.strip() for a in actual]
@@ -50,76 +53,60 @@ def test_chain_compare():
pprint(expected)
print("Actual:")
pprint(actual)
- raise AssertionError("Failed to properly split %s" % fname)
+ raise AssertionError(f"Failed to properly split {fname}")
# Now test real chains
# chains with same same_as should be considered equal
test_chains = [
- { # Original Cert chain
- 'path': fixture_suffix + '/chain-1.0.cert',
- 'same_as': 1,
- 'length': 3
- },
- { # Same as 1.0, but longer PEM lines
- 'path': fixture_suffix + '/chain-1.1.cert',
- 'same_as': 1,
- 'length': 3
- },
+ {"path": fixture_suffix + "/chain-1.0.cert", "same_as": 1, "length": 3}, # Original Cert chain
+ {"path": fixture_suffix + "/chain-1.1.cert", "same_as": 1, "length": 3}, # Same as 1.0, but longer PEM lines
{ # Same as 1.0, but without the stuff before each --------
- 'path': fixture_suffix + '/chain-1.2.cert',
- 'same_as': 1,
- 'length': 3
+ "path": fixture_suffix + "/chain-1.2.cert",
+ "same_as": 1,
+ "length": 3,
},
{ # Same as 1.0, but in a different order, so should be considered different
- 'path': fixture_suffix + '/chain-1.3.cert',
- 'same_as': 2,
- 'length': 3
+ "path": fixture_suffix + "/chain-1.3.cert",
+ "same_as": 2,
+ "length": 3,
},
{ # Same as 1.0, but with last link missing
- 'path': fixture_suffix + '/chain-1.4.cert',
- 'same_as': 3,
- 'length': 2
+ "path": fixture_suffix + "/chain-1.4.cert",
+ "same_as": 3,
+ "length": 2,
},
{ # Completely different cert chain to all the others
- 'path': fixture_suffix + '/chain-4.cert',
- 'same_as': 4,
- 'length': 3
- },
- { # Single cert
- 'path': fixture_suffix + '/a.pem',
- 'same_as': 5,
- 'length': 1
+ "path": fixture_suffix + "/chain-4.cert",
+ "same_as": 4,
+ "length": 3,
},
- { # a different, single cert
- 'path': fixture_suffix + '/b.pem',
- 'same_as': 6,
- 'length': 1
- }
+ {"path": fixture_suffix + "/a.pem", "same_as": 5, "length": 1}, # Single cert
+ {"path": fixture_suffix + "/b.pem", "same_as": 6, "length": 1}, # a different, single cert
]
for chain in test_chains:
- with open(chain['path'], 'r') as f:
- chain['pem_text'] = to_text(f.read())
+ with open(chain["path"], "r") as f:
+ chain["pem_text"] = to_text(f.read())
# Test to make sure our regex isn't too greedy
- chain['split'] = pem_chain_split(module, chain['pem_text'])
- if len(chain['split']) != chain['length']:
+ chain["split"] = pem_chain_split(module, chain["pem_text"])
+ if len(chain["split"]) != chain["length"]:
print("Cert before split")
- print(chain['pem_text'])
+ print(chain["pem_text"])
print("Cert after split")
- pprint(chain['split'])
- print("path: %s" % chain['path'])
- print("Expected chain length: %d" % chain['length'])
- print("Actual chain length: %d" % len(chain['split']))
- raise AssertionError("Chain %s was not split properly" % chain['path'])
+ pprint(chain["split"])
+ print(f"path: {chain['path']}")
+ print(f"Expected chain length: {int(chain['length'])}")
+ print(f"Actual chain length: {len(chain['split'])}")
+ raise AssertionError(f"Chain {chain['path']} was not split properly")
for chain_a in test_chains:
for chain_b in test_chains:
- expected = (chain_a['same_as'] == chain_b['same_as'])
+ expected = chain_a["same_as"] == chain_b["same_as"]
# Now test the comparison function
- actual = chain_compare(module, chain_a['pem_text'], chain_b['pem_text'])
+ actual = chain_compare(module, chain_a["pem_text"], chain_b["pem_text"])
if expected != actual:
- print("Error, unexpected comparison result between \n%s\nand\n%s" % (chain_a['path'], chain_b['path']))
- print("Expected %s got %s" % (str(expected), str(actual)))
+ print(f"Error, unexpected comparison result between \n{chain_a['path']}\nand\n{chain_b['path']}")
+ print(f"Expected {str(expected)} got {str(actual)}")
assert expected == actual
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py
index a6f2c3e91..f0d9de8fa 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py
@@ -5,17 +5,21 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import sys
+
import pytest
from ansible_collections.amazon.aws.plugins.module_utils import modules as aws_modules
from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
-from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args
import ansible_collections.community.aws.plugins.modules.api_gateway as agw
+from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args
if not HAS_BOTO3:
pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules")
@@ -25,7 +29,7 @@ exit_return_dict = {}
def fake_exit_json(self, **kwargs):
- """ store the kwargs given to exit_json rather than putting them out to stdout"""
+ """store the kwargs given to exit_json rather than putting them out to stdout"""
global exit_return_dict
exit_return_dict = kwargs
sys.exit(0)
@@ -33,7 +37,6 @@ def fake_exit_json(self, **kwargs):
def test_upload_api(monkeypatch):
class FakeConnection:
-
def put_rest_api(self, *args, **kwargs):
assert kwargs["body"] == "the-swagger-text-is-fake"
return {"msg": "success!"}
@@ -46,25 +49,29 @@ def test_upload_api(monkeypatch):
monkeypatch.setattr(aws_modules, "boto3_conn", return_fake_connection)
monkeypatch.setattr(aws_modules.AnsibleAWSModule, "exit_json", fake_exit_json)
- set_module_args({
- "api_id": "fred",
- "state": "present",
- "swagger_text": "the-swagger-text-is-fake",
- "region": 'mars-north-1',
- "_ansible_tmpdir": "/tmp/ansibl-abcdef",
- })
+ set_module_args(
+ {
+ "api_id": "fred",
+ "state": "present",
+ "swagger_text": "the-swagger-text-is-fake",
+ "region": "mars-north-1",
+ "_ansible_tmpdir": "/tmp/ansibl-abcdef",
+ }
+ )
with pytest.raises(SystemExit):
agw.main()
assert exit_return_dict["changed"]
def test_warn_if_region_not_specified():
-
- set_module_args({
- "name": "api_gateway",
- "state": "present",
- "runtime": 'python2.7',
- "role": 'arn:aws:iam::123456789012:role/lambda_basic_execution',
- "handler": 'lambda_python.my_handler'})
+ set_module_args(
+ {
+ "name": "api_gateway",
+ "state": "present",
+ "runtime": "python2.7",
+ "role": "arn:aws:iam::123456789012:role/lambda_basic_execution",
+ "handler": "lambda_python.my_handler",
+ }
+ )
with pytest.raises(SystemExit):
print(agw.main())
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py
index 1a188e8ed..a2bd06ad8 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py
@@ -4,12 +4,16 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import collections
-import os
import json
+import os
+
import pytest
from ansible.module_utils._text import to_text
@@ -19,11 +23,18 @@ try:
except ImportError:
pass
+from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
+
# Magic... Incorrectly identified by pylint as unused
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import
+# isort: off
+# pylint: disable=unused-import
+
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify
+
+# pylint: enable=unused-import
+# isort: on
-from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
from ansible_collections.community.aws.plugins.modules import data_pipeline
if not HAS_BOTO3:
@@ -34,7 +45,7 @@ class FailException(Exception):
pass
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
def dp_setup():
"""
Yield a FakeModule object, data pipeline id of a vanilla data pipeline, and data pipeline objects
@@ -44,41 +55,41 @@ def dp_setup():
Dependencies = collections.namedtuple("Dependencies", ["module", "data_pipeline_id", "objects"])
# get objects to use to test populating and activating the data pipeline
- if not os.getenv('PLACEBO_RECORD'):
- objects = [{"name": "Every 1 day",
- "id": "DefaultSchedule",
- "fields": []},
- {"name": "Default",
- "id": "Default",
- "fields": []}]
+ if not os.getenv("PLACEBO_RECORD"):
+ objects = [
+ {"name": "Every 1 day", "id": "DefaultSchedule", "fields": []},
+ {"name": "Default", "id": "Default", "fields": []},
+ ]
else:
- s3 = boto3.client('s3')
+ s3 = boto3.client("s3")
data = s3.get_object(Bucket="ansible-test-datapipeline", Key="pipeline-object/new.json")
- objects = json.loads(to_text(data['Body'].read()))
+ objects = json.loads(to_text(data["Body"].read()))
# create a module with vanilla data pipeline parameters
- params = {'name': 'ansible-test-create-pipeline',
- 'description': 'ansible-datapipeline-unit-test',
- 'state': 'present',
- 'timeout': 300,
- 'objects': [],
- 'tags': {},
- 'parameters': [],
- 'values': []}
+ params = {
+ "name": "ansible-test-create-pipeline",
+ "description": "ansible-datapipeline-unit-test",
+ "state": "present",
+ "timeout": 300,
+ "objects": [],
+ "tags": {},
+ "parameters": [],
+ "values": [],
+ }
module = FakeModule(**params)
# yield a module, the data pipeline id, and the data pipeline objects (that are not yet defining the vanilla data pipeline)
- if not os.getenv('PLACEBO_RECORD'):
- yield Dependencies(module=module, data_pipeline_id='df-0590406117G8DPQZY2HA', objects=objects)
+ if not os.getenv("PLACEBO_RECORD"):
+ yield Dependencies(module=module, data_pipeline_id="df-0590406117G8DPQZY2HA", objects=objects)
else:
- connection = boto3.client('datapipeline')
+ connection = boto3.client("datapipeline")
_changed, result = data_pipeline.create_pipeline(connection, module)
- data_pipeline_id = result['data_pipeline']['pipeline_id']
+ data_pipeline_id = result["data_pipeline"]["pipeline_id"]
yield Dependencies(module=module, data_pipeline_id=data_pipeline_id, objects=objects)
# remove data pipeline
- if os.getenv('PLACEBO_RECORD'):
- module.params.update(state='absent')
+ if os.getenv("PLACEBO_RECORD"):
+ module.params.update(state="absent")
data_pipeline.delete_pipeline(connection, module)
@@ -89,7 +100,7 @@ class FakeModule(object):
def fail_json(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise FailException('FAIL')
+ raise FailException("FAIL")
def exit_json(self, *args, **kwargs):
self.exit_args = args
@@ -97,91 +108,101 @@ class FakeModule(object):
def test_create_pipeline_already_exists(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
changed, result = data_pipeline.create_pipeline(connection, dp_setup.module)
assert changed is False
- assert "Data Pipeline ansible-test-create-pipeline is present" in result['msg']
+ assert "Data Pipeline ansible-test-create-pipeline is present" in result["msg"]
def test_pipeline_field(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
pipeline_field_info = data_pipeline.pipeline_field(connection, dp_setup.data_pipeline_id, "@pipelineState")
assert pipeline_field_info == "PENDING"
def test_define_pipeline(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- changed, result = data_pipeline.define_pipeline(connection, dp_setup.module, dp_setup.objects, dp_setup.data_pipeline_id)
+ connection = placeboify.client("datapipeline")
+ changed, result = data_pipeline.define_pipeline(
+ connection, dp_setup.module, dp_setup.objects, dp_setup.data_pipeline_id
+ )
assert changed is True
- assert 'has been updated' in result
+ assert "has been updated" in result
def test_deactivate_pipeline(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
_changed, result = data_pipeline.deactivate_pipeline(connection, dp_setup.module)
# XXX possible bug
# assert changed is True
- assert "Data Pipeline ansible-test-create-pipeline deactivated" in result['msg']
+ assert "Data Pipeline ansible-test-create-pipeline deactivated" in result["msg"]
def test_activate_without_population(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
with pytest.raises(FailException):
_changed, _result = data_pipeline.activate_pipeline(connection, dp_setup.module)
- assert dp_setup.module.exit_kwargs.get('msg') == "You need to populate your pipeline before activation."
+ assert dp_setup.module.exit_kwargs.get("msg") == "You need to populate your pipeline before activation."
def test_create_pipeline(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-unittest-create-pipeline',
- 'description': 'ansible-datapipeline-unit-test',
- 'state': 'present',
- 'timeout': 300,
- 'tags': {}}
+ connection = placeboify.client("datapipeline")
+ params = {
+ "name": "ansible-unittest-create-pipeline",
+ "description": "ansible-datapipeline-unit-test",
+ "state": "present",
+ "timeout": 300,
+ "tags": {},
+ }
m = FakeModule(**params)
changed, result = data_pipeline.create_pipeline(connection, m)
assert changed is True
- assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline created."
+ assert result["msg"] == "Data Pipeline ansible-unittest-create-pipeline created."
data_pipeline.delete_pipeline(connection, m)
def test_create_pipeline_with_tags(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-unittest-create-pipeline_tags',
- 'description': 'ansible-datapipeline-unit-test',
- 'state': 'present',
- 'tags': {'ansible': 'test'},
- 'timeout': 300}
+ connection = placeboify.client("datapipeline")
+ params = {
+ "name": "ansible-unittest-create-pipeline_tags",
+ "description": "ansible-datapipeline-unit-test",
+ "state": "present",
+ "tags": {"ansible": "test"},
+ "timeout": 300,
+ }
m = FakeModule(**params)
changed, result = data_pipeline.create_pipeline(connection, m)
assert changed is True
- assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline_tags created."
+ assert result["msg"] == "Data Pipeline ansible-unittest-create-pipeline_tags created."
data_pipeline.delete_pipeline(connection, m)
def test_delete_nonexistent_pipeline(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-test-nonexistent',
- 'description': 'ansible-test-nonexistent',
- 'state': 'absent',
- 'objects': [],
- 'tags': {'ansible': 'test'},
- 'timeout': 300}
+ connection = placeboify.client("datapipeline")
+ params = {
+ "name": "ansible-test-nonexistent",
+ "description": "ansible-test-nonexistent",
+ "state": "absent",
+ "objects": [],
+ "tags": {"ansible": "test"},
+ "timeout": 300,
+ }
m = FakeModule(**params)
changed, _result = data_pipeline.delete_pipeline(connection, m)
assert changed is False
def test_delete_pipeline(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-test-nonexistent',
- 'description': 'ansible-test-nonexistent',
- 'state': 'absent',
- 'objects': [],
- 'tags': {'ansible': 'test'},
- 'timeout': 300}
+ connection = placeboify.client("datapipeline")
+ params = {
+ "name": "ansible-test-nonexistent",
+ "description": "ansible-test-nonexistent",
+ "state": "absent",
+ "objects": [],
+ "tags": {"ansible": "test"},
+ "timeout": 300,
+ }
m = FakeModule(**params)
data_pipeline.create_pipeline(connection, m)
changed, _result = data_pipeline.delete_pipeline(connection, m)
@@ -189,29 +210,29 @@ def test_delete_pipeline(placeboify, maybe_sleep):
def test_build_unique_id_different():
- m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id'})
- m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id-different'})
+ m = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id"})
+ m2 = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id-different"})
assert data_pipeline.build_unique_id(m) != data_pipeline.build_unique_id(m2)
def test_build_unique_id_same():
- m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}})
- m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}})
+ m = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id", "tags": {"ansible": "test"}})
+ m2 = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id", "tags": {"ansible": "test"}})
assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2)
def test_build_unique_id_obj():
# check that the object can be different and the unique id should be the same; should be able to modify objects
- m = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'first': 'object'}]})
- m2 = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'second': 'object'}]})
+ m = FakeModule(**{"name": "ansible-unittest-1", "objects": [{"first": "object"}]})
+ m2 = FakeModule(**{"name": "ansible-unittest-1", "objects": [{"second": "object"}]})
assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2)
def test_format_tags():
- unformatted_tags = {'key1': 'val1', 'key2': 'val2', 'key3': 'val3'}
+ unformatted_tags = {"key1": "val1", "key2": "val2", "key3": "val3"}
formatted_tags = data_pipeline.format_tags(unformatted_tags)
for tag_set in formatted_tags:
- assert unformatted_tags[tag_set['key']] == tag_set['value']
+ assert unformatted_tags[tag_set["key"]] == tag_set["value"]
def test_format_empty_tags():
@@ -221,45 +242,44 @@ def test_format_empty_tags():
def test_pipeline_description(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
dp_id = dp_setup.data_pipeline_id
pipelines = data_pipeline.pipeline_description(connection, dp_id)
- assert dp_id == pipelines['pipelineDescriptionList'][0]['pipelineId']
+ assert dp_id == pipelines["pipelineDescriptionList"][0]["pipelineId"]
def test_pipeline_description_nonexistent(placeboify, maybe_sleep):
hypothetical_pipeline_id = "df-015440025PF7YGLDK47C"
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
with pytest.raises(data_pipeline.DataPipelineNotFound):
data_pipeline.pipeline_description(connection, hypothetical_pipeline_id)
def test_check_dp_exists_true(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
exists = data_pipeline.check_dp_exists(connection, dp_setup.data_pipeline_id)
assert exists is True
def test_check_dp_exists_false(placeboify, maybe_sleep):
hypothetical_pipeline_id = "df-015440025PF7YGLDK47C"
- connection = placeboify.client('datapipeline')
+ connection = placeboify.client("datapipeline")
exists = data_pipeline.check_dp_exists(connection, hypothetical_pipeline_id)
assert exists is False
def test_check_dp_status(placeboify, maybe_sleep, dp_setup):
- inactive_states = ['INACTIVE', 'PENDING', 'FINISHED', 'DELETING']
- connection = placeboify.client('datapipeline')
+ inactive_states = ["INACTIVE", "PENDING", "FINISHED", "DELETING"]
+ connection = placeboify.client("datapipeline")
state = data_pipeline.check_dp_status(connection, dp_setup.data_pipeline_id, inactive_states)
assert state is True
def test_activate_pipeline(placeboify, maybe_sleep, dp_setup):
# use objects to define pipeline before activating
- connection = placeboify.client('datapipeline')
- data_pipeline.define_pipeline(connection,
- module=dp_setup.module,
- objects=dp_setup.objects,
- dp_id=dp_setup.data_pipeline_id)
+ connection = placeboify.client("datapipeline")
+ data_pipeline.define_pipeline(
+ connection, module=dp_setup.module, objects=dp_setup.objects, dp_id=dp_setup.data_pipeline_id
+ )
changed, _result = data_pipeline.activate_pipeline(connection, dp_setup.module)
assert changed is True
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py
index 63804415d..f65648dad 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py
@@ -1,28 +1,30 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import call
+from unittest.mock import patch
import pytest
+
try:
from botocore.exceptions import ClientError
except ImportError:
pass
from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
-from ansible_collections.community.aws.tests.unit.compat.mock import call
-from ansible_collections.community.aws.tests.unit.compat.mock import patch
+
+from ansible_collections.community.aws.plugins.modules import directconnect_confirm_connection
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleExitJson
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleFailJson
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import ModuleTestCase
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args
-from ansible_collections.community.aws.plugins.modules import directconnect_confirm_connection
-
if not HAS_BOTO3:
- pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules")
+ pytestmark = pytest.mark.skip(
+ "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules"
+ )
-@patch('ansible_collections.amazon.aws.plugins.module_utils.core.HAS_BOTO3', new=True)
+@patch("ansible_collections.amazon.aws.plugins.module_utils.core.HAS_BOTO3", new=True)
@patch.object(directconnect_confirm_connection.AnsibleAWSModule, "client")
class TestAWSDirectConnectConfirmConnection(ModuleTestCase):
def test_missing_required_parameters(self, *args):
@@ -45,22 +47,18 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase):
"connectionName": "ansible-test-connection",
"bandwidth": "1Gbps",
"ownerAccount": "123456789012",
- "region": "us-west-2"
+ "region": "us-west-2",
}
]
}
- set_module_args({
- "connection_id": "dxcon-fgq9rgot"
- })
+ set_module_args({"connection_id": "dxcon-fgq9rgot"})
with self.assertRaises(AnsibleExitJson) as exec_info:
directconnect_confirm_connection.main()
result = exec_info.exception.args[0]
assert result["changed"] is False
assert result["connection_state"] == "requested"
- mock_client.return_value.describe_connections.assert_has_calls([
- call(connectionId="dxcon-fgq9rgot")
- ])
+ mock_client.return_value.describe_connections.assert_has_calls([call(connectionId="dxcon-fgq9rgot")])
mock_client.return_value.confirm_connection.assert_not_called()
def test_get_by_name(self, mock_client):
@@ -73,39 +71,31 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase):
"connectionName": "ansible-test-connection",
"bandwidth": "1Gbps",
"ownerAccount": "123456789012",
- "region": "us-west-2"
+ "region": "us-west-2",
}
]
}
- set_module_args({
- "name": "ansible-test-connection"
- })
+ set_module_args({"name": "ansible-test-connection"})
with self.assertRaises(AnsibleExitJson) as exec_info:
directconnect_confirm_connection.main()
result = exec_info.exception.args[0]
assert result["changed"] is False
assert result["connection_state"] == "requested"
- mock_client.return_value.describe_connections.assert_has_calls([
- call(),
- call(connectionId="dxcon-fgq9rgot")
- ])
+ mock_client.return_value.describe_connections.assert_has_calls([call(), call(connectionId="dxcon-fgq9rgot")])
mock_client.return_value.confirm_connection.assert_not_called()
def test_missing_connection_id(self, mock_client):
mock_client.return_value.describe_connections.side_effect = ClientError(
- {'Error': {'Code': 'ResourceNotFoundException'}}, 'DescribeConnection')
- set_module_args({
- "connection_id": "dxcon-aaaabbbb"
- })
+ {"Error": {"Code": "ResourceNotFoundException"}}, "DescribeConnection"
+ )
+ set_module_args({"connection_id": "dxcon-aaaabbbb"})
with self.assertRaises(AnsibleFailJson) as exec_info:
directconnect_confirm_connection.main()
result = exec_info.exception.args[0]
assert result["failed"] is True
- mock_client.return_value.describe_connections.assert_has_calls([
- call(connectionId="dxcon-aaaabbbb")
- ])
+ mock_client.return_value.describe_connections.assert_has_calls([call(connectionId="dxcon-aaaabbbb")])
def test_missing_name(self, mock_client):
mock_client.return_value.describe_connections.return_value = {
@@ -117,21 +107,17 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase):
"connectionName": "ansible-test-connection",
"bandwidth": "1Gbps",
"ownerAccount": "123456789012",
- "region": "us-west-2"
+ "region": "us-west-2",
}
]
}
- set_module_args({
- "name": "foobar"
- })
+ set_module_args({"name": "foobar"})
with self.assertRaises(AnsibleFailJson) as exec_info:
directconnect_confirm_connection.main()
result = exec_info.exception.args[0]
assert result["failed"] is True
- mock_client.return_value.describe_connections.assert_has_calls([
- call()
- ])
+ mock_client.return_value.describe_connections.assert_has_calls([call()])
def test_confirm(self, mock_client):
mock_client.return_value.describe_connections.return_value = {
@@ -143,22 +129,22 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase):
"connectionName": "ansible-test-connection",
"bandwidth": "1Gbps",
"ownerAccount": "123456789012",
- "region": "us-west-2"
+ "region": "us-west-2",
}
]
}
mock_client.return_value.confirm_connection.return_value = [{}]
- set_module_args({
- "connection_id": "dxcon-fgq9rgot"
- })
+ set_module_args({"connection_id": "dxcon-fgq9rgot"})
with self.assertRaises(AnsibleExitJson) as exec_info:
directconnect_confirm_connection.main()
result = exec_info.exception.args[0]
assert result["changed"] is True
- mock_client.return_value.describe_connections.assert_has_calls([
- call(connectionId="dxcon-fgq9rgot"),
- call(connectionId="dxcon-fgq9rgot"),
- call(connectionId="dxcon-fgq9rgot")
- ])
+ mock_client.return_value.describe_connections.assert_has_calls(
+ [
+ call(connectionId="dxcon-fgq9rgot"),
+ call(connectionId="dxcon-fgq9rgot"),
+ call(connectionId="dxcon-fgq9rgot"),
+ ]
+ )
mock_client.return_value.confirm_connection.assert_called_once_with(connectionId="dxcon-fgq9rgot")
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py
index 65ba0a3f0..f9a620843 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py
@@ -4,81 +4,90 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import pytest
from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
+
# Magic... Incorrectly identified by pylint as unused
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import
+# isort: off
+# pylint: disable=unused-import
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify
+
+# pylint: enable=unused-import
+# isort: on
from ansible_collections.community.aws.plugins.modules import directconnect_connection
if not HAS_BOTO3:
- pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules")
+ pytestmark = pytest.mark.skip(
+ "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules"
+ )
# When rerecording these tests, create a stand alone connection with default values in us-west-2
# with the name ansible-test-connection and set connection_id to the appropriate value
connection_id = "dxcon-fgq9rgot"
-connection_name = 'ansible-test-connection'
+connection_name = "ansible-test-connection"
def test_connection_status(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- status = directconnect_connection.connection_status(client, connection_id)['connection']
- assert status['connectionName'] == connection_name
- assert status['connectionId'] == connection_id
+ client = placeboify.client("directconnect")
+ status = directconnect_connection.connection_status(client, connection_id)["connection"]
+ assert status["connectionName"] == connection_name
+ assert status["connectionId"] == connection_id
def test_connection_exists_by_id(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
+ client = placeboify.client("directconnect")
exists = directconnect_connection.connection_exists(client, connection_id)
assert exists == connection_id
def test_connection_exists_by_name(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
+ client = placeboify.client("directconnect")
exists = directconnect_connection.connection_exists(client, None, connection_name)
assert exists == connection_id
def test_connection_does_not_exist(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- exists = directconnect_connection.connection_exists(client, 'dxcon-notthere')
+ client = placeboify.client("directconnect")
+ exists = directconnect_connection.connection_exists(client, "dxcon-notthere")
assert exists is False
def test_changed_properties(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- status = directconnect_connection.connection_status(client, connection_id)['connection']
+ client = placeboify.client("directconnect")
+ status = directconnect_connection.connection_status(client, connection_id)["connection"]
location = "differentlocation"
- bandwidth = status['bandwidth']
+ bandwidth = status["bandwidth"]
assert directconnect_connection.changed_properties(status, location, bandwidth) is True
def test_associations_are_not_updated(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- status = directconnect_connection.connection_status(client, connection_id)['connection']
- lag_id = status.get('lagId')
+ client = placeboify.client("directconnect")
+ status = directconnect_connection.connection_status(client, connection_id)["connection"]
+ lag_id = status.get("lagId")
assert directconnect_connection.update_associations(client, status, connection_id, lag_id) is False
def test_create_and_delete(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
+ client = placeboify.client("directconnect")
created_conn = verify_create_works(placeboify, maybe_sleep, client)
verify_delete_works(placeboify, maybe_sleep, client, created_conn)
def verify_create_works(placeboify, maybe_sleep, client):
- created = directconnect_connection.create_connection(client=client,
- location="EqSE2",
- bandwidth="1Gbps",
- name="ansible-test-2",
- lag_id=None)
- assert created.startswith('dxcon')
+ created = directconnect_connection.create_connection(
+ client=client, location="EqSE2", bandwidth="1Gbps", name="ansible-test-2", lag_id=None
+ )
+ assert created.startswith("dxcon")
return created
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py
index 90c8d9604..134be7167 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py
@@ -4,40 +4,52 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
-import pytest
-import os
import collections
+import os
-# Magic... Incorrectly identified by pylint as unused
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import
+import pytest
from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_conn
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info
+# Magic... Incorrectly identified by pylint as unused
+# isort: off
+# pylint: disable=unused-import
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify
+
+# pylint: enable=unused-import
+# isort: on
+
from ansible_collections.community.aws.plugins.modules import directconnect_link_aggregation_group as lag_module
if not HAS_BOTO3:
- pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules")
+ pytestmark = pytest.mark.skip(
+ "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules"
+ )
@pytest.fixture(scope="module")
def dependencies():
-
# each LAG dict will contain the keys: module, connections, virtual_interfaces
Dependencies = collections.namedtuple("Dependencies", ["lag_1", "lag_2"])
lag_1 = dict()
lag_2 = dict()
- vanilla_params = {"name": "ansible_lag_1",
- "location": "EqSe2",
- "num_connections": 1,
- "min_links": 0,
- "bandwidth": "1Gbps"}
+ vanilla_params = {
+ "name": "ansible_lag_1",
+ "location": "EqSe2",
+ "num_connections": 1,
+ "min_links": 0,
+ "bandwidth": "1Gbps",
+ }
for lag in ("ansible_lag_1", "ansible_lag_2"):
params = dict(vanilla_params)
@@ -49,10 +61,19 @@ def dependencies():
if os.getenv("PLACEBO_RECORD"):
region, ec2_url, aws_connect_kwargs = get_aws_connection_info(lag_1["module"], boto3=True)
- client = boto3_conn(lag_1["module"], conn_type="client", resource="directconnect", region=region, endpoint=ec2_url, **aws_connect_kwargs)
+ client = boto3_conn(
+ lag_1["module"],
+ conn_type="client",
+ resource="directconnect",
+ region=region,
+ endpoint=ec2_url,
+ **aws_connect_kwargs,
+ )
# See if link aggregation groups exist
for name in ("ansible_lag_1", "ansible_lag_2"):
- lag_id = lag_module.create_lag(client, num_connections=1, location="EqSe2", bandwidth="1Gbps", name=name, connection_id=None)
+ lag_id = lag_module.create_lag(
+ client, num_connections=1, location="EqSe2", bandwidth="1Gbps", name=name, connection_id=None
+ )
if name == "ansible_lag_1":
lag_1["lag_id"] = lag_id
lag_1["name"] = name
@@ -87,10 +108,7 @@ class FakeModule(object):
def test_nonexistent_lag_status(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id="doesntexist",
- lag_name="doesntexist",
- verify=True)
+ exists = lag_module.lag_exists(client=client, lag_id="doesntexist", lag_name="doesntexist", verify=True)
assert not exists
@@ -103,28 +121,19 @@ def test_lag_status(placeboify, maybe_sleep, dependencies):
def test_lag_exists(placeboify, maybe_sleep, dependencies):
client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id=dependencies.lag_1.get("lag_id"),
- lag_name=None,
- verify=True)
+ exists = lag_module.lag_exists(client=client, lag_id=dependencies.lag_1.get("lag_id"), lag_name=None, verify=True)
assert exists
def test_lag_exists_using_name(placeboify, maybe_sleep, dependencies):
client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id=None,
- lag_name=dependencies.lag_1.get("name"),
- verify=True)
+ exists = lag_module.lag_exists(client=client, lag_id=None, lag_name=dependencies.lag_1.get("name"), verify=True)
assert exists
def test_nonexistent_lag_does_not_exist(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id="dxlag-XXXXXXXX",
- lag_name="doesntexist",
- verify=True)
+ exists = lag_module.lag_exists(client=client, lag_id="dxlag-XXXXXXXX", lag_name="doesntexist", verify=True)
assert not exists
@@ -143,19 +152,21 @@ def test_lag_changed_true_no(placeboify, maybe_sleep, dependencies):
def test_update_lag(placeboify, maybe_sleep, dependencies):
client = placeboify.client("directconnect")
status_before = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id"))
- lag_module.update_lag(client,
- lag_id=dependencies.lag_2.get("lag_id"),
- lag_name="ansible_lag_2_update",
- min_links=0,
- wait=False,
- wait_timeout=0,
- num_connections=1)
+ lag_module.update_lag(
+ client,
+ lag_id=dependencies.lag_2.get("lag_id"),
+ lag_name="ansible_lag_2_update",
+ min_links=0,
+ wait=False,
+ wait_timeout=0,
+ num_connections=1,
+ )
status_after = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id"))
assert status_before != status_after
# remove the lag name from the statuses and verify it was the only thing changed
- del status_before['lagName']
- del status_after['lagName']
+ del status_before["lagName"]
+ del status_after["lagName"]
assert status_before == status_after
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py
index 4f0086421..62b511bde 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py
@@ -4,20 +4,31 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import pytest
from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
+
# Magic... Incorrectly identified by pylint as unused
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import
+# isort: off
+# pylint: disable=unused-import
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify
+
+# pylint: enable=unused-import
+# isort: on
from ansible_collections.community.aws.plugins.modules import directconnect_virtual_interface
if not HAS_BOTO3:
- pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules")
+ pytestmark = pytest.mark.skip(
+ "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules"
+ )
class FailException(Exception):
@@ -46,10 +57,7 @@ def test_find_unique_vi_by_connection_id(placeboify, maybe_sleep):
def test_find_unique_vi_by_vi_id(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- vi_id = directconnect_virtual_interface.find_unique_vi(client,
- None,
- "dxvif-aaaaaaaaa",
- None)
+ vi_id = directconnect_virtual_interface.find_unique_vi(client, None, "dxvif-aaaaaaaaa", None)
assert vi_id == "dxvif-aaaaaaaa"
@@ -61,47 +69,38 @@ def test_find_unique_vi_by_name(placeboify, maybe_sleep):
def test_find_unique_vi_returns_multiple(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- module = FakeModule(state="present",
- id_to_associate="dxcon-aaaaaaaa",
- public=False,
- name=None)
+ module = FakeModule(state="present", id_to_associate="dxcon-aaaaaaaa", public=False, name=None)
with pytest.raises(FailException):
- directconnect_virtual_interface.ensure_state(
- client,
- module
- )
+ directconnect_virtual_interface.ensure_state(client, module)
assert "Multiple virtual interfaces were found" in module.exit_kwargs["msg"]
def test_find_unique_vi_returns_missing_for_vi_id(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- module = FakeModule(state="present",
- id_to_associate=None,
- public=False,
- name=None,
- virtual_interface_id="dxvif-aaaaaaaa")
+ module = FakeModule(
+ state="present", id_to_associate=None, public=False, name=None, virtual_interface_id="dxvif-aaaaaaaa"
+ )
with pytest.raises(FailException):
- directconnect_virtual_interface.ensure_state(
- client,
- module
- )
+ directconnect_virtual_interface.ensure_state(client, module)
assert "The virtual interface dxvif-aaaaaaaa does not exist" in module.exit_kwargs["msg"]
def test_construct_public_vi():
- module = FakeModule(state="present",
- id_to_associate=None,
- public=True,
- name="aaaaaaaa",
- vlan=1,
- bgp_asn=123,
- authentication_key="aaaa",
- customer_address="169.254.0.1/30",
- amazon_address="169.254.0.2/30",
- address_type="ipv4",
- cidr=["10.88.0.0/30"],
- virtual_gateway_id="xxxx",
- direct_connect_gateway_id="yyyy")
+ module = FakeModule(
+ state="present",
+ id_to_associate=None,
+ public=True,
+ name="aaaaaaaa",
+ vlan=1,
+ bgp_asn=123,
+ authentication_key="aaaa",
+ customer_address="169.254.0.1/30",
+ amazon_address="169.254.0.2/30",
+ address_type="ipv4",
+ cidr=["10.88.0.0/30"],
+ virtual_gateway_id="xxxx",
+ direct_connect_gateway_id="yyyy",
+ )
vi = directconnect_virtual_interface.assemble_params_for_creating_vi(module.params)
assert vi == {
"virtualInterfaceName": "aaaaaaaa",
@@ -111,24 +110,26 @@ def test_construct_public_vi():
"amazonAddress": "169.254.0.2/30",
"customerAddress": "169.254.0.1/30",
"addressFamily": "ipv4",
- "routeFilterPrefixes": [{"cidr": "10.88.0.0/30"}]
+ "routeFilterPrefixes": [{"cidr": "10.88.0.0/30"}],
}
def test_construct_private_vi_with_virtual_gateway_id():
- module = FakeModule(state="present",
- id_to_associate=None,
- public=False,
- name="aaaaaaaa",
- vlan=1,
- bgp_asn=123,
- authentication_key="aaaa",
- customer_address="169.254.0.1/30",
- amazon_address="169.254.0.2/30",
- address_type="ipv4",
- cidr=["10.88.0.0/30"],
- virtual_gateway_id="xxxx",
- direct_connect_gateway_id="yyyy")
+ module = FakeModule(
+ state="present",
+ id_to_associate=None,
+ public=False,
+ name="aaaaaaaa",
+ vlan=1,
+ bgp_asn=123,
+ authentication_key="aaaa",
+ customer_address="169.254.0.1/30",
+ amazon_address="169.254.0.2/30",
+ address_type="ipv4",
+ cidr=["10.88.0.0/30"],
+ virtual_gateway_id="xxxx",
+ direct_connect_gateway_id="yyyy",
+ )
vi = directconnect_virtual_interface.assemble_params_for_creating_vi(module.params)
assert vi == {
"virtualInterfaceName": "aaaaaaaa",
@@ -138,24 +139,26 @@ def test_construct_private_vi_with_virtual_gateway_id():
"amazonAddress": "169.254.0.2/30",
"customerAddress": "169.254.0.1/30",
"addressFamily": "ipv4",
- "virtualGatewayId": "xxxx"
+ "virtualGatewayId": "xxxx",
}
def test_construct_private_vi_with_direct_connect_gateway_id():
- module = FakeModule(state="present",
- id_to_associate=None,
- public=False,
- name="aaaaaaaa",
- vlan=1,
- bgp_asn=123,
- authentication_key="aaaa",
- customer_address="169.254.0.1/30",
- amazon_address="169.254.0.2/30",
- address_type="ipv4",
- cidr=["10.88.0.0/30"],
- virtual_gateway_id=None,
- direct_connect_gateway_id="yyyy")
+ module = FakeModule(
+ state="present",
+ id_to_associate=None,
+ public=False,
+ name="aaaaaaaa",
+ vlan=1,
+ bgp_asn=123,
+ authentication_key="aaaa",
+ customer_address="169.254.0.1/30",
+ amazon_address="169.254.0.2/30",
+ address_type="ipv4",
+ cidr=["10.88.0.0/30"],
+ virtual_gateway_id=None,
+ direct_connect_gateway_id="yyyy",
+ )
vi = directconnect_virtual_interface.assemble_params_for_creating_vi(module.params)
print(vi)
assert vi == {
@@ -166,26 +169,28 @@ def test_construct_private_vi_with_direct_connect_gateway_id():
"amazonAddress": "169.254.0.2/30",
"customerAddress": "169.254.0.1/30",
"addressFamily": "ipv4",
- "directConnectGatewayId": "yyyy"
+ "directConnectGatewayId": "yyyy",
}
def test_create_public_vi(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- module = FakeModule(state="present",
- id_to_associate='dxcon-aaaaaaaa',
- virtual_interface_id=None,
- public=True,
- name="aaaaaaaa",
- vlan=1,
- bgp_asn=123,
- authentication_key="aaaa",
- customer_address="169.254.0.1/30",
- amazon_address="169.254.0.2/30",
- address_type="ipv4",
- cidr=["10.88.0.0/30"],
- virtual_gateway_id="xxxx",
- direct_connect_gateway_id="yyyy")
+ module = FakeModule(
+ state="present",
+ id_to_associate="dxcon-aaaaaaaa",
+ virtual_interface_id=None,
+ public=True,
+ name="aaaaaaaa",
+ vlan=1,
+ bgp_asn=123,
+ authentication_key="aaaa",
+ customer_address="169.254.0.1/30",
+ amazon_address="169.254.0.2/30",
+ address_type="ipv4",
+ cidr=["10.88.0.0/30"],
+ virtual_gateway_id="xxxx",
+ direct_connect_gateway_id="yyyy",
+ )
changed, latest_state = directconnect_virtual_interface.ensure_state(client, module)
assert changed is True
assert latest_state is not None
@@ -193,20 +198,22 @@ def test_create_public_vi(placeboify, maybe_sleep):
def test_create_private_vi(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- module = FakeModule(state="present",
- id_to_associate='dxcon-aaaaaaaa',
- virtual_interface_id=None,
- public=False,
- name="aaaaaaaa",
- vlan=1,
- bgp_asn=123,
- authentication_key="aaaa",
- customer_address="169.254.0.1/30",
- amazon_address="169.254.0.2/30",
- address_type="ipv4",
- cidr=["10.88.0.0/30"],
- virtual_gateway_id="xxxx",
- direct_connect_gateway_id="yyyy")
+ module = FakeModule(
+ state="present",
+ id_to_associate="dxcon-aaaaaaaa",
+ virtual_interface_id=None,
+ public=False,
+ name="aaaaaaaa",
+ vlan=1,
+ bgp_asn=123,
+ authentication_key="aaaa",
+ customer_address="169.254.0.1/30",
+ amazon_address="169.254.0.2/30",
+ address_type="ipv4",
+ cidr=["10.88.0.0/30"],
+ virtual_gateway_id="xxxx",
+ direct_connect_gateway_id="yyyy",
+ )
changed, latest_state = directconnect_virtual_interface.ensure_state(client, module)
assert changed is True
assert latest_state is not None
@@ -214,20 +221,22 @@ def test_create_private_vi(placeboify, maybe_sleep):
def test_delete_vi(placeboify, maybe_sleep):
client = placeboify.client("directconnect")
- module = FakeModule(state="absent",
- id_to_associate='dxcon-aaaaaaaa',
- virtual_interface_id='dxvif-aaaaaaaa',
- public=False,
- name="aaaaaaaa",
- vlan=1,
- bgp_asn=123,
- authentication_key="aaaa",
- customer_address="169.254.0.1/30",
- amazon_address="169.254.0.2/30",
- address_type="ipv4",
- cidr=["10.88.0.0/30"],
- virtual_gateway_id=None,
- direct_connect_gateway_id="yyyy")
+ module = FakeModule(
+ state="absent",
+ id_to_associate="dxcon-aaaaaaaa",
+ virtual_interface_id="dxvif-aaaaaaaa",
+ public=False,
+ name="aaaaaaaa",
+ vlan=1,
+ bgp_asn=123,
+ authentication_key="aaaa",
+ customer_address="169.254.0.1/30",
+ amazon_address="169.254.0.2/30",
+ address_type="ipv4",
+ cidr=["10.88.0.0/30"],
+ virtual_gateway_id=None,
+ direct_connect_gateway_id="yyyy",
+ )
changed, latest_state = directconnect_virtual_interface.ensure_state(client, module)
assert changed is True
assert latest_state == {}
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py
index 88a1aea83..2b5db4226 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py
@@ -1,21 +1,29 @@
# (c) 2017 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import os
-import pytest
-# Magic... Incorrectly identified by pylint as unused
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import
-from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import
+import pytest
-import ansible_collections.amazon.aws.plugins.module_utils.modules as aws_modules
import ansible_collections.amazon.aws.plugins.module_utils.retries as aws_retries
-from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_conn
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_tag_list_to_ansible_dict
+from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info
+
+# Magic... Incorrectly identified by pylint as unused
+# isort: off
+# pylint: disable=unused-import
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep
+from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify
+
+# pylint: enable=unused-import
+# isort: on
from ansible_collections.community.aws.plugins.modules import ec2_vpc_vpn
@@ -31,12 +39,12 @@ class FakeModule(object):
def fail_json_aws(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise FailException('FAIL')
+ raise FailException("FAIL")
def fail_json(self, *args, **kwargs):
self.exit_args = args
self.exit_kwargs = kwargs
- raise FailException('FAIL')
+ raise FailException("FAIL")
def exit_json(self, *args, **kwargs):
self.exit_args = args
@@ -45,36 +53,44 @@ class FakeModule(object):
def get_vgw(connection):
# see if two vgw exist and return them if so
- vgw = connection.describe_vpn_gateways(Filters=[{'Name': 'tag:Ansible_VPN', 'Values': ['Test']}])
- if len(vgw['VpnGateways']) >= 2:
- return [vgw['VpnGateways'][0]['VpnGatewayId'], vgw['VpnGateways'][1]['VpnGatewayId']]
+ vgw = connection.describe_vpn_gateways(Filters=[{"Name": "tag:Ansible_VPN", "Values": ["Test"]}])
+ if len(vgw["VpnGateways"]) >= 2:
+ return [vgw["VpnGateways"][0]["VpnGatewayId"], vgw["VpnGateways"][1]["VpnGatewayId"]]
# otherwise create two and return them
- vgw_1 = connection.create_vpn_gateway(Type='ipsec.1')
- vgw_2 = connection.create_vpn_gateway(Type='ipsec.1')
+ vgw_1 = connection.create_vpn_gateway(Type="ipsec.1")
+ vgw_2 = connection.create_vpn_gateway(Type="ipsec.1")
for resource in (vgw_1, vgw_2):
- connection.create_tags(Resources=[resource['VpnGateway']['VpnGatewayId']], Tags=[{'Key': 'Ansible_VPN', 'Value': 'Test'}])
- return [vgw_1['VpnGateway']['VpnGatewayId'], vgw_2['VpnGateway']['VpnGatewayId']]
+ connection.create_tags(
+ Resources=[resource["VpnGateway"]["VpnGatewayId"]], Tags=[{"Key": "Ansible_VPN", "Value": "Test"}]
+ )
+ return [vgw_1["VpnGateway"]["VpnGatewayId"], vgw_2["VpnGateway"]["VpnGatewayId"]]
def get_cgw(connection):
# see if two cgw exist and return them if so
- cgw = connection.describe_customer_gateways(DryRun=False, Filters=[{'Name': 'state', 'Values': ['available']},
- {'Name': 'tag:Name', 'Values': ['Ansible-CGW']}])
- if len(cgw['CustomerGateways']) >= 2:
- return [cgw['CustomerGateways'][0]['CustomerGatewayId'], cgw['CustomerGateways'][1]['CustomerGatewayId']]
+ cgw = connection.describe_customer_gateways(
+ DryRun=False,
+ Filters=[{"Name": "state", "Values": ["available"]}, {"Name": "tag:Name", "Values": ["Ansible-CGW"]}],
+ )
+ if len(cgw["CustomerGateways"]) >= 2:
+ return [cgw["CustomerGateways"][0]["CustomerGatewayId"], cgw["CustomerGateways"][1]["CustomerGatewayId"]]
# otherwise create and return them
- cgw_1 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='9.8.7.6', BgpAsn=65000)
- cgw_2 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='5.4.3.2', BgpAsn=65000)
+ cgw_1 = connection.create_customer_gateway(DryRun=False, Type="ipsec.1", PublicIp="9.8.7.6", BgpAsn=65000)
+ cgw_2 = connection.create_customer_gateway(DryRun=False, Type="ipsec.1", PublicIp="5.4.3.2", BgpAsn=65000)
for resource in (cgw_1, cgw_2):
- connection.create_tags(Resources=[resource['CustomerGateway']['CustomerGatewayId']], Tags=[{'Key': 'Ansible-CGW', 'Value': 'Test'}])
- return [cgw_1['CustomerGateway']['CustomerGatewayId'], cgw_2['CustomerGateway']['CustomerGatewayId']]
+ connection.create_tags(
+ Resources=[resource["CustomerGateway"]["CustomerGatewayId"]], Tags=[{"Key": "Ansible-CGW", "Value": "Test"}]
+ )
+ return [cgw_1["CustomerGateway"]["CustomerGatewayId"], cgw_2["CustomerGateway"]["CustomerGatewayId"]]
def get_dependencies():
- if os.getenv('PLACEBO_RECORD'):
+ if os.getenv("PLACEBO_RECORD"):
module = FakeModule(**{})
region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)
- connection = boto3_conn(module, conn_type='client', resource='ec2', region=region, endpoint=ec2_url, **aws_connect_kwargs)
+ connection = boto3_conn(
+ module, conn_type="client", resource="ec2", region=region, endpoint=ec2_url, **aws_connect_kwargs
+ )
vgw = get_vgw(connection)
cgw = get_cgw(connection)
else:
@@ -85,9 +101,9 @@ def get_dependencies():
def setup_mod_conn(placeboify, params):
- conn = placeboify.client('ec2')
+ conn = placeboify.client("ec2")
retry_decorator = aws_retries.AWSRetry.jittered_backoff()
- wrapped_conn = aws_modules._RetryingBotoClientWrapper(conn, retry_decorator)
+ wrapped_conn = aws_retries.RetryingBotoClientWrapper(conn, retry_decorator)
m = FakeModule(**params)
return m, wrapped_conn
@@ -97,23 +113,25 @@ def make_params(cgw, vgw, tags=None, filters=None, routes=None):
filters = {} if filters is None else filters
routes = [] if routes is None else routes
- return {'customer_gateway_id': cgw,
- 'static_only': True,
- 'vpn_gateway_id': vgw,
- 'connection_type': 'ipsec.1',
- 'purge_tags': True,
- 'tags': tags,
- 'filters': filters,
- 'routes': routes,
- 'delay': 15,
- 'wait_timeout': 600}
+ return {
+ "customer_gateway_id": cgw,
+ "static_only": True,
+ "vpn_gateway_id": vgw,
+ "connection_type": "ipsec.1",
+ "purge_tags": True,
+ "tags": tags,
+ "filters": filters,
+ "routes": routes,
+ "delay": 15,
+ "wait_timeout": 600,
+ }
def make_conn(placeboify, module, connection):
- customer_gateway_id = module.params['customer_gateway_id']
- static_only = module.params['static_only']
- vpn_gateway_id = module.params['vpn_gateway_id']
- connection_type = module.params['connection_type']
+ customer_gateway_id = module.params["customer_gateway_id"]
+ static_only = module.params["static_only"]
+ vpn_gateway_id = module.params["vpn_gateway_id"]
+ connection_type = module.params["connection_type"]
changed = True
vpn = ec2_vpc_vpn.create_connection(connection, customer_gateway_id, static_only, vpn_gateway_id, connection_type)
return changed, vpn
@@ -124,7 +142,7 @@ def tear_down_conn(placeboify, connection, vpn_connection_id):
def setup_req(placeboify, number_of_results=1):
- ''' returns dependencies for VPN connections '''
+ """returns dependencies for VPN connections"""
assert number_of_results in (1, 2)
results = []
cgw, vgw = get_dependencies()
@@ -133,7 +151,7 @@ def setup_req(placeboify, number_of_results=1):
m, conn = setup_mod_conn(placeboify, params)
vpn = ec2_vpc_vpn.ensure_present(conn, params)[1]
- results.append({'module': m, 'connection': conn, 'vpn': vpn, 'params': params})
+ results.append({"module": m, "connection": conn, "vpn": vpn, "params": params})
if number_of_results == 1:
return results[0]
else:
@@ -144,41 +162,44 @@ def test_find_connection_vpc_conn_id(placeboify, maybe_sleep):
# setup dependencies for 2 vpn connections
dependencies = setup_req(placeboify, 2)
dep1, dep2 = dependencies[0], dependencies[1]
- params1, vpn1, _m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection']
- _params2, vpn2, _m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection']
+ params1, vpn1, _m1, conn1 = dep1["params"], dep1["vpn"], dep1["module"], dep1["connection"]
+ _params2, vpn2, _m2, conn2 = dep2["params"], dep2["vpn"], dep2["module"], dep2["connection"]
# find the connection with a vpn_connection_id and assert it is the expected one
- assert vpn1['VpnConnectionId'] == ec2_vpc_vpn.find_connection(conn1, params1, vpn1['VpnConnectionId'])['VpnConnectionId']
+ assert (
+ vpn1["VpnConnectionId"]
+ == ec2_vpc_vpn.find_connection(conn1, params1, vpn1["VpnConnectionId"])["VpnConnectionId"]
+ )
- tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId'])
- tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId'])
+ tear_down_conn(placeboify, conn1, vpn1["VpnConnectionId"])
+ tear_down_conn(placeboify, conn2, vpn2["VpnConnectionId"])
def test_find_connection_filters(placeboify, maybe_sleep):
# setup dependencies for 2 vpn connections
dependencies = setup_req(placeboify, 2)
dep1, dep2 = dependencies[0], dependencies[1]
- params1, vpn1, _m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection']
- params2, vpn2, _m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection']
+ params1, vpn1, _m1, conn1 = dep1["params"], dep1["vpn"], dep1["module"], dep1["connection"]
+ params2, vpn2, _m2, conn2 = dep2["params"], dep2["vpn"], dep2["module"], dep2["connection"]
# update to different tags
- params1.update(tags={'Wrong': 'Tag'})
- params2.update(tags={'Correct': 'Tag'})
+ params1.update(tags={"Wrong": "Tag"})
+ params2.update(tags={"Correct": "Tag"})
ec2_vpc_vpn.ensure_present(conn1, params1)
ec2_vpc_vpn.ensure_present(conn2, params2)
# create some new parameters for a filter
- params = {'filters': {'tags': {'Correct': 'Tag'}}}
+ params = {"filters": {"tags": {"Correct": "Tag"}}}
# find the connection that has the parameters above
found = ec2_vpc_vpn.find_connection(conn1, params)
# assert the correct connection was found
- assert found['VpnConnectionId'] == vpn2['VpnConnectionId']
+ assert found["VpnConnectionId"] == vpn2["VpnConnectionId"]
# delete the connections
- tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId'])
- tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId'])
+ tear_down_conn(placeboify, conn1, vpn1["VpnConnectionId"])
+ tear_down_conn(placeboify, conn2, vpn2["VpnConnectionId"])
def test_find_connection_insufficient_filters(placeboify, maybe_sleep):
@@ -186,15 +207,15 @@ def test_find_connection_insufficient_filters(placeboify, maybe_sleep):
cgw, vgw = get_dependencies()
# create two connections with the same tags
- params = make_params(cgw[0], vgw[0], tags={'Correct': 'Tag'})
- params2 = make_params(cgw[1], vgw[1], tags={'Correct': 'Tag'})
+ params = make_params(cgw[0], vgw[0], tags={"Correct": "Tag"})
+ params2 = make_params(cgw[1], vgw[1], tags={"Correct": "Tag"})
m, conn = setup_mod_conn(placeboify, params)
m2, conn2 = setup_mod_conn(placeboify, params2)
vpn1 = ec2_vpc_vpn.ensure_present(conn, m.params)[1]
vpn2 = ec2_vpc_vpn.ensure_present(conn2, m2.params)[1]
# reset the parameters so only filtering by tags will occur
- m.params = {'filters': {'tags': {'Correct': 'Tag'}}}
+ m.params = {"filters": {"tags": {"Correct": "Tag"}}}
expected_message = "More than one matching VPN connection was found"
# assert that multiple matching connections have been found
@@ -202,13 +223,13 @@ def test_find_connection_insufficient_filters(placeboify, maybe_sleep):
ec2_vpc_vpn.find_connection(conn, m.params)
# delete the connections
- tear_down_conn(placeboify, conn, vpn1['VpnConnectionId'])
- tear_down_conn(placeboify, conn, vpn2['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn1["VpnConnectionId"])
+ tear_down_conn(placeboify, conn, vpn2["VpnConnectionId"])
def test_find_connection_nonexistent(placeboify, maybe_sleep):
# create parameters but don't create a connection with them
- params = {'filters': {'tags': {'Correct': 'Tag'}}}
+ params = {"filters": {"tags": {"Correct": "Tag"}}}
m, conn = setup_mod_conn(placeboify, params)
# try to find a connection with matching parameters and assert None are found
@@ -226,38 +247,48 @@ def test_create_connection(placeboify, maybe_sleep):
# assert that changed is true and that there is a connection id
assert changed is True
- assert 'VpnConnectionId' in vpn
+ assert "VpnConnectionId" in vpn
# delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
def test_create_connection_that_exists(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
+ params, vpn, _m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
# try to recreate the same connection
changed, vpn2 = ec2_vpc_vpn.ensure_present(conn, params)
# nothing should have changed
assert changed is False
- assert vpn['VpnConnectionId'] == vpn2['VpnConnectionId']
+ assert vpn["VpnConnectionId"] == vpn2["VpnConnectionId"]
# delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
def test_modify_deleted_connection(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- _params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
+ _params, vpn, m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
# delete it
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
# try to update the deleted connection
- m.params.update(vpn_connection_id=vpn['VpnConnectionId'])
+ m.params.update(vpn_connection_id=vpn["VpnConnectionId"])
expected_message = "no VPN connection available or pending with that id"
with pytest.raises(ec2_vpc_vpn.VPNConnectionException, match=expected_message):
ec2_vpc_vpn.ensure_present(conn, m.params)
@@ -266,7 +297,12 @@ def test_modify_deleted_connection(placeboify, maybe_sleep):
def test_delete_connection(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- _params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
+ _params, vpn, m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
# delete it
changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params)
@@ -277,7 +313,7 @@ def test_delete_connection(placeboify, maybe_sleep):
def test_delete_nonexistent_connection(placeboify, maybe_sleep):
# create parameters and ensure any connection matching (None) is deleted
- params = {'filters': {'tags': {'ThisConnection': 'DoesntExist'}}, 'delay': 15, 'wait_timeout': 600}
+ params = {"filters": {"tags": {"ThisConnection": "DoesntExist"}}, "delay": 15, "wait_timeout": 600}
m, conn = setup_mod_conn(placeboify, params)
changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params)
@@ -288,83 +324,112 @@ def test_delete_nonexistent_connection(placeboify, maybe_sleep):
def test_check_for_update_tags(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- _params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
+ _params, vpn, m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
# add and remove a number of tags
- m.params['tags'] = {'One': 'one', 'Two': 'two'}
+ m.params["tags"] = {"One": "one", "Two": "two"}
ec2_vpc_vpn.ensure_present(conn, m.params)
- m.params['tags'] = {'Two': 'two', 'Three': 'three', 'Four': 'four'}
- changes = ec2_vpc_vpn.check_for_update(conn, m.params, vpn['VpnConnectionId'])
+ m.params["tags"] = {"Two": "two", "Three": "three", "Four": "four"}
+ changes = ec2_vpc_vpn.check_for_update(conn, m.params, vpn["VpnConnectionId"])
- flat_dict_changes = boto3_tag_list_to_ansible_dict(changes['tags_to_add'])
- correct_changes = boto3_tag_list_to_ansible_dict([{'Key': 'Three', 'Value': 'three'}, {'Key': 'Four', 'Value': 'four'}])
+ flat_dict_changes = boto3_tag_list_to_ansible_dict(changes["tags_to_add"])
+ correct_changes = boto3_tag_list_to_ansible_dict(
+ [{"Key": "Three", "Value": "three"}, {"Key": "Four", "Value": "four"}]
+ )
assert flat_dict_changes == correct_changes
- assert changes['tags_to_remove'] == ['One']
+ assert changes["tags_to_remove"] == ["One"]
# delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
def test_check_for_update_nonmodifiable_attr(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
- current_vgw = params['vpn_gateway_id']
+ params, vpn, m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
+ current_vgw = params["vpn_gateway_id"]
# update a parameter that isn't modifiable
m.params.update(vpn_gateway_id="invalidchange")
- expected_message = 'You cannot modify vpn_gateway_id, the current value of which is {0}. Modifiable VPN connection attributes are'.format(current_vgw)
+ expected_message = f"You cannot modify vpn_gateway_id, the current value of which is {current_vgw}. Modifiable VPN connection attributes are"
with pytest.raises(ec2_vpc_vpn.VPNConnectionException, match=expected_message):
- ec2_vpc_vpn.check_for_update(conn, m.params, vpn['VpnConnectionId'])
+ ec2_vpc_vpn.check_for_update(conn, m.params, vpn["VpnConnectionId"])
# delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
def test_add_tags(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
+ params, vpn, _m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
# add a tag to the connection
- ec2_vpc_vpn.add_tags(conn, vpn['VpnConnectionId'], add=[{'Key': 'Ansible-Test', 'Value': 'VPN'}])
+ ec2_vpc_vpn.add_tags(conn, vpn["VpnConnectionId"], add=[{"Key": "Ansible-Test", "Value": "VPN"}])
# assert tag is there
current_vpn = ec2_vpc_vpn.find_connection(conn, params)
- assert current_vpn['Tags'] == [{'Key': 'Ansible-Test', 'Value': 'VPN'}]
+ assert current_vpn["Tags"] == [{"Key": "Ansible-Test", "Value": "VPN"}]
# delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
def test_remove_tags(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
+ params, vpn, _m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
# remove a tag from the connection
- ec2_vpc_vpn.remove_tags(conn, vpn['VpnConnectionId'], remove=['Ansible-Test'])
+ ec2_vpc_vpn.remove_tags(conn, vpn["VpnConnectionId"], remove=["Ansible-Test"])
# assert the tag is gone
current_vpn = ec2_vpc_vpn.find_connection(conn, params)
- assert 'Tags' not in current_vpn
+ assert "Tags" not in current_vpn
# delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
def test_add_routes(placeboify, maybe_sleep):
# setup dependencies for 1 vpn connection
dependencies = setup_req(placeboify, 1)
- params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
+ params, vpn, _m, conn = (
+ dependencies["params"],
+ dependencies["vpn"],
+ dependencies["module"],
+ dependencies["connection"],
+ )
# create connection with a route
- ec2_vpc_vpn.add_routes(conn, vpn['VpnConnectionId'], ['195.168.2.0/24', '196.168.2.0/24'])
+ ec2_vpc_vpn.add_routes(conn, vpn["VpnConnectionId"], ["195.168.2.0/24", "196.168.2.0/24"])
# assert both routes are there
current_vpn = ec2_vpc_vpn.find_connection(conn, params)
- assert set(each['DestinationCidrBlock'] for each in current_vpn['Routes']) == set(['195.168.2.0/24', '196.168.2.0/24'])
+ assert set(each["DestinationCidrBlock"] for each in current_vpn["Routes"]) == set(
+ ["195.168.2.0/24", "196.168.2.0/24"]
+ )
# delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
+ tear_down_conn(placeboify, conn, vpn["VpnConnectionId"])
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py
index 939620120..7f832aa71 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py
@@ -1,8 +1,4 @@
-from __future__ import (absolute_import, division, print_function)
-
-__metaclass__ = type
-
-'''
+"""
Commands to encrypt a message that can be decrypted:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
@@ -15,9 +11,11 @@ with open(path, 'r') as f:
load_pem_public_key(rsa_public_key_pem = , default_backend())
base64_cipher = public_key.encrypt('Ansible_AWS_EC2_Win_Password', PKCS1v15())
string_cipher = base64.b64encode(base64_cipher)
-'''
+"""
import datetime
+from unittest.mock import patch
+
import pytest
from ansible.module_utils._text import to_bytes
@@ -25,52 +23,53 @@ from ansible.module_utils._text import to_text
from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3
-from ansible_collections.community.aws.tests.unit.compat.mock import patch
+from ansible_collections.community.aws.plugins.modules.ec2_win_password import ec2_win_password
+from ansible_collections.community.aws.plugins.modules.ec2_win_password import setup_module_object
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleExitJson
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import ModuleTestCase
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args
-from ansible_collections.community.aws.plugins.modules.ec2_win_password import setup_module_object
-from ansible_collections.community.aws.plugins.modules.ec2_win_password import ec2_win_password
-
-fixture_prefix = 'tests/unit/plugins/modules/fixtures/certs'
+fixture_prefix = "tests/unit/plugins/modules/fixtures/certs"
if not HAS_BOTO3:
pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules")
class TestEc2WinPasswordModule(ModuleTestCase):
-
# Future: It would be good to generate this data on the fly and use a
# temporary certificate and password.
- PEM_PATH = fixture_prefix + '/ec2_win_password.pem'
- UNENCRYPTED_DATA = 'Ansible_AWS_EC2_Win_Password'
- ENCRYPTED_DATA = 'L2k1iFiu/TRrjGr6Rwco/T3C7xkWxUw4+YPYpGGOmP3KDdy3hT1' \
- '8RvdDJ2i0e+y7wUcH43DwbRYSlkSyALY/nzjSV9R5NChUyVs3W5' \
- '5oiVuyTKsk0lor8dFJ9z9unq14tScZHvyQ3Nx1ggOtS18S9Pk55q' \
- 'IaCXfx26ucH76VRho='
- INSTANCE_ID = 'i-12345'
-
- @patch('ansible_collections.community.aws.plugins.modules.s3_bucket_notification.AnsibleAWSModule.client')
+ PEM_PATH = fixture_prefix + "/ec2_win_password.pem"
+ UNENCRYPTED_DATA = "Ansible_AWS_EC2_Win_Password"
+ ENCRYPTED_DATA = (
+ "L2k1iFiu/TRrjGr6Rwco/T3C7xkWxUw4+YPYpGGOmP3KDdy3hT1"
+ "8RvdDJ2i0e+y7wUcH43DwbRYSlkSyALY/nzjSV9R5NChUyVs3W5"
+ "5oiVuyTKsk0lor8dFJ9z9unq14tScZHvyQ3Nx1ggOtS18S9Pk55q"
+ "IaCXfx26ucH76VRho="
+ )
+ INSTANCE_ID = "i-12345"
+
+ @patch("ansible_collections.community.aws.plugins.modules.s3_bucket_notification.AnsibleAWSModule.client")
def test_decryption(self, mock_client):
-
path = self.PEM_PATH
- with open(path, 'r') as f:
+ with open(path, "r") as f:
pem = to_text(f.read())
with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({'instance_id': self.INSTANCE_ID,
- 'key_data': pem,
- })
+ set_module_args(
+ {
+ "instance_id": self.INSTANCE_ID,
+ "key_data": pem,
+ }
+ )
module = setup_module_object()
mock_client().get_password_data.return_value = {
- 'InstanceId': self.INSTANCE_ID,
- 'PasswordData': self.ENCRYPTED_DATA,
- 'Timestamp': datetime.datetime.now(),
+ "InstanceId": self.INSTANCE_ID,
+ "PasswordData": self.ENCRYPTED_DATA,
+ "Timestamp": datetime.datetime.now(),
}
ec2_win_password(module)
self.assertEqual(
- exec_info.exception.args[0]['win_password'],
+ exec_info.exception.args[0]["win_password"],
to_bytes(self.UNENCRYPTED_DATA),
)
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py
deleted file mode 100644
index 11de7f477..000000000
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import json
-import pytest
-
-from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args
-
-from ansible_collections.community.aws.plugins.modules import iam_password_policy
-
-
-def test_warn_if_state_not_specified(capsys):
- set_module_args({
- "min_pw_length": "8",
- "require_symbols": "false",
- "require_numbers": "true",
- "require_uppercase": "true",
- "require_lowercase": "true",
- "allow_pw_change": "true",
- "pw_max_age": "60",
- "pw_reuse_prevent": "5",
- "pw_expire": "false"
- })
- with pytest.raises(SystemExit):
- iam_password_policy.main()
- captured = capsys.readouterr()
-
- output = json.loads(captured.out)
- assert 'missing required arguments' in output.get('msg', '')
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py
index 836e2cf07..7dcd785c9 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py
@@ -1,86 +1,85 @@
# Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
import functools
-from ansible_collections.community.aws.plugins.module_utils.opensearch import (
- compare_domain_versions,
- parse_version,
-)
+
+from ansible_collections.community.aws.plugins.module_utils.opensearch import compare_domain_versions
+from ansible_collections.community.aws.plugins.module_utils.opensearch import parse_version
def test_parse_version():
test_versions = [
- ['Elasticsearch_5.5', {'engine_type': 'Elasticsearch', 'major': 5, 'minor': 5}],
- ['Elasticsearch_7.1', {'engine_type': 'Elasticsearch', 'major': 7, 'minor': 1}],
- ['Elasticsearch_7.10', {'engine_type': 'Elasticsearch', 'major': 7, 'minor': 10}],
- ['OpenSearch_1.0', {'engine_type': 'OpenSearch', 'major': 1, 'minor': 0}],
- ['OpenSearch_1.1', {'engine_type': 'OpenSearch', 'major': 1, 'minor': 1}],
- ['OpenSearch_a.b', None],
- ['OpenSearch_1.b', None],
- ['OpenSearch_1-1', None],
- ['OpenSearch_1.1.2', None],
- ['OpenSearch_foo_1.1', None],
- ['OpenSearch_1', None],
- ['OpenSearch-1.0', None],
- ['Foo_1.0', None],
+ ["Elasticsearch_5.5", {"engine_type": "Elasticsearch", "major": 5, "minor": 5}],
+ ["Elasticsearch_7.1", {"engine_type": "Elasticsearch", "major": 7, "minor": 1}],
+ ["Elasticsearch_7.10", {"engine_type": "Elasticsearch", "major": 7, "minor": 10}],
+ ["OpenSearch_1.0", {"engine_type": "OpenSearch", "major": 1, "minor": 0}],
+ ["OpenSearch_1.1", {"engine_type": "OpenSearch", "major": 1, "minor": 1}],
+ ["OpenSearch_a.b", None],
+ ["OpenSearch_1.b", None],
+ ["OpenSearch_1-1", None],
+ ["OpenSearch_1.1.2", None],
+ ["OpenSearch_foo_1.1", None],
+ ["OpenSearch_1", None],
+ ["OpenSearch-1.0", None],
+ ["Foo_1.0", None],
]
for expected in test_versions:
ret = parse_version(expected[0])
if ret != expected[1]:
- raise AssertionError(
- f"parse_version({expected[0]} returned {ret}, expected {expected[1]}")
+ raise AssertionError(f"parse_version({expected[0]} returned {ret}, expected {expected[1]}")
def test_version_compare():
test_versions = [
- ['Elasticsearch_5.5', 'Elasticsearch_5.5', 0],
- ['Elasticsearch_5.5', 'Elasticsearch_7.1', -1],
- ['Elasticsearch_7.1', 'Elasticsearch_7.1', 0],
- ['Elasticsearch_7.1', 'Elasticsearch_7.2', -1],
- ['Elasticsearch_7.1', 'Elasticsearch_7.10', -1],
- ['Elasticsearch_7.2', 'Elasticsearch_7.10', -1],
- ['Elasticsearch_7.10', 'Elasticsearch_7.2', 1],
- ['Elasticsearch_7.2', 'Elasticsearch_5.5', 1],
- ['Elasticsearch_7.2', 'OpenSearch_1.0', -1],
- ['Elasticsearch_7.2', 'OpenSearch_1.1', -1],
- ['OpenSearch_1.1', 'OpenSearch_1.1', 0],
- ['OpenSearch_1.0', 'OpenSearch_1.1', -1],
- ['OpenSearch_1.1', 'OpenSearch_1.0', 1],
- ['foo_1.1', 'OpenSearch_1.0', -1],
- ['Elasticsearch_5.5', 'foo_1.0', 1],
+ ["Elasticsearch_5.5", "Elasticsearch_5.5", 0],
+ ["Elasticsearch_5.5", "Elasticsearch_7.1", -1],
+ ["Elasticsearch_7.1", "Elasticsearch_7.1", 0],
+ ["Elasticsearch_7.1", "Elasticsearch_7.2", -1],
+ ["Elasticsearch_7.1", "Elasticsearch_7.10", -1],
+ ["Elasticsearch_7.2", "Elasticsearch_7.10", -1],
+ ["Elasticsearch_7.10", "Elasticsearch_7.2", 1],
+ ["Elasticsearch_7.2", "Elasticsearch_5.5", 1],
+ ["Elasticsearch_7.2", "OpenSearch_1.0", -1],
+ ["Elasticsearch_7.2", "OpenSearch_1.1", -1],
+ ["OpenSearch_1.1", "OpenSearch_1.1", 0],
+ ["OpenSearch_1.0", "OpenSearch_1.1", -1],
+ ["OpenSearch_1.1", "OpenSearch_1.0", 1],
+ ["foo_1.1", "OpenSearch_1.0", -1],
+ ["Elasticsearch_5.5", "foo_1.0", 1],
]
for v in test_versions:
ret = compare_domain_versions(v[0], v[1])
if ret != v[2]:
- raise AssertionError(
- f"compare({v[0]}, {v[1]} returned {ret}, expected {v[2]}")
+ raise AssertionError(f"compare({v[0]}, {v[1]} returned {ret}, expected {v[2]}")
def test_sort_versions():
input_versions = [
- 'Elasticsearch_5.6',
- 'Elasticsearch_5.5',
- 'Elasticsearch_7.10',
- 'Elasticsearch_7.2',
- 'foo_10.5',
- 'OpenSearch_1.1',
- 'OpenSearch_1.0',
- 'Elasticsearch_7.3',
+ "Elasticsearch_5.6",
+ "Elasticsearch_5.5",
+ "Elasticsearch_7.10",
+ "Elasticsearch_7.2",
+ "foo_10.5",
+ "OpenSearch_1.1",
+ "OpenSearch_1.0",
+ "Elasticsearch_7.3",
]
expected_versions = [
- 'foo_10.5',
- 'Elasticsearch_5.5',
- 'Elasticsearch_5.6',
- 'Elasticsearch_7.2',
- 'Elasticsearch_7.3',
- 'Elasticsearch_7.10',
- 'OpenSearch_1.0',
- 'OpenSearch_1.1',
+ "foo_10.5",
+ "Elasticsearch_5.5",
+ "Elasticsearch_5.6",
+ "Elasticsearch_7.2",
+ "Elasticsearch_7.3",
+ "Elasticsearch_7.10",
+ "OpenSearch_1.0",
+ "OpenSearch_1.1",
]
input_versions = sorted(input_versions, key=functools.cmp_to_key(compare_domain_versions))
if input_versions != expected_versions:
- raise AssertionError(
- f"Expected {expected_versions}, got {input_versions}")
+ raise AssertionError(f"Expected {expected_versions}, got {input_versions}")
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py
index 7b22d5b00..1342a8d58 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py
@@ -1,40 +1,41 @@
# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
__metaclass__ = type
from ansible_collections.community.aws.plugins.modules import redshift_cross_region_snapshots as rcrs
mock_status_enabled = {
- 'SnapshotCopyGrantName': 'snapshot-us-east-1-to-us-west-2',
- 'DestinationRegion': 'us-west-2',
- 'RetentionPeriod': 1,
+ "SnapshotCopyGrantName": "snapshot-us-east-1-to-us-west-2",
+ "DestinationRegion": "us-west-2",
+ "RetentionPeriod": 1,
}
mock_status_disabled = {}
mock_request_illegal = {
- 'snapshot_copy_grant': 'changed',
- 'destination_region': 'us-west-2',
- 'snapshot_retention_period': 1
+ "snapshot_copy_grant": "changed",
+ "destination_region": "us-west-2",
+ "snapshot_retention_period": 1,
}
mock_request_update = {
- 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2',
- 'destination_region': 'us-west-2',
- 'snapshot_retention_period': 3
+ "snapshot_copy_grant": "snapshot-us-east-1-to-us-west-2",
+ "destination_region": "us-west-2",
+ "snapshot_retention_period": 3,
}
mock_request_no_update = {
- 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2',
- 'destination_region': 'us-west-2',
- 'snapshot_retention_period': 1
+ "snapshot_copy_grant": "snapshot-us-east-1-to-us-west-2",
+ "destination_region": "us-west-2",
+ "snapshot_retention_period": 1,
}
def test_fail_at_unsupported_operations():
- response = rcrs.requesting_unsupported_modifications(
- mock_status_enabled, mock_request_illegal
- )
+ response = rcrs.requesting_unsupported_modifications(mock_status_enabled, mock_request_illegal)
assert response is True
@@ -44,9 +45,7 @@ def test_needs_update_true():
def test_no_change():
- response = rcrs.requesting_unsupported_modifications(
- mock_status_enabled, mock_request_no_update
- )
+ response = rcrs.requesting_unsupported_modifications(mock_status_enabled, mock_request_no_update)
needs_update_response = rcrs.needs_update(mock_status_enabled, mock_request_no_update)
assert response is False
assert needs_update_response is False
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py
new file mode 100644
index 000000000..57ed705c5
--- /dev/null
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py
@@ -0,0 +1,240 @@
+# -*- coding: utf-8 -*-
+
+# Copyright: (c) 2023, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+import pytest
+
+from ansible_collections.community.aws.plugins.modules.route53_wait import detect_task_results
+
+_SINGLE_RESULT_SUCCESS = {
+ "changed": True,
+ "diff": {},
+ "failed": False,
+ "wait_id": None,
+}
+
+_SINGLE_RESULT_FAILED = {
+ "changed": False,
+ "failed": True,
+ "msg": "value of type must be one of: A, AAAA, CAA, CNAME, MX, NS, PTR, SOA, SPF, SRV, TXT, got: bar",
+}
+
+_MULTI_RESULT_SUCCESS = {
+ "ansible_loop_var": "item",
+ "changed": True,
+ "diff": {},
+ "failed": False,
+ "invocation": {
+ "module_args": {
+ "access_key": "asdf",
+ "alias": None,
+ "alias_evaluate_target_health": False,
+ "alias_hosted_zone_id": None,
+ "aws_access_key": "asdf",
+ "aws_ca_bundle": None,
+ "aws_config": None,
+ "aws_secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
+ "debug_botocore_endpoint_logs": False,
+ "endpoint_url": None,
+ "failover": None,
+ "geo_location": None,
+ "health_check": None,
+ "hosted_zone_id": None,
+ "identifier": None,
+ "overwrite": True,
+ "private_zone": False,
+ "profile": None,
+ "record": "foo.example.org",
+ "region": None,
+ "retry_interval": 500,
+ "secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
+ "session_token": None,
+ "state": "present",
+ "ttl": 300,
+ "type": "TXT",
+ "validate_certs": True,
+ "value": ["foo"],
+ "vpc_id": None,
+ "wait": False,
+ "wait_timeout": 300,
+ "weight": None,
+ "zone": "example.org",
+ },
+ },
+ "item": {"record": "foo.example.org", "value": "foo"},
+ "wait_id": None,
+}
+
+_MULTI_RESULT_FAILED = {
+ "ansible_loop_var": "item",
+ "changed": False,
+ "failed": True,
+ "invocation": {
+ "module_args": {
+ "access_key": "asdf",
+ "alias": None,
+ "alias_evaluate_target_health": False,
+ "alias_hosted_zone_id": None,
+ "aws_access_key": "asdf",
+ "aws_ca_bundle": None,
+ "aws_config": None,
+ "aws_secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
+ "debug_botocore_endpoint_logs": False,
+ "endpoint_url": None,
+ "failover": None,
+ "geo_location": None,
+ "health_check": None,
+ "hosted_zone_id": None,
+ "identifier": None,
+ "overwrite": True,
+ "private_zone": False,
+ "profile": None,
+ "record": "foo.example.org",
+ "region": None,
+ "retry_interval": 500,
+ "secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
+ "session_token": None,
+ "state": "present",
+ "ttl": 300,
+ "type": "bar",
+ "validate_certs": True,
+ "value": ["foo"],
+ "vpc_id": None,
+ "wait": False,
+ "wait_timeout": 300,
+ "weight": None,
+ "zone": "example.org",
+ },
+ },
+ "item": {"record": "foo.example.org", "value": "foo"},
+ "msg": "value of type must be one of: A, AAAA, CAA, CNAME, MX, NS, PTR, SOA, SPF, SRV, TXT, got: bar",
+}
+
+
+DETECT_TASK_RESULTS_DATA = [
+ [
+ _SINGLE_RESULT_SUCCESS,
+ [
+ (
+ "",
+ _SINGLE_RESULT_SUCCESS,
+ ),
+ ],
+ ],
+ [
+ {
+ "changed": True,
+ "msg": "All items completed",
+ "results": [
+ _MULTI_RESULT_SUCCESS,
+ ],
+ "skipped": False,
+ },
+ [
+ (
+ " for result #1",
+ _MULTI_RESULT_SUCCESS,
+ ),
+ ],
+ ],
+ [
+ _SINGLE_RESULT_FAILED,
+ [
+ (
+ "",
+ _SINGLE_RESULT_FAILED,
+ ),
+ ],
+ ],
+ [
+ {
+ "changed": False,
+ "failed": True,
+ "msg": "One or more items failed",
+ "results": [
+ _MULTI_RESULT_FAILED,
+ ],
+ "skipped": False,
+ },
+ [
+ (
+ " for result #1",
+ _MULTI_RESULT_FAILED,
+ ),
+ ],
+ ],
+]
+
+
+@pytest.mark.parametrize(
+ "input, expected",
+ DETECT_TASK_RESULTS_DATA,
+)
+def test_detect_task_results(input, expected):
+ assert list(detect_task_results(input)) == expected
+
+
+DETECT_TASK_RESULTS_FAIL_DATA = [
+ [
+ {},
+ "missing changed key",
+ [],
+ ],
+ [
+ {"changed": True},
+ "missing failed key",
+ [],
+ ],
+ [
+ {"results": None},
+ "missing changed key",
+ [],
+ ],
+ [
+ {"results": None, "changed": True, "msg": "foo"},
+ "missing skipped key",
+ [],
+ ],
+ [
+ {"results": None, "changed": True, "msg": "foo", "skipped": False},
+ "results is present, but not a list",
+ [],
+ ],
+ [
+ {"results": [None], "changed": True, "msg": "foo", "skipped": False},
+ "result 1 is not a dictionary",
+ [],
+ ],
+ [
+ {"results": [{}], "changed": True, "msg": "foo", "skipped": False},
+ "missing changed key for result 1",
+ [],
+ ],
+ [
+ {
+ "results": [{"changed": True, "failed": False, "ansible_loop_var": "item", "invocation": {}}, {}],
+ "changed": True,
+ "msg": "foo",
+ "skipped": False,
+ },
+ "missing changed key for result 2",
+ [(" for result #1", {"changed": True, "failed": False, "ansible_loop_var": "item", "invocation": {}})],
+ ],
+]
+
+
+@pytest.mark.parametrize(
+ "input, expected_exc, expected_result",
+ DETECT_TASK_RESULTS_FAIL_DATA,
+)
+def test_detect_task_fail_results(input, expected_exc, expected_result):
+ result = []
+ with pytest.raises(ValueError) as exc:
+ for res in detect_task_results(input):
+ result.append(res)
+
+ print(exc.value.args[0])
+ assert expected_exc == exc.value.args[0]
+ print(result)
+ assert expected_result == result
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py
new file mode 100644
index 000000000..518a11a3b
--- /dev/null
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+
+# Copyright: Contributors to the Ansible project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+from botocore.exceptions import BotoCoreError
+
+from ansible_collections.community.aws.plugins.modules.ssm_inventory_info import SsmInventoryInfoFailure
+from ansible_collections.community.aws.plugins.modules.ssm_inventory_info import execute_module
+from ansible_collections.community.aws.plugins.modules.ssm_inventory_info import get_ssm_inventory
+
+
+def test_get_ssm_inventory():
+ connection = MagicMock()
+ inventory_response = MagicMock()
+ connection.get_inventory.return_value = inventory_response
+ filters = MagicMock()
+
+ assert get_ssm_inventory(connection, filters) == inventory_response
+ connection.get_inventory.assert_called_once_with(Filters=filters)
+
+
+def test_get_ssm_inventory_failure():
+ connection = MagicMock()
+ connection.get_inventory.side_effect = BotoCoreError(error="failed", operation="get_ssm_inventory")
+ filters = MagicMock()
+
+ with pytest.raises(SsmInventoryInfoFailure):
+ get_ssm_inventory(connection, filters)
+
+
+@patch("ansible_collections.community.aws.plugins.modules.ssm_inventory_info.get_ssm_inventory")
+def test_execute_module(m_get_ssm_inventory):
+ instance_id = "i-0202020202020202"
+ aws_inventory = {
+ "AgentType": "amazon-ssm-agent",
+ "AgentVersion": "3.2.582.0",
+ "ComputerName": "ip-172-31-44-166.ec2.internal",
+ "InstanceId": "i-039eb9b1f55934ab6",
+ "InstanceStatus": "Active",
+ "IpAddress": "172.31.44.166",
+ "PlatformName": "Fedora Linux",
+ "PlatformType": "Linux",
+ "PlatformVersion": "37",
+ "ResourceType": "EC2Instance",
+ }
+
+ ansible_inventory = {
+ "agent_type": "amazon-ssm-agent",
+ "agent_version": "3.2.582.0",
+ "computer_name": "ip-172-31-44-166.ec2.internal",
+ "instance_id": "i-039eb9b1f55934ab6",
+ "instance_status": "Active",
+ "ip_address": "172.31.44.166",
+ "platform_name": "Fedora Linux",
+ "platform_type": "Linux",
+ "platform_version": "37",
+ "resource_type": "EC2Instance",
+ }
+
+ m_get_ssm_inventory.return_value = {
+ "Entities": [{"Id": instance_id, "Data": {"AWS:InstanceInformation": {"Content": [aws_inventory]}}}],
+ "Status": 200,
+ }
+
+ connection = MagicMock()
+ module = MagicMock()
+ module.params = dict(instance_id=instance_id)
+ module.exit_json.side_effect = SystemExit(1)
+ module.fail_json_aws.side_effect = SystemError(2)
+
+ with pytest.raises(SystemExit):
+ execute_module(module, connection)
+
+ module.exit_json.assert_called_once_with(changed=False, ssm_inventory=ansible_inventory)
+
+
+@patch("ansible_collections.community.aws.plugins.modules.ssm_inventory_info.get_ssm_inventory")
+def test_execute_module_no_data(m_get_ssm_inventory):
+ instance_id = "i-0202020202020202"
+
+ m_get_ssm_inventory.return_value = {
+ "Entities": [{"Id": instance_id, "Data": {}}],
+ }
+
+ connection = MagicMock()
+ module = MagicMock()
+ module.params = dict(instance_id=instance_id)
+ module.exit_json.side_effect = SystemExit(1)
+ module.fail_json_aws.side_effect = SystemError(2)
+
+ with pytest.raises(SystemExit):
+ execute_module(module, connection)
+
+ module.exit_json.assert_called_once_with(changed=False, ssm_inventory={})
+
+
+@patch("ansible_collections.community.aws.plugins.modules.ssm_inventory_info.get_ssm_inventory")
+def test_execute_module_failure(m_get_ssm_inventory):
+ instance_id = "i-0202020202020202"
+
+ m_get_ssm_inventory.side_effect = SsmInventoryInfoFailure(
+ exc=BotoCoreError(error="failed", operation="get_ssm_inventory"), msg="get_ssm_inventory() failed."
+ )
+
+ connection = MagicMock()
+ module = MagicMock()
+ module.params = dict(instance_id=instance_id)
+ module.exit_json.side_effect = SystemExit(1)
+ module.fail_json_aws.side_effect = SystemError(2)
+
+ with pytest.raises(SystemError):
+ execute_module(module, connection)
diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py b/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py
index 026bf2549..a3d9e31db 100644
--- a/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py
+++ b/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py
@@ -1,23 +1,20 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
import json
+import unittest
+from unittest.mock import patch
-from ansible_collections.community.aws.tests.unit.compat import unittest
-from ansible_collections.community.aws.tests.unit.compat.mock import patch
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
def set_module_args(args):
- if '_ansible_remote_tmp' not in args:
- args['_ansible_remote_tmp'] = '/tmp'
- if '_ansible_keep_remote_files' not in args:
- args['_ansible_keep_remote_files'] = False
+ if "_ansible_remote_tmp" not in args:
+ args["_ansible_remote_tmp"] = "/tmp"
+ if "_ansible_keep_remote_files" not in args:
+ args["_ansible_keep_remote_files"] = False
- args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ args = json.dumps({"ANSIBLE_MODULE_ARGS": args})
basic._ANSIBLE_ARGS = to_bytes(args)
@@ -30,22 +27,21 @@ class AnsibleFailJson(Exception):
def exit_json(*args, **kwargs):
- if 'changed' not in kwargs:
- kwargs['changed'] = False
+ if "changed" not in kwargs:
+ kwargs["changed"] = False
raise AnsibleExitJson(kwargs)
def fail_json(*args, **kwargs):
- kwargs['failed'] = True
+ kwargs["failed"] = True
raise AnsibleFailJson(kwargs)
class ModuleTestCase(unittest.TestCase):
-
def setUp(self):
self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
self.mock_module.start()
- self.mock_sleep = patch('time.sleep')
+ self.mock_sleep = patch("time.sleep")
self.mock_sleep.start()
set_module_args({})
self.addCleanup(self.mock_module.stop)
diff --git a/ansible_collections/community/aws/tests/unit/requirements.yml b/ansible_collections/community/aws/tests/unit/requirements.yml
new file mode 100644
index 000000000..99ce82a1b
--- /dev/null
+++ b/ansible_collections/community/aws/tests/unit/requirements.yml
@@ -0,0 +1,5 @@
+---
+collections:
+ - name: https://github.com/ansible-collections/amazon.aws.git
+ type: git
+ version: main