From 7fec0b69a082aaeec72fee0612766aa42f6b1b4d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 18 Apr 2024 07:52:35 +0200 Subject: Merging upstream version 9.4.0+dfsg. Signed-off-by: Daniel Baumann --- .../community/aws/tests/unit/compat/builtins.py | 9 +- .../community/aws/tests/unit/compat/mock.py | 122 ---- .../community/aws/tests/unit/compat/unittest.py | 38 -- .../community/aws/tests/unit/constraints.txt | 6 +- .../community/aws/tests/unit/mock/loader.py | 17 +- .../community/aws/tests/unit/mock/path.py | 5 +- .../community/aws/tests/unit/mock/procenv.py | 19 +- .../community/aws/tests/unit/mock/vault_helper.py | 16 +- .../community/aws/tests/unit/mock/yaml_helper.py | 37 +- .../tests/unit/plugins/connection/test_aws_ssm.py | 95 ++- .../aws/tests/unit/plugins/inventory/__init__.py | 0 .../tests/unit/plugins/inventory/test_aws_mq.py | 638 +++++++++++++++++++++ .../aws/tests/unit/plugins/modules/conftest.py | 20 +- .../unit/plugins/modules/test_acm_certificate.py | 97 ++-- .../tests/unit/plugins/modules/test_api_gateway.py | 43 +- .../unit/plugins/modules/test_data_pipeline.py | 196 ++++--- .../test_directconnect_confirm_connection.py | 80 ++- .../modules/test_directconnect_connection.py | 61 +- .../test_directconnect_link_aggregation_group.py | 91 +-- .../test_directconnect_virtual_interface.py | 227 ++++---- .../tests/unit/plugins/modules/test_ec2_vpc_vpn.py | 261 +++++---- .../unit/plugins/modules/test_ec2_win_password.py | 59 +- .../plugins/modules/test_iam_password_policy.py | 30 - .../tests/unit/plugins/modules/test_opensearch.py | 109 ++-- .../test_redshift_cross_region_snapshots.py | 37 +- .../unit/plugins/modules/test_route53_wait.py | 240 ++++++++ .../plugins/modules/test_ssm_inventory_info.py | 117 ++++ .../aws/tests/unit/plugins/modules/utils.py | 26 +- .../community/aws/tests/unit/requirements.yml | 5 + 29 files changed, 1804 insertions(+), 897 deletions(-) delete mode 100644 ansible_collections/community/aws/tests/unit/compat/mock.py delete mode 100644 ansible_collections/community/aws/tests/unit/compat/unittest.py create mode 100644 ansible_collections/community/aws/tests/unit/plugins/inventory/__init__.py create mode 100644 ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py delete mode 100644 ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py create mode 100644 ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py create mode 100644 ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py create mode 100644 ansible_collections/community/aws/tests/unit/requirements.yml (limited to 'ansible_collections/community/aws/tests/unit') diff --git a/ansible_collections/community/aws/tests/unit/compat/builtins.py b/ansible_collections/community/aws/tests/unit/compat/builtins.py index 349d310e8..3df85be4f 100644 --- a/ansible_collections/community/aws/tests/unit/compat/builtins.py +++ b/ansible_collections/community/aws/tests/unit/compat/builtins.py @@ -16,7 +16,10 @@ # along with Ansible. If not, see . # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type # @@ -28,6 +31,6 @@ __metaclass__ = type try: import __builtin__ # pylint: disable=unused-import except ImportError: - BUILTINS = 'builtins' + BUILTINS = "builtins" else: - BUILTINS = '__builtin__' + BUILTINS = "__builtin__" diff --git a/ansible_collections/community/aws/tests/unit/compat/mock.py b/ansible_collections/community/aws/tests/unit/compat/mock.py deleted file mode 100644 index 0972cd2e8..000000000 --- a/ansible_collections/community/aws/tests/unit/compat/mock.py +++ /dev/null @@ -1,122 +0,0 @@ -# (c) 2014, Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -''' -Compat module for Python3.x's unittest.mock module -''' -import sys - -# Python 2.7 - -# Note: Could use the pypi mock library on python3.x as well as python2.x. It -# is the same as the python3 stdlib mock library - -try: - # Allow wildcard import because we really do want to import all of mock's - # symbols into this compat shim - # pylint: disable=wildcard-import,unused-wildcard-import - from unittest.mock import * -except ImportError: - # Python 2 - # pylint: disable=wildcard-import,unused-wildcard-import - try: - from mock import * - except ImportError: - print('You need the mock library installed on python2.x to run tests') - - -# Prior to 3.4.4, mock_open cannot handle binary read_data -if sys.version_info >= (3,) and sys.version_info < (3, 4, 4): - file_spec = None - - def _iterate_read_data(read_data): - # Helper for mock_open: - # Retrieve lines from read_data via a generator so that separate calls to - # readline, read, and readlines are properly interleaved - sep = b'\n' if isinstance(read_data, bytes) else '\n' - data_as_list = [l + sep for l in read_data.split(sep)] - - if data_as_list[-1] == sep: - # If the last line ended in a newline, the list comprehension will have an - # extra entry that's just a newline. Remove this. - data_as_list = data_as_list[:-1] - else: - # If there wasn't an extra newline by itself, then the file being - # emulated doesn't have a newline to end the last line remove the - # newline that our naive format() added - data_as_list[-1] = data_as_list[-1][:-1] - - for line in data_as_list: - yield line - - def mock_open(mock=None, read_data=''): - """ - A helper function to create a mock to replace the use of `open`. It works - for `open` called directly or used as a context manager. - - The `mock` argument is the mock object to configure. If `None` (the - default) then a `MagicMock` will be created for you, with the API limited - to methods or attributes available on standard file handles. - - `read_data` is a string for the `read` methoddline`, and `readlines` of the - file handle to return. This is an empty string by default. - """ - def _readlines_side_effect(*args, **kwargs): - if handle.readlines.return_value is not None: - return handle.readlines.return_value - return list(_data) - - def _read_side_effect(*args, **kwargs): - if handle.read.return_value is not None: - return handle.read.return_value - return type(read_data)().join(_data) - - def _readline_side_effect(): - if handle.readline.return_value is not None: - while True: - yield handle.readline.return_value - for line in _data: - yield line - - global file_spec - if file_spec is None: - import _io - file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) - - if mock is None: - mock = MagicMock(name='open', spec=open) - - handle = MagicMock(spec=file_spec) - handle.__enter__.return_value = handle - - _data = _iterate_read_data(read_data) - - handle.write.return_value = None - handle.read.return_value = None - handle.readline.return_value = None - handle.readlines.return_value = None - - handle.read.side_effect = _read_side_effect - handle.readline.side_effect = _readline_side_effect() - handle.readlines.side_effect = _readlines_side_effect - - mock.return_value = handle - return mock diff --git a/ansible_collections/community/aws/tests/unit/compat/unittest.py b/ansible_collections/community/aws/tests/unit/compat/unittest.py deleted file mode 100644 index 98f08ad6a..000000000 --- a/ansible_collections/community/aws/tests/unit/compat/unittest.py +++ /dev/null @@ -1,38 +0,0 @@ -# (c) 2014, Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -''' -Compat module for Python2.7's unittest module -''' - -import sys - -# Allow wildcard import because we really do want to import all of -# unittests's symbols into this compat shim -# pylint: disable=wildcard-import,unused-wildcard-import -if sys.version_info < (2, 7): - try: - # Need unittest2 on python2.6 - from unittest2 import * - except ImportError: - print('You need unittest2 installed on python2.6.x to run tests') -else: - from unittest import * diff --git a/ansible_collections/community/aws/tests/unit/constraints.txt b/ansible_collections/community/aws/tests/unit/constraints.txt index cd546e7c2..5708323f1 100644 --- a/ansible_collections/community/aws/tests/unit/constraints.txt +++ b/ansible_collections/community/aws/tests/unit/constraints.txt @@ -1,7 +1,7 @@ # Specifically run tests against the oldest versions that we support -boto3==1.18.0 -botocore==1.21.0 +botocore==1.29.0 +boto3==1.26.0 # AWS CLI has `botocore==` dependencies, provide the one that matches botocore # to avoid needing to download over a years worth of awscli wheels. -awscli==1.20.0 +awscli==1.27.0 diff --git a/ansible_collections/community/aws/tests/unit/mock/loader.py b/ansible_collections/community/aws/tests/unit/mock/loader.py index 00a584127..339a1918c 100644 --- a/ansible_collections/community/aws/tests/unit/mock/loader.py +++ b/ansible_collections/community/aws/tests/unit/mock/loader.py @@ -16,21 +16,24 @@ # along with Ansible. If not, see . # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import os from ansible.errors import AnsibleParserError +from ansible.module_utils._text import to_bytes +from ansible.module_utils._text import to_text from ansible.parsing.dataloader import DataLoader -from ansible.module_utils._text import to_bytes, to_text class DictDataLoader(DataLoader): - def __init__(self, file_mapping=None): file_mapping = {} if file_mapping is None else file_mapping - assert type(file_mapping) == dict + assert isinstance(file_mapping, dict) super(DictDataLoader, self).__init__() @@ -51,7 +54,7 @@ class DictDataLoader(DataLoader): if file_name in self._file_mapping: return (to_bytes(self._file_mapping[file_name]), False) else: - raise AnsibleParserError("file not found: %s" % file_name) + raise AnsibleParserError(f"file not found: {file_name}") def path_exists(self, path): path = to_text(path) @@ -68,7 +71,7 @@ class DictDataLoader(DataLoader): def list_directory(self, path): ret = [] path = to_text(path) - for x in (list(self._file_mapping.keys()) + self._known_directories): + for x in list(self._file_mapping.keys()) + self._known_directories: if x.startswith(path): if os.path.dirname(x) == path: ret.append(os.path.basename(x)) @@ -86,7 +89,7 @@ class DictDataLoader(DataLoader): self._known_directories = [] for path in self._file_mapping: dirname = os.path.dirname(path) - while dirname not in ('/', ''): + while dirname not in ("/", ""): self._add_known_directory(dirname) dirname = os.path.dirname(dirname) diff --git a/ansible_collections/community/aws/tests/unit/mock/path.py b/ansible_collections/community/aws/tests/unit/mock/path.py index 676b35ab8..8057e5a58 100644 --- a/ansible_collections/community/aws/tests/unit/mock/path.py +++ b/ansible_collections/community/aws/tests/unit/mock/path.py @@ -1,10 +1,7 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +from unittest.mock import MagicMock -from ansible_collections.community.aws.tests.unit.compat.mock import MagicMock from ansible.utils.path import unfrackpath - mock_unfrackpath_noop = MagicMock(spec_set=unfrackpath, side_effect=lambda x, *args, **kwargs: x) diff --git a/ansible_collections/community/aws/tests/unit/mock/procenv.py b/ansible_collections/community/aws/tests/unit/mock/procenv.py index e516a9458..0d8547f50 100644 --- a/ansible_collections/community/aws/tests/unit/mock/procenv.py +++ b/ansible_collections/community/aws/tests/unit/mock/procenv.py @@ -16,22 +16,19 @@ # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import sys import json - +import sys +import unittest from contextlib import contextmanager -from io import BytesIO, StringIO -from ansible_collections.community.aws.tests.unit.compat import unittest -from ansible.module_utils.six import PY3 +from io import BytesIO +from io import StringIO + from ansible.module_utils._text import to_bytes +from ansible.module_utils.six import PY3 @contextmanager -def swap_stdin_and_argv(stdin_data='', argv_data=tuple()): +def swap_stdin_and_argv(stdin_data="", argv_data=tuple()): """ context manager that temporarily masks the test runner's values for stdin and argv """ @@ -77,7 +74,7 @@ def swap_stdout(): class ModuleTestCase(unittest.TestCase): def setUp(self, module_args=None): if module_args is None: - module_args = {'_ansible_remote_tmp': '/tmp', '_ansible_keep_remote_files': False} + module_args = {"_ansible_remote_tmp": "/tmp", "_ansible_keep_remote_files": False} args = json.dumps(dict(ANSIBLE_MODULE_ARGS=module_args)) diff --git a/ansible_collections/community/aws/tests/unit/mock/vault_helper.py b/ansible_collections/community/aws/tests/unit/mock/vault_helper.py index b54629da4..c55228c88 100644 --- a/ansible_collections/community/aws/tests/unit/mock/vault_helper.py +++ b/ansible_collections/community/aws/tests/unit/mock/vault_helper.py @@ -1,27 +1,29 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type from ansible.module_utils._text import to_bytes - from ansible.parsing.vault import VaultSecret class TextVaultSecret(VaultSecret): - '''A secret piece of text. ie, a password. Tracks text encoding. + """A secret piece of text. ie, a password. Tracks text encoding. The text encoding of the text may not be the default text encoding so - we keep track of the encoding so we encode it to the same bytes.''' + we keep track of the encoding so we encode it to the same bytes.""" def __init__(self, text, encoding=None, errors=None, _bytes=None): super(TextVaultSecret, self).__init__() self.text = text - self.encoding = encoding or 'utf-8' + self.encoding = encoding or "utf-8" self._bytes = _bytes - self.errors = errors or 'strict' + self.errors = errors or "strict" @property def bytes(self): - '''The text encoded with encoding, unless we specifically set _bytes.''' + """The text encoded with encoding, unless we specifically set _bytes.""" return self._bytes or to_bytes(self.text, encoding=self.encoding, errors=self.errors) diff --git a/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py b/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py index a646c0241..8c99ef40f 100644 --- a/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py +++ b/ansible_collections/community/aws/tests/unit/mock/yaml_helper.py @@ -1,18 +1,23 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import io + import yaml from ansible.module_utils.six import PY3 -from ansible.parsing.yaml.loader import AnsibleLoader from ansible.parsing.yaml.dumper import AnsibleDumper +from ansible.parsing.yaml.loader import AnsibleLoader class YamlTestUtils(object): """Mixin class to combine with a unittest.TestCase subclass.""" + def _loader(self, stream): """Vault related tests will want to override this. @@ -45,8 +50,7 @@ class YamlTestUtils(object): obj_2 = loader.get_data() # dump the gen 2 objects directory to strings - string_from_object_dump_2 = self._dump_string(obj_2, - dumper=AnsibleDumper) + string_from_object_dump_2 = self._dump_string(obj_2, dumper=AnsibleDumper) # The gen 1 and gen 2 yaml strings self.assertEqual(string_from_object_dump, string_from_object_dump_2) @@ -66,7 +70,7 @@ class YamlTestUtils(object): self.assertEqual(string_from_object_dump, string_from_object_dump_3) def _old_dump_load_cycle(self, obj): - '''Dump the passed in object to yaml, load it back up, dump again, compare.''' + """Dump the passed in object to yaml, load it back up, dump again, compare.""" stream = io.StringIO() yaml_string = self._dump_string(obj, dumper=AnsibleDumper) @@ -111,16 +115,23 @@ class YamlTestUtils(object): assert yaml_string == yaml_string_obj_from_stream assert yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string - assert (yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string == yaml_string_stream_obj_from_stream == - yaml_string_stream_obj_from_string) + assert ( + yaml_string + == yaml_string_obj_from_stream + == yaml_string_obj_from_string + == yaml_string_stream_obj_from_stream + == yaml_string_stream_obj_from_string + ) assert obj == obj_from_stream assert obj == obj_from_string assert obj == yaml_string_obj_from_stream assert obj == yaml_string_obj_from_string assert obj == obj_from_stream == obj_from_string == yaml_string_obj_from_stream == yaml_string_obj_from_string - return {'obj': obj, - 'yaml_string': yaml_string, - 'yaml_string_from_stream': yaml_string_from_stream, - 'obj_from_stream': obj_from_stream, - 'obj_from_string': obj_from_string, - 'yaml_string_obj_from_string': yaml_string_obj_from_string} + return { + "obj": obj, + "yaml_string": yaml_string, + "yaml_string_from_stream": yaml_string_from_stream, + "obj_from_stream": obj_from_stream, + "obj_from_string": obj_from_string, + "yaml_string_obj_from_string": yaml_string_obj_from_string, + } diff --git a/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py b/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py index 579cafc16..d5fcb4b1e 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py +++ b/ansible_collections/community/aws/tests/unit/plugins/connection/test_aws_ssm.py @@ -1,11 +1,11 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from io import StringIO +from unittest.mock import MagicMock +from unittest.mock import patch + import pytest -from ansible_collections.community.aws.tests.unit.compat.mock import patch, MagicMock from ansible.playbook.play_context import PlayContext from ansible.plugins.loader import connection_loader @@ -15,46 +15,45 @@ if not HAS_BOTO3: pytestmark = pytest.mark.skip("test_data_pipeline.py requires the python modules 'boto3' and 'botocore'") -class TestConnectionBaseClass(): - - @patch('os.path.exists') - @patch('subprocess.Popen') - @patch('select.poll') - @patch('boto3.client') +class TestConnectionBaseClass: + @patch("os.path.exists") + @patch("subprocess.Popen") + @patch("select.poll") + @patch("boto3.client") def test_plugins_connection_aws_ssm_start_session(self, boto_client, s_poll, s_popen, mock_ospe): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn.get_option = MagicMock() - conn.get_option.side_effect = ['i1234', 'executable', 'abcd', 'i1234'] - conn.host = 'abc' + conn.get_option.side_effect = ["i1234", "executable", "abcd", "i1234"] + conn.host = "abc" mock_ospe.return_value = True boto3 = MagicMock() - boto3.client('ssm').return_value = MagicMock() + boto3.client("ssm").return_value = MagicMock() conn.start_session = MagicMock() conn._session_id = MagicMock() - conn._session_id.return_value = 's1' + conn._session_id.return_value = "s1" s_popen.return_value.stdin.write = MagicMock() s_poll.return_value = MagicMock() s_poll.return_value.register = MagicMock() s_popen.return_value.poll = MagicMock() s_popen.return_value.poll.return_value = None conn._stdin_readline = MagicMock() - conn._stdin_readline.return_value = 'abc123' - conn.SESSION_START = 'abc' + conn._stdin_readline.return_value = "abc123" + conn.SESSION_START = "abc" conn.start_session() - @patch('random.choice') + @patch("random.choice") def test_plugins_connection_aws_ssm_exec_command(self, r_choice): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) - r_choice.side_effect = ['a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b'] + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) + r_choice.side_effect = ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"] conn.MARK_LENGTH = 5 conn._session = MagicMock() conn._session.stdin.write = MagicMock() conn._wrap_command = MagicMock() - conn._wrap_command.return_value = 'cmd1' + conn._wrap_command.return_value = "cmd1" conn._flush_stderr = MagicMock() conn._windows = MagicMock() conn._windows.return_value = True @@ -67,44 +66,44 @@ class TestConnectionBaseClass(): conn._session.stdout = MagicMock() conn._session.stdout.readline = MagicMock() conn._post_process = MagicMock() - conn._post_process.return_value = 'test' - conn._session.stdout.readline.side_effect = iter(['aaaaa\n', 'Hi\n', '0\n', 'bbbbb\n']) + conn._post_process.return_value = "test" + conn._session.stdout.readline.side_effect = iter(["aaaaa\n", "Hi\n", "0\n", "bbbbb\n"]) conn.get_option = MagicMock() conn.get_option.return_value = 1 - returncode = 'a' - stdout = 'b' + returncode = "a" + stdout = "b" return (returncode, stdout, conn._flush_stderr) def test_plugins_connection_aws_ssm_prepare_terminal(self): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn.is_windows = MagicMock() conn.is_windows.return_value = True def test_plugins_connection_aws_ssm_wrap_command(self): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn.is_windows = MagicMock() conn.is_windows.return_value = True - return 'windows1' + return "windows1" def test_plugins_connection_aws_ssm_post_process(self): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn.is_windows = MagicMock() conn.is_windows.return_value = True conn.stdout = MagicMock() returncode = 0 return returncode, conn.stdout - @patch('subprocess.Popen') + @patch("subprocess.Popen") def test_plugins_connection_aws_ssm_flush_stderr(self, s_popen): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn.poll_stderr = MagicMock() conn.poll_stderr.register = MagicMock() conn.stderr = None @@ -121,37 +120,37 @@ class TestConnectionBaseClass(): # boto3.generate_presigned_url.return_value = MagicMock() # return (boto3.generate_presigned_url.return_value) - @patch('os.path.exists') + @patch("os.path.exists") def test_plugins_connection_aws_ssm_put_file(self, mock_ospe): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn._connect = MagicMock() conn._file_transport_command = MagicMock() - conn._file_transport_command.return_value = (0, 'stdout', 'stderr') - conn.put_file('/in/file', '/out/file') + conn._file_transport_command.return_value = (0, "stdout", "stderr") + conn.put_file("/in/file", "/out/file") def test_plugins_connection_aws_ssm_fetch_file(self): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn._connect = MagicMock() conn._file_transport_command = MagicMock() - conn._file_transport_command.return_value = (0, 'stdout', 'stderr') - conn.fetch_file('/in/file', '/out/file') + conn._file_transport_command.return_value = (0, "stdout", "stderr") + conn.fetch_file("/in/file", "/out/file") - @patch('subprocess.check_output') - @patch('boto3.client') + @patch("subprocess.check_output") + @patch("boto3.client") def test_plugins_connection_file_transport_command(self, boto_client, s_check_output): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn.get_option = MagicMock() - conn.get_option.side_effect = ['1', '2', '3', '4', '5'] + conn.get_option.side_effect = ["1", "2", "3", "4", "5"] conn._get_url = MagicMock() - conn._get_url.side_effect = ['url1', 'url2'] + conn._get_url.side_effect = ["url1", "url2"] boto3 = MagicMock() - boto3.client('s3').return_value = MagicMock() + boto3.client("s3").return_value = MagicMock() conn.get_option.return_value = 1 get_command = MagicMock() put_command = MagicMock() @@ -161,11 +160,11 @@ class TestConnectionBaseClass(): conn.exec_command(put_command, in_data=None, sudoable=False) conn.exec_command(get_command, in_data=None, sudoable=False) - @patch('subprocess.check_output') + @patch("subprocess.check_output") def test_plugins_connection_aws_ssm_close(self, s_check_output): pc = PlayContext() new_stdin = StringIO() - conn = connection_loader.get('community.aws.aws_ssm', pc, new_stdin) + conn = connection_loader.get("community.aws.aws_ssm", pc, new_stdin) conn.instance_id = "i-12345" conn._session_id = True conn.get_option = MagicMock() @@ -174,8 +173,8 @@ class TestConnectionBaseClass(): conn._session.terminate = MagicMock() conn._session.communicate = MagicMock() conn._terminate_session = MagicMock() - conn._terminate_session.return_value = '' + conn._terminate_session.return_value = "" conn._session_id = MagicMock() - conn._session_id.return_value = 'a' + conn._session_id.return_value = "a" conn._client = MagicMock() conn.close() diff --git a/ansible_collections/community/aws/tests/unit/plugins/inventory/__init__.py b/ansible_collections/community/aws/tests/unit/plugins/inventory/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py b/ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py new file mode 100644 index 000000000..8969b4a03 --- /dev/null +++ b/ansible_collections/community/aws/tests/unit/plugins/inventory/test_aws_mq.py @@ -0,0 +1,638 @@ +# -*- coding: utf-8 -*- + +# Copyright 2023 Ali AlKhalidi <@doteast> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +import copy +import random +import string +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch + +import pytest + +try: + import botocore +except ImportError: + # Handled by HAS_BOTO3 + pass + +from ansible.errors import AnsibleError + +from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 + +from ansible_collections.community.aws.plugins.inventory.aws_mq import InventoryModule +from ansible_collections.community.aws.plugins.inventory.aws_mq import _add_details_to_hosts +from ansible_collections.community.aws.plugins.inventory.aws_mq import _find_hosts_matching_statuses +from ansible_collections.community.aws.plugins.inventory.aws_mq import _get_broker_host_tags + +if not HAS_BOTO3: + pytestmark = pytest.mark.skip("test_aws_mq.py requires the python modules 'boto3' and 'botocore'") + + +def make_clienterror_exception(code="AccessDenied"): + return botocore.exceptions.ClientError( + { + "Error": {"Code": code, "Message": "User is not authorized to perform: xxx on resource: user yyyy"}, + "ResponseMetadata": {"RequestId": "01234567-89ab-cdef-0123-456789abcdef"}, + }, + "getXXX", + ) + + +@pytest.fixture() +def inventory(): + inventory = InventoryModule() + inventory.inventory = MagicMock() + inventory.inventory.set_variable = MagicMock() + + inventory.all_clients = MagicMock() + inventory.get_option = MagicMock() + + inventory._populate_host_vars = MagicMock() + inventory._set_composite_vars = MagicMock() + inventory._add_host_to_composed_groups = MagicMock() + inventory._add_host_to_keyed_groups = MagicMock() + + inventory.get_cache_key = MagicMock() + + inventory._cache = {} + + return inventory + + +@pytest.fixture() +def connection(): + conn = MagicMock() + return conn + + +@pytest.mark.parametrize( + "suffix,result", + [ + ("aws_mq.yml", True), + ("aws_mq.yaml", True), + ("aws_MQ.yml", False), + ("AWS_mq.yaml", False), + ], +) +def test_inventory_verify_file_suffix(inventory, suffix, result, tmp_path): + test_dir = tmp_path / "test_aws_mq" + test_dir.mkdir() + inventory_file = "inventory" + suffix + inventory_file = test_dir / inventory_file + inventory_file.write_text("my inventory") + assert result == inventory.verify_file(str(inventory_file)) + + +def test_inventory_verify_file_with_missing_file(inventory): + inventory_file = "this_file_does_not_exist_aws_mq.yml" + assert not inventory.verify_file(inventory_file) + + +def generate_random_string(with_digits=True, with_punctuation=True, length=16): + data = string.ascii_letters + if with_digits: + data += string.digits + if with_punctuation: + data += string.punctuation + return "".join([random.choice(data) for i in range(length)]) + + +@pytest.mark.parametrize( + "hosts,statuses,expected", + [ + ( + [ + {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"}, + {"host": "host2", "BrokerState": "RUNNING"}, + {"host": "host3", "BrokerState": "REBOOT_IN_PROGRESS"}, + {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"}, + {"host": "host5", "BrokerState": "CREATION_FAILED"}, + {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"}, + ], + ["RUNNING"], + [{"host": "host2", "BrokerState": "RUNNING"}], + ), + ( + [ + {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"}, + {"host": "host2", "BrokerState": "RUNNING"}, + {"host": "host3", "BrokerState": "REBOOT_IN_PROGRESS"}, + {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"}, + {"host": "host5", "BrokerState": "CREATION_FAILED"}, + {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"}, + ], + ["all"], + [ + {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"}, + {"host": "host2", "BrokerState": "RUNNING"}, + {"host": "host3", "BrokerState": "REBOOT_IN_PROGRESS"}, + {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"}, + {"host": "host5", "BrokerState": "CREATION_FAILED"}, + {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"}, + ], + ), + ( + [ + {"host": "host1", "BrokerState": "DELETION_IN_PROGRESS"}, + {"host": "host2", "BrokerState": "RUNNING"}, + {"host": "host3", "BrokerState": "CREATION_FAILED"}, + {"host": "host4", "BrokerState": "CRITICAL_ACTION_REQUIRED"}, + {"host": "host5", "BrokerState": "RUNNING"}, + {"host": "host6", "BrokerState": "CREATION_IN_PROGRESS"}, + ], + ["RUNNING"], + [ + {"host": "host2", "BrokerState": "RUNNING"}, + {"host": "host5", "BrokerState": "RUNNING"}, + ], + ), + ], +) +def test_find_hosts_matching_statuses(hosts, statuses, expected): + assert expected == _find_hosts_matching_statuses(hosts, statuses) + + +@pytest.mark.parametrize("hosts", ["", "host1", "host2,host3", "host2,host3,host1"]) +@patch("ansible_collections.community.aws.plugins.inventory.aws_mq._get_mq_hostname") +def test_inventory_format_inventory(m_get_mq_hostname, inventory, hosts): + hosts_vars = { + "host1": {"var10": "value10"}, + "host2": {"var20": "value20", "var21": "value21"}, + "host3": {"var30": "value30", "var31": "value31", "var32": "value32"}, + } + + m_get_mq_hostname.side_effect = lambda h: h["name"] + + class _inventory_host(object): + def __init__(self, name, host_vars): + self.name = name + self.vars = host_vars + + inventory.inventory = MagicMock() + inventory.inventory.get_host.side_effect = lambda x: _inventory_host(name=x, host_vars=hosts_vars.get(x)) + + hosts = [{"name": x} for x in hosts.split(",") if x] + expected = { + "_meta": {"hostvars": {x["name"]: hosts_vars.get(x["name"]) for x in hosts}}, + "aws_mq": {"hosts": [x["name"] for x in hosts]}, + } + + assert expected == inventory._format_inventory(hosts) + if hosts == []: + m_get_mq_hostname.assert_not_called() + + +@pytest.mark.parametrize("length", range(0, 10, 2)) +def test_inventory_populate(inventory, length): + group = "aws_mq" + hosts = [f"host_{int(i)}" for i in range(length)] + + inventory._add_hosts = MagicMock() + inventory._populate(hosts=hosts) + + inventory.inventory.add_group.assert_called_with("aws_mq") + + if len(hosts) == 0: + inventory.inventory._add_hosts.assert_not_called() + inventory.inventory.add_child.assert_not_called() + else: + inventory._add_hosts.assert_called_with(hosts=hosts, group=group) + inventory.inventory.add_child.assert_called_with("all", group) + + +def test_inventory_populate_from_cache(inventory): + cache_data = { + "_meta": { + "hostvars": { + "broker_A": {"var10": "value10"}, + "broker_B": {"var2": "value2"}, + "broker_C": {"var3": ["value30", "value31", "value32"]}, + } + }, + "all": {"hosts": ["broker_A", "broker_D", "broker_B", "broker_C"]}, + "aws_broker_group_A": {"hosts": ["broker_A", "broker_D"]}, + "aws_broker_group_B": {"hosts": ["broker_B"]}, + "aws_broker_group_C": {"hosts": ["broker_C"]}, + } + + inventory._populate_from_cache(cache_data) + inventory.inventory.add_group.assert_has_calls( + [ + call("aws_broker_group_A"), + call("aws_broker_group_B"), + call("aws_broker_group_C"), + ], + any_order=True, + ) + inventory.inventory.add_child.assert_has_calls( + [ + call("all", "aws_broker_group_A"), + call("all", "aws_broker_group_B"), + call("all", "aws_broker_group_C"), + ], + any_order=True, + ) + + inventory._populate_host_vars.assert_has_calls( + [ + call(["broker_A"], {"var10": "value10"}, "aws_broker_group_A"), + call(["broker_D"], {}, "aws_broker_group_A"), + call(["broker_B"], {"var2": "value2"}, "aws_broker_group_B"), + call(["broker_C"], {"var3": ["value30", "value31", "value32"]}, "aws_broker_group_C"), + ], + any_order=True, + ) + + +@pytest.mark.parametrize("detail", [{}, {"Tags": {"tag1": "value1", "tag2": "value2", "Tag3": "Value2"}}]) +def test_get_broker_host_tags(detail): + expected_tags = [ + {"Key": "tag1", "Value": "value1"}, + {"Key": "tag2", "Value": "value2"}, + {"Key": "Tag3", "Value": "Value2"}, + ] + + tags = _get_broker_host_tags(detail) + + if not detail: + assert tags == [] + else: + assert tags == expected_tags + + +@pytest.mark.parametrize("strict", [True, False]) +def test_add_details_to_hosts_with_no_hosts(connection, strict): + hosts = [] + + _add_details_to_hosts(connection, hosts, strict) + connection.describe_broker.assert_not_called() + + +def test_add_details_to_hosts_with_failure_not_strict(connection): + hosts = [{"BrokerId": "1"}] + + connection.describe_broker.side_effect = make_clienterror_exception() + + _add_details_to_hosts(connection, hosts, strict=False) + + assert hosts == [{"BrokerId": "1"}] + + +def test_add_details_to_hosts_with_failure_strict(connection): + hosts = [{"BrokerId": "1"}] + + connection.describe_broker.side_effect = make_clienterror_exception() + + with pytest.raises(AnsibleError): + _add_details_to_hosts(connection, hosts, strict=True) + + +def test_add_details_to_hosts_with_hosts(connection): + hosts = [{"BrokerId": "1"}, {"BrokerId": "2"}] + broker_hosts_tags = { + "1": {"Tags": {"tag10": "value10", "tag11": "value11"}}, + "2": {"Tags": {"tag20": "value20", "tag21": "value21", "tag22": "value22"}}, + } + connection.describe_broker.side_effect = lambda **kwargs: broker_hosts_tags.get(kwargs.get("BrokerId")) + + _add_details_to_hosts(connection, hosts, strict=False) + + assert hosts == [ + { + "BrokerId": "1", + "Tags": [ + {"Key": "tag10", "Value": "value10"}, + {"Key": "tag11", "Value": "value11"}, + ], + }, + { + "BrokerId": "2", + "Tags": [ + {"Key": "tag20", "Value": "value20"}, + {"Key": "tag21", "Value": "value21"}, + {"Key": "tag22", "Value": "value22"}, + ], + }, + ] + + +ADD_DETAILS_TO_HOSTS = "ansible_collections.community.aws.plugins.inventory.aws_mq._add_details_to_hosts" + + +@patch(ADD_DETAILS_TO_HOSTS) +def test_get_broker_hosts(m_add_details_to_hosts, inventory, connection): + broker = { + "BrokerArn": "arn:xxx:xxxx", + "BrokerId": "resource_id", + "BrokerName": "brk1", + "BrokerState": "RUNNING", + "EngineType": "RABBITMQ", + "DeploymentMode": "CLUSTER_MULTI_AZ", + } + + conn_paginator = MagicMock() + paginate = MagicMock() + + connection.get_paginator.return_value = conn_paginator + conn_paginator.paginate.return_value = paginate + + paginate.build_full_result.side_effect = lambda **kwargs: {"BrokerSummaries": [broker]} + + connection.describe_broker.return_value = {} + connection.list_brokers.return_value = {"BrokerSummaries": [broker]} + + strict = False + + result = inventory._get_broker_hosts(connection=connection, strict=strict)(paginate.build_full_result) + + assert result == [broker] + + m_add_details_to_hosts.assert_called_with(connection, result, strict) + + +@pytest.mark.parametrize("strict", [True, False]) +@patch(ADD_DETAILS_TO_HOSTS) +def test_get_broker_hosts_with_access_denied(m_add_details_to_hosts, inventory, connection, strict): + conn_paginator = MagicMock() + paginate = MagicMock() + + connection.get_paginator.return_value = conn_paginator + conn_paginator.paginate.return_value = paginate + + paginate.build_full_result.side_effect = make_clienterror_exception() + + if strict: + with pytest.raises(AnsibleError): + inventory._get_broker_hosts(connection=connection, strict=strict)(paginate.build_full_result) + else: + assert inventory._get_broker_hosts(connection=connection, strict=strict)(paginate.build_full_result) == [] + + m_add_details_to_hosts.assert_not_called() + + +@patch(ADD_DETAILS_TO_HOSTS) +def test_get_broker_hosts_with_client_error(m_add_details_to_hosts, inventory, connection): + conn_paginator = MagicMock() + paginate = MagicMock() + + connection.get_paginator.return_value = conn_paginator + conn_paginator.paginate.return_value = paginate + + paginate.build_full_result.side_effect = make_clienterror_exception(code="Unknown") + + with pytest.raises(AnsibleError): + inventory._get_broker_hosts(connection=connection, strict=False)(paginate.build_full_result) + + m_add_details_to_hosts.assert_not_called() + + +FIND_HOSTS_MATCHING_STATUSES = ( + "ansible_collections.community.aws.plugins.inventory.aws_mq._find_hosts_matching_statuses" +) + + +@pytest.mark.parametrize("regions", range(1, 5)) +@patch(FIND_HOSTS_MATCHING_STATUSES) +def test_inventory_get_all_hosts(m_find_hosts, inventory, regions): + params = { + "regions": [f"us-east-{int(i)}" for i in range(regions)], + "strict": random.choice((True, False)), + "statuses": [ + random.choice( + [ + "RUNNING", + "CREATION_IN_PROGRESS", + "REBOOT_IN_PROGRESS", + "DELETION_IN_PROGRESS", + "CRITICAL_ACTION_REQUIRED", + ] + ) + for i in range(3) + ], + } + + connections = [MagicMock() for i in range(regions)] + + inventory.all_clients.return_value = [(connections[i], f"us-east-{int(i)}") for i in range(regions)] + + ids = list(reversed(range(regions))) + broker_hosts = [{"BrokerName": f"broker_00{int(i)}"} for i in ids] + + inventory._get_broker_hosts = MagicMock() + inventory._get_broker_hosts._boto3_paginate_wrapper = MagicMock() + inventory._get_broker_hosts._boto3_paginate_wrapper.side_effect = [[i] for i in broker_hosts] + inventory._get_broker_hosts.return_value = inventory._get_broker_hosts._boto3_paginate_wrapper + + result = list(sorted(broker_hosts, key=lambda x: x["BrokerName"])) + + m_find_hosts.return_value = result + + assert result == inventory._get_all_hosts(**params) + inventory.all_clients.assert_called_with("mq") + inventory._get_broker_hosts.assert_has_calls( + [call(connections[i], params["strict"]) for i in range(regions)], any_order=True + ) + + m_find_hosts.assert_called_with(result, params["statuses"]) + + +@pytest.mark.parametrize("hostvars_prefix", [True]) +@pytest.mark.parametrize("hostvars_suffix", [True]) +@patch("ansible_collections.community.aws.plugins.inventory.aws_mq._get_mq_hostname") +def test_inventory_add_hosts(m_get_mq_hostname, inventory, hostvars_prefix, hostvars_suffix): + _options = { + "strict": random.choice((False, True)), + "compose": random.choice((False, True)), + "keyed_groups": "keyed_group_test_inventory_add_hosts", + "groups": ["all", "test_inventory_add_hosts"], + } + + if hostvars_prefix: + _options["hostvars_prefix"] = f"prefix_{generate_random_string(length=8, with_punctuation=False)}" + if hostvars_suffix: + _options["hostvars_suffix"] = f"suffix_{generate_random_string(length=8, with_punctuation=False)}" + + def _get_option_side_effect(x): + return _options.get(x) + + inventory.get_option.side_effect = _get_option_side_effect + + m_get_mq_hostname.side_effect = lambda h: h["BrokerName"] + + hosts = [ + { + "BrokerName": "broker_i_001", + "Tags": [{"Key": "Name", "Value": "broker_001"}, {"Key": "RunningEngine", "Value": "ActiveMQ"}], + "availability_zone": "us-east-1a", + }, + { + "BrokerName": "broker_i_002", + "Tags": [{"Key": "ClusterName", "Value": "test_cluster"}, {"Key": "RunningOS", "Value": "CoreOS"}], + }, + { + "BrokerName": "test_cluster", + "Tags": [{"Key": "CluserVersionOrigin", "Value": "2.0"}, {"Key": "Provider", "Value": "RedHat"}], + }, + { + "BrokerName": "another_cluster", + "Tags": [{"Key": "TestingPurpose", "Value": "Ansible"}], + "availability_zones": ["us-west-1a", "us-east-1b"], + }, + ] + + group = f"test_add_hosts_group_{generate_random_string(length=10, with_punctuation=False)}" + inventory._add_hosts(hosts, group) + + m_get_mq_hostname.assert_has_calls([call(h) for h in hosts], any_order=True) + + hosts_names = ["broker_i_001", "broker_i_002", "test_cluster", "another_cluster"] + inventory.inventory.add_host.assert_has_calls([call(name, group=group) for name in hosts_names], any_order=True) + + camel_hosts = [ + { + "broker_name": "broker_i_001", + "tags": {"Name": "broker_001", "RunningEngine": "ActiveMQ"}, + "availability_zone": "us-east-1a", + }, + {"broker_name": "broker_i_002", "tags": {"ClusterName": "test_cluster", "RunningOS": "CoreOS"}}, + {"broker_name": "test_cluster", "tags": {"CluserVersionOrigin": "2.0", "Provider": "RedHat"}}, + { + "broker_name": "another_cluster", + "tags": {"TestingPurpose": "Ansible"}, + "availability_zones": ["us-west-1a", "us-east-1b"], + }, + ] + + set_variable_calls = [] + for i in range(len(camel_hosts)): + for var, value in camel_hosts[i].items(): + if hostvars_prefix: + var = _options["hostvars_prefix"] + var + if hostvars_suffix: + var += _options["hostvars_suffix"] + set_variable_calls.append(call(hosts_names[i], var, value)) + + inventory.get_option.assert_has_calls([call("hostvars_prefix"), call("hostvars_suffix")]) + inventory.inventory.set_variable.assert_has_calls(set_variable_calls) + + if hostvars_prefix or hostvars_suffix: + tmp = [] + for host in camel_hosts: + new_host = copy.deepcopy(host) + for key in host: + new_key = key + if hostvars_prefix: + new_key = _options["hostvars_prefix"] + new_key + if hostvars_suffix: + new_key += _options["hostvars_suffix"] + new_host[new_key] = host[key] + tmp.append(new_host) + camel_hosts = tmp + + inventory._set_composite_vars.assert_has_calls( + [ + call(_options["compose"], camel_hosts[i], hosts_names[i], strict=_options["strict"]) + for i in range(len(camel_hosts)) + ], + any_order=True, + ) + inventory._add_host_to_composed_groups.assert_has_calls( + [ + call(_options["groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"]) + for i in range(len(camel_hosts)) + ], + any_order=True, + ) + inventory._add_host_to_keyed_groups.assert_has_calls( + [ + call(_options["keyed_groups"], camel_hosts[i], hosts_names[i], strict=_options["strict"]) + for i in range(len(camel_hosts)) + ], + any_order=True, + ) + + +BASE_INVENTORY_PARSE = "ansible_collections.community.aws.plugins.inventory.aws_mq.AWSInventoryBase.parse" + + +@pytest.mark.parametrize("user_cache_directive", [True, False]) +@pytest.mark.parametrize("cache", [True, False]) +@pytest.mark.parametrize("cache_hit", [True, False]) +@patch(BASE_INVENTORY_PARSE) +def test_inventory_parse(m_parse, inventory, user_cache_directive, cache, cache_hit): + inventory_data = MagicMock() + loader = MagicMock() + path = generate_random_string(with_punctuation=False, with_digits=False) + + options = {} + options["regions"] = [f"us-east-{d}" for d in range(random.randint(1, 5))] + options["strict_permissions"] = random.choice((True, False)) + options["statuses"] = generate_random_string(with_punctuation=False) + + options["cache"] = user_cache_directive + + def get_option_side_effect(v): + return options.get(v) + + inventory.get_option.side_effect = get_option_side_effect + + cache_key = path + generate_random_string() + inventory.get_cache_key.return_value = cache_key + + cache_key_value = generate_random_string() + if cache_hit: + inventory._cache[cache_key] = cache_key_value + + inventory._populate = MagicMock() + inventory._populate_from_cache = MagicMock() + inventory._get_all_hosts = MagicMock() + all_hosts = [ + {"host": f"host_{int(random.randint(1, 1000))}"}, + {"host": f"host_{int(random.randint(1, 1000))}"}, + {"host": f"host_{int(random.randint(1, 1000))}"}, + {"host": f"host_{int(random.randint(1, 1000))}"}, + ] + inventory._get_all_hosts.return_value = all_hosts + + format_cache_key_value = f"format_inventory_{all_hosts}" + inventory._format_inventory = MagicMock() + inventory._format_inventory.return_value = format_cache_key_value + + inventory.parse(inventory_data, loader, path, cache) + + m_parse.assert_called_with(inventory_data, loader, path, cache=cache) + + if not cache or not user_cache_directive or (cache and user_cache_directive and not cache_hit): + inventory._get_all_hosts.assert_called_with( + options["regions"], + options["strict_permissions"], + options["statuses"], + ) + inventory._populate.assert_called_with(all_hosts) + inventory._format_inventory.assert_called_with(all_hosts) + else: + inventory._get_all_hosts.assert_not_called() + + if cache and user_cache_directive and cache_hit: + inventory._populate_from_cache.assert_called_with(cache_key_value) + + if cache and user_cache_directive and not cache_hit or (not cache and user_cache_directive): + # validate that cache was populated + assert inventory._cache[cache_key] == format_cache_key_value diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py b/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py index a7d1e0475..ba4a1adc3 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/conftest.py @@ -1,16 +1,14 @@ # Copyright (c) 2017 Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type import json import pytest -from ansible.module_utils.six import string_types from ansible.module_utils._text import to_bytes from ansible.module_utils.common._collections_compat import MutableMapping +from ansible.module_utils.six import string_types @pytest.fixture @@ -18,14 +16,14 @@ def patch_ansible_module(request, mocker): if isinstance(request.param, string_types): args = request.param elif isinstance(request.param, MutableMapping): - if 'ANSIBLE_MODULE_ARGS' not in request.param: - request.param = {'ANSIBLE_MODULE_ARGS': request.param} - if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']: - request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp' - if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']: - request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False + if "ANSIBLE_MODULE_ARGS" not in request.param: + request.param = {"ANSIBLE_MODULE_ARGS": request.param} + if "_ansible_remote_tmp" not in request.param["ANSIBLE_MODULE_ARGS"]: + request.param["ANSIBLE_MODULE_ARGS"]["_ansible_remote_tmp"] = "/tmp" + if "_ansible_keep_remote_files" not in request.param["ANSIBLE_MODULE_ARGS"]: + request.param["ANSIBLE_MODULE_ARGS"]["_ansible_keep_remote_files"] = False args = json.dumps(request.param) else: - raise Exception('Malformed data to the patch_ansible_module pytest fixture') + raise Exception("Malformed data to the patch_ansible_module pytest fixture") - mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args)) + mocker.patch("ansible.module_utils.basic._ANSIBLE_ARGS", to_bytes(args)) diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py index 726601fe8..608246217 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_acm_certificate.py @@ -15,18 +15,21 @@ # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type from pprint import pprint +from ansible.module_utils._text import to_text + from ansible_collections.community.aws.plugins.modules.acm_certificate import chain_compare from ansible_collections.community.aws.plugins.modules.acm_certificate import pem_chain_split -from ansible.module_utils._text import to_text def test_chain_compare(): - # The functions we're testing take module as an argument # Just so they can call module.fail_json # Let's just use None for the unit tests, @@ -34,14 +37,14 @@ def test_chain_compare(): # And if they do, fail_json is not applicable module = None - fixture_suffix = 'tests/unit/plugins/modules/fixtures/certs' + fixture_suffix = "tests/unit/plugins/modules/fixtures/certs" # Test chain split function on super simple (invalid) certs - expected = ['aaa', 'bbb', 'ccc'] + expected = ["aaa", "bbb", "ccc"] - for fname in ['simple-chain-a.cert', 'simple-chain-b.cert']: - path = fixture_suffix + '/' + fname - with open(path, 'r') as f: + for fname in ["simple-chain-a.cert", "simple-chain-b.cert"]: + path = fixture_suffix + "/" + fname + with open(path, "r") as f: pem = to_text(f.read()) actual = pem_chain_split(module, pem) actual = [a.strip() for a in actual] @@ -50,76 +53,60 @@ def test_chain_compare(): pprint(expected) print("Actual:") pprint(actual) - raise AssertionError("Failed to properly split %s" % fname) + raise AssertionError(f"Failed to properly split {fname}") # Now test real chains # chains with same same_as should be considered equal test_chains = [ - { # Original Cert chain - 'path': fixture_suffix + '/chain-1.0.cert', - 'same_as': 1, - 'length': 3 - }, - { # Same as 1.0, but longer PEM lines - 'path': fixture_suffix + '/chain-1.1.cert', - 'same_as': 1, - 'length': 3 - }, + {"path": fixture_suffix + "/chain-1.0.cert", "same_as": 1, "length": 3}, # Original Cert chain + {"path": fixture_suffix + "/chain-1.1.cert", "same_as": 1, "length": 3}, # Same as 1.0, but longer PEM lines { # Same as 1.0, but without the stuff before each -------- - 'path': fixture_suffix + '/chain-1.2.cert', - 'same_as': 1, - 'length': 3 + "path": fixture_suffix + "/chain-1.2.cert", + "same_as": 1, + "length": 3, }, { # Same as 1.0, but in a different order, so should be considered different - 'path': fixture_suffix + '/chain-1.3.cert', - 'same_as': 2, - 'length': 3 + "path": fixture_suffix + "/chain-1.3.cert", + "same_as": 2, + "length": 3, }, { # Same as 1.0, but with last link missing - 'path': fixture_suffix + '/chain-1.4.cert', - 'same_as': 3, - 'length': 2 + "path": fixture_suffix + "/chain-1.4.cert", + "same_as": 3, + "length": 2, }, { # Completely different cert chain to all the others - 'path': fixture_suffix + '/chain-4.cert', - 'same_as': 4, - 'length': 3 - }, - { # Single cert - 'path': fixture_suffix + '/a.pem', - 'same_as': 5, - 'length': 1 + "path": fixture_suffix + "/chain-4.cert", + "same_as": 4, + "length": 3, }, - { # a different, single cert - 'path': fixture_suffix + '/b.pem', - 'same_as': 6, - 'length': 1 - } + {"path": fixture_suffix + "/a.pem", "same_as": 5, "length": 1}, # Single cert + {"path": fixture_suffix + "/b.pem", "same_as": 6, "length": 1}, # a different, single cert ] for chain in test_chains: - with open(chain['path'], 'r') as f: - chain['pem_text'] = to_text(f.read()) + with open(chain["path"], "r") as f: + chain["pem_text"] = to_text(f.read()) # Test to make sure our regex isn't too greedy - chain['split'] = pem_chain_split(module, chain['pem_text']) - if len(chain['split']) != chain['length']: + chain["split"] = pem_chain_split(module, chain["pem_text"]) + if len(chain["split"]) != chain["length"]: print("Cert before split") - print(chain['pem_text']) + print(chain["pem_text"]) print("Cert after split") - pprint(chain['split']) - print("path: %s" % chain['path']) - print("Expected chain length: %d" % chain['length']) - print("Actual chain length: %d" % len(chain['split'])) - raise AssertionError("Chain %s was not split properly" % chain['path']) + pprint(chain["split"]) + print(f"path: {chain['path']}") + print(f"Expected chain length: {int(chain['length'])}") + print(f"Actual chain length: {len(chain['split'])}") + raise AssertionError(f"Chain {chain['path']} was not split properly") for chain_a in test_chains: for chain_b in test_chains: - expected = (chain_a['same_as'] == chain_b['same_as']) + expected = chain_a["same_as"] == chain_b["same_as"] # Now test the comparison function - actual = chain_compare(module, chain_a['pem_text'], chain_b['pem_text']) + actual = chain_compare(module, chain_a["pem_text"], chain_b["pem_text"]) if expected != actual: - print("Error, unexpected comparison result between \n%s\nand\n%s" % (chain_a['path'], chain_b['path'])) - print("Expected %s got %s" % (str(expected), str(actual))) + print(f"Error, unexpected comparison result between \n{chain_a['path']}\nand\n{chain_b['path']}") + print(f"Expected {str(expected)} got {str(actual)}") assert expected == actual diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py index a6f2c3e91..f0d9de8fa 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_api_gateway.py @@ -5,17 +5,21 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import sys + import pytest from ansible_collections.amazon.aws.plugins.module_utils import modules as aws_modules from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 -from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args import ansible_collections.community.aws.plugins.modules.api_gateway as agw +from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args if not HAS_BOTO3: pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules") @@ -25,7 +29,7 @@ exit_return_dict = {} def fake_exit_json(self, **kwargs): - """ store the kwargs given to exit_json rather than putting them out to stdout""" + """store the kwargs given to exit_json rather than putting them out to stdout""" global exit_return_dict exit_return_dict = kwargs sys.exit(0) @@ -33,7 +37,6 @@ def fake_exit_json(self, **kwargs): def test_upload_api(monkeypatch): class FakeConnection: - def put_rest_api(self, *args, **kwargs): assert kwargs["body"] == "the-swagger-text-is-fake" return {"msg": "success!"} @@ -46,25 +49,29 @@ def test_upload_api(monkeypatch): monkeypatch.setattr(aws_modules, "boto3_conn", return_fake_connection) monkeypatch.setattr(aws_modules.AnsibleAWSModule, "exit_json", fake_exit_json) - set_module_args({ - "api_id": "fred", - "state": "present", - "swagger_text": "the-swagger-text-is-fake", - "region": 'mars-north-1', - "_ansible_tmpdir": "/tmp/ansibl-abcdef", - }) + set_module_args( + { + "api_id": "fred", + "state": "present", + "swagger_text": "the-swagger-text-is-fake", + "region": "mars-north-1", + "_ansible_tmpdir": "/tmp/ansibl-abcdef", + } + ) with pytest.raises(SystemExit): agw.main() assert exit_return_dict["changed"] def test_warn_if_region_not_specified(): - - set_module_args({ - "name": "api_gateway", - "state": "present", - "runtime": 'python2.7', - "role": 'arn:aws:iam::123456789012:role/lambda_basic_execution', - "handler": 'lambda_python.my_handler'}) + set_module_args( + { + "name": "api_gateway", + "state": "present", + "runtime": "python2.7", + "role": "arn:aws:iam::123456789012:role/lambda_basic_execution", + "handler": "lambda_python.my_handler", + } + ) with pytest.raises(SystemExit): print(agw.main()) diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py index 1a188e8ed..a2bd06ad8 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_data_pipeline.py @@ -4,12 +4,16 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import collections -import os import json +import os + import pytest from ansible.module_utils._text import to_text @@ -19,11 +23,18 @@ try: except ImportError: pass +from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 + # Magic... Incorrectly identified by pylint as unused -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import +# isort: off +# pylint: disable=unused-import + +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify + +# pylint: enable=unused-import +# isort: on -from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 from ansible_collections.community.aws.plugins.modules import data_pipeline if not HAS_BOTO3: @@ -34,7 +45,7 @@ class FailException(Exception): pass -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def dp_setup(): """ Yield a FakeModule object, data pipeline id of a vanilla data pipeline, and data pipeline objects @@ -44,41 +55,41 @@ def dp_setup(): Dependencies = collections.namedtuple("Dependencies", ["module", "data_pipeline_id", "objects"]) # get objects to use to test populating and activating the data pipeline - if not os.getenv('PLACEBO_RECORD'): - objects = [{"name": "Every 1 day", - "id": "DefaultSchedule", - "fields": []}, - {"name": "Default", - "id": "Default", - "fields": []}] + if not os.getenv("PLACEBO_RECORD"): + objects = [ + {"name": "Every 1 day", "id": "DefaultSchedule", "fields": []}, + {"name": "Default", "id": "Default", "fields": []}, + ] else: - s3 = boto3.client('s3') + s3 = boto3.client("s3") data = s3.get_object(Bucket="ansible-test-datapipeline", Key="pipeline-object/new.json") - objects = json.loads(to_text(data['Body'].read())) + objects = json.loads(to_text(data["Body"].read())) # create a module with vanilla data pipeline parameters - params = {'name': 'ansible-test-create-pipeline', - 'description': 'ansible-datapipeline-unit-test', - 'state': 'present', - 'timeout': 300, - 'objects': [], - 'tags': {}, - 'parameters': [], - 'values': []} + params = { + "name": "ansible-test-create-pipeline", + "description": "ansible-datapipeline-unit-test", + "state": "present", + "timeout": 300, + "objects": [], + "tags": {}, + "parameters": [], + "values": [], + } module = FakeModule(**params) # yield a module, the data pipeline id, and the data pipeline objects (that are not yet defining the vanilla data pipeline) - if not os.getenv('PLACEBO_RECORD'): - yield Dependencies(module=module, data_pipeline_id='df-0590406117G8DPQZY2HA', objects=objects) + if not os.getenv("PLACEBO_RECORD"): + yield Dependencies(module=module, data_pipeline_id="df-0590406117G8DPQZY2HA", objects=objects) else: - connection = boto3.client('datapipeline') + connection = boto3.client("datapipeline") _changed, result = data_pipeline.create_pipeline(connection, module) - data_pipeline_id = result['data_pipeline']['pipeline_id'] + data_pipeline_id = result["data_pipeline"]["pipeline_id"] yield Dependencies(module=module, data_pipeline_id=data_pipeline_id, objects=objects) # remove data pipeline - if os.getenv('PLACEBO_RECORD'): - module.params.update(state='absent') + if os.getenv("PLACEBO_RECORD"): + module.params.update(state="absent") data_pipeline.delete_pipeline(connection, module) @@ -89,7 +100,7 @@ class FakeModule(object): def fail_json(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise FailException('FAIL') + raise FailException("FAIL") def exit_json(self, *args, **kwargs): self.exit_args = args @@ -97,91 +108,101 @@ class FakeModule(object): def test_create_pipeline_already_exists(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") changed, result = data_pipeline.create_pipeline(connection, dp_setup.module) assert changed is False - assert "Data Pipeline ansible-test-create-pipeline is present" in result['msg'] + assert "Data Pipeline ansible-test-create-pipeline is present" in result["msg"] def test_pipeline_field(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") pipeline_field_info = data_pipeline.pipeline_field(connection, dp_setup.data_pipeline_id, "@pipelineState") assert pipeline_field_info == "PENDING" def test_define_pipeline(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - changed, result = data_pipeline.define_pipeline(connection, dp_setup.module, dp_setup.objects, dp_setup.data_pipeline_id) + connection = placeboify.client("datapipeline") + changed, result = data_pipeline.define_pipeline( + connection, dp_setup.module, dp_setup.objects, dp_setup.data_pipeline_id + ) assert changed is True - assert 'has been updated' in result + assert "has been updated" in result def test_deactivate_pipeline(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") _changed, result = data_pipeline.deactivate_pipeline(connection, dp_setup.module) # XXX possible bug # assert changed is True - assert "Data Pipeline ansible-test-create-pipeline deactivated" in result['msg'] + assert "Data Pipeline ansible-test-create-pipeline deactivated" in result["msg"] def test_activate_without_population(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") with pytest.raises(FailException): _changed, _result = data_pipeline.activate_pipeline(connection, dp_setup.module) - assert dp_setup.module.exit_kwargs.get('msg') == "You need to populate your pipeline before activation." + assert dp_setup.module.exit_kwargs.get("msg") == "You need to populate your pipeline before activation." def test_create_pipeline(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-unittest-create-pipeline', - 'description': 'ansible-datapipeline-unit-test', - 'state': 'present', - 'timeout': 300, - 'tags': {}} + connection = placeboify.client("datapipeline") + params = { + "name": "ansible-unittest-create-pipeline", + "description": "ansible-datapipeline-unit-test", + "state": "present", + "timeout": 300, + "tags": {}, + } m = FakeModule(**params) changed, result = data_pipeline.create_pipeline(connection, m) assert changed is True - assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline created." + assert result["msg"] == "Data Pipeline ansible-unittest-create-pipeline created." data_pipeline.delete_pipeline(connection, m) def test_create_pipeline_with_tags(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-unittest-create-pipeline_tags', - 'description': 'ansible-datapipeline-unit-test', - 'state': 'present', - 'tags': {'ansible': 'test'}, - 'timeout': 300} + connection = placeboify.client("datapipeline") + params = { + "name": "ansible-unittest-create-pipeline_tags", + "description": "ansible-datapipeline-unit-test", + "state": "present", + "tags": {"ansible": "test"}, + "timeout": 300, + } m = FakeModule(**params) changed, result = data_pipeline.create_pipeline(connection, m) assert changed is True - assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline_tags created." + assert result["msg"] == "Data Pipeline ansible-unittest-create-pipeline_tags created." data_pipeline.delete_pipeline(connection, m) def test_delete_nonexistent_pipeline(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-test-nonexistent', - 'description': 'ansible-test-nonexistent', - 'state': 'absent', - 'objects': [], - 'tags': {'ansible': 'test'}, - 'timeout': 300} + connection = placeboify.client("datapipeline") + params = { + "name": "ansible-test-nonexistent", + "description": "ansible-test-nonexistent", + "state": "absent", + "objects": [], + "tags": {"ansible": "test"}, + "timeout": 300, + } m = FakeModule(**params) changed, _result = data_pipeline.delete_pipeline(connection, m) assert changed is False def test_delete_pipeline(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-test-nonexistent', - 'description': 'ansible-test-nonexistent', - 'state': 'absent', - 'objects': [], - 'tags': {'ansible': 'test'}, - 'timeout': 300} + connection = placeboify.client("datapipeline") + params = { + "name": "ansible-test-nonexistent", + "description": "ansible-test-nonexistent", + "state": "absent", + "objects": [], + "tags": {"ansible": "test"}, + "timeout": 300, + } m = FakeModule(**params) data_pipeline.create_pipeline(connection, m) changed, _result = data_pipeline.delete_pipeline(connection, m) @@ -189,29 +210,29 @@ def test_delete_pipeline(placeboify, maybe_sleep): def test_build_unique_id_different(): - m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id'}) - m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id-different'}) + m = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id"}) + m2 = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id-different"}) assert data_pipeline.build_unique_id(m) != data_pipeline.build_unique_id(m2) def test_build_unique_id_same(): - m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}}) - m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}}) + m = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id", "tags": {"ansible": "test"}}) + m2 = FakeModule(**{"name": "ansible-unittest-1", "description": "test-unique-id", "tags": {"ansible": "test"}}) assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2) def test_build_unique_id_obj(): # check that the object can be different and the unique id should be the same; should be able to modify objects - m = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'first': 'object'}]}) - m2 = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'second': 'object'}]}) + m = FakeModule(**{"name": "ansible-unittest-1", "objects": [{"first": "object"}]}) + m2 = FakeModule(**{"name": "ansible-unittest-1", "objects": [{"second": "object"}]}) assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2) def test_format_tags(): - unformatted_tags = {'key1': 'val1', 'key2': 'val2', 'key3': 'val3'} + unformatted_tags = {"key1": "val1", "key2": "val2", "key3": "val3"} formatted_tags = data_pipeline.format_tags(unformatted_tags) for tag_set in formatted_tags: - assert unformatted_tags[tag_set['key']] == tag_set['value'] + assert unformatted_tags[tag_set["key"]] == tag_set["value"] def test_format_empty_tags(): @@ -221,45 +242,44 @@ def test_format_empty_tags(): def test_pipeline_description(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") dp_id = dp_setup.data_pipeline_id pipelines = data_pipeline.pipeline_description(connection, dp_id) - assert dp_id == pipelines['pipelineDescriptionList'][0]['pipelineId'] + assert dp_id == pipelines["pipelineDescriptionList"][0]["pipelineId"] def test_pipeline_description_nonexistent(placeboify, maybe_sleep): hypothetical_pipeline_id = "df-015440025PF7YGLDK47C" - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") with pytest.raises(data_pipeline.DataPipelineNotFound): data_pipeline.pipeline_description(connection, hypothetical_pipeline_id) def test_check_dp_exists_true(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") exists = data_pipeline.check_dp_exists(connection, dp_setup.data_pipeline_id) assert exists is True def test_check_dp_exists_false(placeboify, maybe_sleep): hypothetical_pipeline_id = "df-015440025PF7YGLDK47C" - connection = placeboify.client('datapipeline') + connection = placeboify.client("datapipeline") exists = data_pipeline.check_dp_exists(connection, hypothetical_pipeline_id) assert exists is False def test_check_dp_status(placeboify, maybe_sleep, dp_setup): - inactive_states = ['INACTIVE', 'PENDING', 'FINISHED', 'DELETING'] - connection = placeboify.client('datapipeline') + inactive_states = ["INACTIVE", "PENDING", "FINISHED", "DELETING"] + connection = placeboify.client("datapipeline") state = data_pipeline.check_dp_status(connection, dp_setup.data_pipeline_id, inactive_states) assert state is True def test_activate_pipeline(placeboify, maybe_sleep, dp_setup): # use objects to define pipeline before activating - connection = placeboify.client('datapipeline') - data_pipeline.define_pipeline(connection, - module=dp_setup.module, - objects=dp_setup.objects, - dp_id=dp_setup.data_pipeline_id) + connection = placeboify.client("datapipeline") + data_pipeline.define_pipeline( + connection, module=dp_setup.module, objects=dp_setup.objects, dp_id=dp_setup.data_pipeline_id + ) changed, _result = data_pipeline.activate_pipeline(connection, dp_setup.module) assert changed is True diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py index 63804415d..f65648dad 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_confirm_connection.py @@ -1,28 +1,30 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import call +from unittest.mock import patch import pytest + try: from botocore.exceptions import ClientError except ImportError: pass from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 -from ansible_collections.community.aws.tests.unit.compat.mock import call -from ansible_collections.community.aws.tests.unit.compat.mock import patch + +from ansible_collections.community.aws.plugins.modules import directconnect_confirm_connection from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleExitJson from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleFailJson from ansible_collections.community.aws.tests.unit.plugins.modules.utils import ModuleTestCase from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args -from ansible_collections.community.aws.plugins.modules import directconnect_confirm_connection - if not HAS_BOTO3: - pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules") + pytestmark = pytest.mark.skip( + "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules" + ) -@patch('ansible_collections.amazon.aws.plugins.module_utils.core.HAS_BOTO3', new=True) +@patch("ansible_collections.amazon.aws.plugins.module_utils.core.HAS_BOTO3", new=True) @patch.object(directconnect_confirm_connection.AnsibleAWSModule, "client") class TestAWSDirectConnectConfirmConnection(ModuleTestCase): def test_missing_required_parameters(self, *args): @@ -45,22 +47,18 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase): "connectionName": "ansible-test-connection", "bandwidth": "1Gbps", "ownerAccount": "123456789012", - "region": "us-west-2" + "region": "us-west-2", } ] } - set_module_args({ - "connection_id": "dxcon-fgq9rgot" - }) + set_module_args({"connection_id": "dxcon-fgq9rgot"}) with self.assertRaises(AnsibleExitJson) as exec_info: directconnect_confirm_connection.main() result = exec_info.exception.args[0] assert result["changed"] is False assert result["connection_state"] == "requested" - mock_client.return_value.describe_connections.assert_has_calls([ - call(connectionId="dxcon-fgq9rgot") - ]) + mock_client.return_value.describe_connections.assert_has_calls([call(connectionId="dxcon-fgq9rgot")]) mock_client.return_value.confirm_connection.assert_not_called() def test_get_by_name(self, mock_client): @@ -73,39 +71,31 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase): "connectionName": "ansible-test-connection", "bandwidth": "1Gbps", "ownerAccount": "123456789012", - "region": "us-west-2" + "region": "us-west-2", } ] } - set_module_args({ - "name": "ansible-test-connection" - }) + set_module_args({"name": "ansible-test-connection"}) with self.assertRaises(AnsibleExitJson) as exec_info: directconnect_confirm_connection.main() result = exec_info.exception.args[0] assert result["changed"] is False assert result["connection_state"] == "requested" - mock_client.return_value.describe_connections.assert_has_calls([ - call(), - call(connectionId="dxcon-fgq9rgot") - ]) + mock_client.return_value.describe_connections.assert_has_calls([call(), call(connectionId="dxcon-fgq9rgot")]) mock_client.return_value.confirm_connection.assert_not_called() def test_missing_connection_id(self, mock_client): mock_client.return_value.describe_connections.side_effect = ClientError( - {'Error': {'Code': 'ResourceNotFoundException'}}, 'DescribeConnection') - set_module_args({ - "connection_id": "dxcon-aaaabbbb" - }) + {"Error": {"Code": "ResourceNotFoundException"}}, "DescribeConnection" + ) + set_module_args({"connection_id": "dxcon-aaaabbbb"}) with self.assertRaises(AnsibleFailJson) as exec_info: directconnect_confirm_connection.main() result = exec_info.exception.args[0] assert result["failed"] is True - mock_client.return_value.describe_connections.assert_has_calls([ - call(connectionId="dxcon-aaaabbbb") - ]) + mock_client.return_value.describe_connections.assert_has_calls([call(connectionId="dxcon-aaaabbbb")]) def test_missing_name(self, mock_client): mock_client.return_value.describe_connections.return_value = { @@ -117,21 +107,17 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase): "connectionName": "ansible-test-connection", "bandwidth": "1Gbps", "ownerAccount": "123456789012", - "region": "us-west-2" + "region": "us-west-2", } ] } - set_module_args({ - "name": "foobar" - }) + set_module_args({"name": "foobar"}) with self.assertRaises(AnsibleFailJson) as exec_info: directconnect_confirm_connection.main() result = exec_info.exception.args[0] assert result["failed"] is True - mock_client.return_value.describe_connections.assert_has_calls([ - call() - ]) + mock_client.return_value.describe_connections.assert_has_calls([call()]) def test_confirm(self, mock_client): mock_client.return_value.describe_connections.return_value = { @@ -143,22 +129,22 @@ class TestAWSDirectConnectConfirmConnection(ModuleTestCase): "connectionName": "ansible-test-connection", "bandwidth": "1Gbps", "ownerAccount": "123456789012", - "region": "us-west-2" + "region": "us-west-2", } ] } mock_client.return_value.confirm_connection.return_value = [{}] - set_module_args({ - "connection_id": "dxcon-fgq9rgot" - }) + set_module_args({"connection_id": "dxcon-fgq9rgot"}) with self.assertRaises(AnsibleExitJson) as exec_info: directconnect_confirm_connection.main() result = exec_info.exception.args[0] assert result["changed"] is True - mock_client.return_value.describe_connections.assert_has_calls([ - call(connectionId="dxcon-fgq9rgot"), - call(connectionId="dxcon-fgq9rgot"), - call(connectionId="dxcon-fgq9rgot") - ]) + mock_client.return_value.describe_connections.assert_has_calls( + [ + call(connectionId="dxcon-fgq9rgot"), + call(connectionId="dxcon-fgq9rgot"), + call(connectionId="dxcon-fgq9rgot"), + ] + ) mock_client.return_value.confirm_connection.assert_called_once_with(connectionId="dxcon-fgq9rgot") diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py index 65ba0a3f0..f9a620843 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_connection.py @@ -4,81 +4,90 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import pytest from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 + # Magic... Incorrectly identified by pylint as unused -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import +# isort: off +# pylint: disable=unused-import +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify + +# pylint: enable=unused-import +# isort: on from ansible_collections.community.aws.plugins.modules import directconnect_connection if not HAS_BOTO3: - pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules") + pytestmark = pytest.mark.skip( + "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules" + ) # When rerecording these tests, create a stand alone connection with default values in us-west-2 # with the name ansible-test-connection and set connection_id to the appropriate value connection_id = "dxcon-fgq9rgot" -connection_name = 'ansible-test-connection' +connection_name = "ansible-test-connection" def test_connection_status(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - status = directconnect_connection.connection_status(client, connection_id)['connection'] - assert status['connectionName'] == connection_name - assert status['connectionId'] == connection_id + client = placeboify.client("directconnect") + status = directconnect_connection.connection_status(client, connection_id)["connection"] + assert status["connectionName"] == connection_name + assert status["connectionId"] == connection_id def test_connection_exists_by_id(placeboify, maybe_sleep): - client = placeboify.client('directconnect') + client = placeboify.client("directconnect") exists = directconnect_connection.connection_exists(client, connection_id) assert exists == connection_id def test_connection_exists_by_name(placeboify, maybe_sleep): - client = placeboify.client('directconnect') + client = placeboify.client("directconnect") exists = directconnect_connection.connection_exists(client, None, connection_name) assert exists == connection_id def test_connection_does_not_exist(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - exists = directconnect_connection.connection_exists(client, 'dxcon-notthere') + client = placeboify.client("directconnect") + exists = directconnect_connection.connection_exists(client, "dxcon-notthere") assert exists is False def test_changed_properties(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - status = directconnect_connection.connection_status(client, connection_id)['connection'] + client = placeboify.client("directconnect") + status = directconnect_connection.connection_status(client, connection_id)["connection"] location = "differentlocation" - bandwidth = status['bandwidth'] + bandwidth = status["bandwidth"] assert directconnect_connection.changed_properties(status, location, bandwidth) is True def test_associations_are_not_updated(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - status = directconnect_connection.connection_status(client, connection_id)['connection'] - lag_id = status.get('lagId') + client = placeboify.client("directconnect") + status = directconnect_connection.connection_status(client, connection_id)["connection"] + lag_id = status.get("lagId") assert directconnect_connection.update_associations(client, status, connection_id, lag_id) is False def test_create_and_delete(placeboify, maybe_sleep): - client = placeboify.client('directconnect') + client = placeboify.client("directconnect") created_conn = verify_create_works(placeboify, maybe_sleep, client) verify_delete_works(placeboify, maybe_sleep, client, created_conn) def verify_create_works(placeboify, maybe_sleep, client): - created = directconnect_connection.create_connection(client=client, - location="EqSE2", - bandwidth="1Gbps", - name="ansible-test-2", - lag_id=None) - assert created.startswith('dxcon') + created = directconnect_connection.create_connection( + client=client, location="EqSE2", bandwidth="1Gbps", name="ansible-test-2", lag_id=None + ) + assert created.startswith("dxcon") return created diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py index 90c8d9604..134be7167 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_link_aggregation_group.py @@ -4,40 +4,52 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type -import pytest -import os import collections +import os -# Magic... Incorrectly identified by pylint as unused -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import +import pytest from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_conn from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info +# Magic... Incorrectly identified by pylint as unused +# isort: off +# pylint: disable=unused-import +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify + +# pylint: enable=unused-import +# isort: on + from ansible_collections.community.aws.plugins.modules import directconnect_link_aggregation_group as lag_module if not HAS_BOTO3: - pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules") + pytestmark = pytest.mark.skip( + "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules" + ) @pytest.fixture(scope="module") def dependencies(): - # each LAG dict will contain the keys: module, connections, virtual_interfaces Dependencies = collections.namedtuple("Dependencies", ["lag_1", "lag_2"]) lag_1 = dict() lag_2 = dict() - vanilla_params = {"name": "ansible_lag_1", - "location": "EqSe2", - "num_connections": 1, - "min_links": 0, - "bandwidth": "1Gbps"} + vanilla_params = { + "name": "ansible_lag_1", + "location": "EqSe2", + "num_connections": 1, + "min_links": 0, + "bandwidth": "1Gbps", + } for lag in ("ansible_lag_1", "ansible_lag_2"): params = dict(vanilla_params) @@ -49,10 +61,19 @@ def dependencies(): if os.getenv("PLACEBO_RECORD"): region, ec2_url, aws_connect_kwargs = get_aws_connection_info(lag_1["module"], boto3=True) - client = boto3_conn(lag_1["module"], conn_type="client", resource="directconnect", region=region, endpoint=ec2_url, **aws_connect_kwargs) + client = boto3_conn( + lag_1["module"], + conn_type="client", + resource="directconnect", + region=region, + endpoint=ec2_url, + **aws_connect_kwargs, + ) # See if link aggregation groups exist for name in ("ansible_lag_1", "ansible_lag_2"): - lag_id = lag_module.create_lag(client, num_connections=1, location="EqSe2", bandwidth="1Gbps", name=name, connection_id=None) + lag_id = lag_module.create_lag( + client, num_connections=1, location="EqSe2", bandwidth="1Gbps", name=name, connection_id=None + ) if name == "ansible_lag_1": lag_1["lag_id"] = lag_id lag_1["name"] = name @@ -87,10 +108,7 @@ class FakeModule(object): def test_nonexistent_lag_status(placeboify, maybe_sleep): client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id="doesntexist", - lag_name="doesntexist", - verify=True) + exists = lag_module.lag_exists(client=client, lag_id="doesntexist", lag_name="doesntexist", verify=True) assert not exists @@ -103,28 +121,19 @@ def test_lag_status(placeboify, maybe_sleep, dependencies): def test_lag_exists(placeboify, maybe_sleep, dependencies): client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id=dependencies.lag_1.get("lag_id"), - lag_name=None, - verify=True) + exists = lag_module.lag_exists(client=client, lag_id=dependencies.lag_1.get("lag_id"), lag_name=None, verify=True) assert exists def test_lag_exists_using_name(placeboify, maybe_sleep, dependencies): client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id=None, - lag_name=dependencies.lag_1.get("name"), - verify=True) + exists = lag_module.lag_exists(client=client, lag_id=None, lag_name=dependencies.lag_1.get("name"), verify=True) assert exists def test_nonexistent_lag_does_not_exist(placeboify, maybe_sleep): client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id="dxlag-XXXXXXXX", - lag_name="doesntexist", - verify=True) + exists = lag_module.lag_exists(client=client, lag_id="dxlag-XXXXXXXX", lag_name="doesntexist", verify=True) assert not exists @@ -143,19 +152,21 @@ def test_lag_changed_true_no(placeboify, maybe_sleep, dependencies): def test_update_lag(placeboify, maybe_sleep, dependencies): client = placeboify.client("directconnect") status_before = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id")) - lag_module.update_lag(client, - lag_id=dependencies.lag_2.get("lag_id"), - lag_name="ansible_lag_2_update", - min_links=0, - wait=False, - wait_timeout=0, - num_connections=1) + lag_module.update_lag( + client, + lag_id=dependencies.lag_2.get("lag_id"), + lag_name="ansible_lag_2_update", + min_links=0, + wait=False, + wait_timeout=0, + num_connections=1, + ) status_after = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id")) assert status_before != status_after # remove the lag name from the statuses and verify it was the only thing changed - del status_before['lagName'] - del status_after['lagName'] + del status_before["lagName"] + del status_after["lagName"] assert status_before == status_after diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py index 4f0086421..62b511bde 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_directconnect_virtual_interface.py @@ -4,20 +4,31 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import pytest from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 + # Magic... Incorrectly identified by pylint as unused -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import +# isort: off +# pylint: disable=unused-import +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify + +# pylint: enable=unused-import +# isort: on from ansible_collections.community.aws.plugins.modules import directconnect_virtual_interface if not HAS_BOTO3: - pytestmark = pytest.mark.skip("test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules") + pytestmark = pytest.mark.skip( + "test_directconnect_confirm_connection.py requires the `boto3` and `botocore` modules" + ) class FailException(Exception): @@ -46,10 +57,7 @@ def test_find_unique_vi_by_connection_id(placeboify, maybe_sleep): def test_find_unique_vi_by_vi_id(placeboify, maybe_sleep): client = placeboify.client("directconnect") - vi_id = directconnect_virtual_interface.find_unique_vi(client, - None, - "dxvif-aaaaaaaaa", - None) + vi_id = directconnect_virtual_interface.find_unique_vi(client, None, "dxvif-aaaaaaaaa", None) assert vi_id == "dxvif-aaaaaaaa" @@ -61,47 +69,38 @@ def test_find_unique_vi_by_name(placeboify, maybe_sleep): def test_find_unique_vi_returns_multiple(placeboify, maybe_sleep): client = placeboify.client("directconnect") - module = FakeModule(state="present", - id_to_associate="dxcon-aaaaaaaa", - public=False, - name=None) + module = FakeModule(state="present", id_to_associate="dxcon-aaaaaaaa", public=False, name=None) with pytest.raises(FailException): - directconnect_virtual_interface.ensure_state( - client, - module - ) + directconnect_virtual_interface.ensure_state(client, module) assert "Multiple virtual interfaces were found" in module.exit_kwargs["msg"] def test_find_unique_vi_returns_missing_for_vi_id(placeboify, maybe_sleep): client = placeboify.client("directconnect") - module = FakeModule(state="present", - id_to_associate=None, - public=False, - name=None, - virtual_interface_id="dxvif-aaaaaaaa") + module = FakeModule( + state="present", id_to_associate=None, public=False, name=None, virtual_interface_id="dxvif-aaaaaaaa" + ) with pytest.raises(FailException): - directconnect_virtual_interface.ensure_state( - client, - module - ) + directconnect_virtual_interface.ensure_state(client, module) assert "The virtual interface dxvif-aaaaaaaa does not exist" in module.exit_kwargs["msg"] def test_construct_public_vi(): - module = FakeModule(state="present", - id_to_associate=None, - public=True, - name="aaaaaaaa", - vlan=1, - bgp_asn=123, - authentication_key="aaaa", - customer_address="169.254.0.1/30", - amazon_address="169.254.0.2/30", - address_type="ipv4", - cidr=["10.88.0.0/30"], - virtual_gateway_id="xxxx", - direct_connect_gateway_id="yyyy") + module = FakeModule( + state="present", + id_to_associate=None, + public=True, + name="aaaaaaaa", + vlan=1, + bgp_asn=123, + authentication_key="aaaa", + customer_address="169.254.0.1/30", + amazon_address="169.254.0.2/30", + address_type="ipv4", + cidr=["10.88.0.0/30"], + virtual_gateway_id="xxxx", + direct_connect_gateway_id="yyyy", + ) vi = directconnect_virtual_interface.assemble_params_for_creating_vi(module.params) assert vi == { "virtualInterfaceName": "aaaaaaaa", @@ -111,24 +110,26 @@ def test_construct_public_vi(): "amazonAddress": "169.254.0.2/30", "customerAddress": "169.254.0.1/30", "addressFamily": "ipv4", - "routeFilterPrefixes": [{"cidr": "10.88.0.0/30"}] + "routeFilterPrefixes": [{"cidr": "10.88.0.0/30"}], } def test_construct_private_vi_with_virtual_gateway_id(): - module = FakeModule(state="present", - id_to_associate=None, - public=False, - name="aaaaaaaa", - vlan=1, - bgp_asn=123, - authentication_key="aaaa", - customer_address="169.254.0.1/30", - amazon_address="169.254.0.2/30", - address_type="ipv4", - cidr=["10.88.0.0/30"], - virtual_gateway_id="xxxx", - direct_connect_gateway_id="yyyy") + module = FakeModule( + state="present", + id_to_associate=None, + public=False, + name="aaaaaaaa", + vlan=1, + bgp_asn=123, + authentication_key="aaaa", + customer_address="169.254.0.1/30", + amazon_address="169.254.0.2/30", + address_type="ipv4", + cidr=["10.88.0.0/30"], + virtual_gateway_id="xxxx", + direct_connect_gateway_id="yyyy", + ) vi = directconnect_virtual_interface.assemble_params_for_creating_vi(module.params) assert vi == { "virtualInterfaceName": "aaaaaaaa", @@ -138,24 +139,26 @@ def test_construct_private_vi_with_virtual_gateway_id(): "amazonAddress": "169.254.0.2/30", "customerAddress": "169.254.0.1/30", "addressFamily": "ipv4", - "virtualGatewayId": "xxxx" + "virtualGatewayId": "xxxx", } def test_construct_private_vi_with_direct_connect_gateway_id(): - module = FakeModule(state="present", - id_to_associate=None, - public=False, - name="aaaaaaaa", - vlan=1, - bgp_asn=123, - authentication_key="aaaa", - customer_address="169.254.0.1/30", - amazon_address="169.254.0.2/30", - address_type="ipv4", - cidr=["10.88.0.0/30"], - virtual_gateway_id=None, - direct_connect_gateway_id="yyyy") + module = FakeModule( + state="present", + id_to_associate=None, + public=False, + name="aaaaaaaa", + vlan=1, + bgp_asn=123, + authentication_key="aaaa", + customer_address="169.254.0.1/30", + amazon_address="169.254.0.2/30", + address_type="ipv4", + cidr=["10.88.0.0/30"], + virtual_gateway_id=None, + direct_connect_gateway_id="yyyy", + ) vi = directconnect_virtual_interface.assemble_params_for_creating_vi(module.params) print(vi) assert vi == { @@ -166,26 +169,28 @@ def test_construct_private_vi_with_direct_connect_gateway_id(): "amazonAddress": "169.254.0.2/30", "customerAddress": "169.254.0.1/30", "addressFamily": "ipv4", - "directConnectGatewayId": "yyyy" + "directConnectGatewayId": "yyyy", } def test_create_public_vi(placeboify, maybe_sleep): client = placeboify.client("directconnect") - module = FakeModule(state="present", - id_to_associate='dxcon-aaaaaaaa', - virtual_interface_id=None, - public=True, - name="aaaaaaaa", - vlan=1, - bgp_asn=123, - authentication_key="aaaa", - customer_address="169.254.0.1/30", - amazon_address="169.254.0.2/30", - address_type="ipv4", - cidr=["10.88.0.0/30"], - virtual_gateway_id="xxxx", - direct_connect_gateway_id="yyyy") + module = FakeModule( + state="present", + id_to_associate="dxcon-aaaaaaaa", + virtual_interface_id=None, + public=True, + name="aaaaaaaa", + vlan=1, + bgp_asn=123, + authentication_key="aaaa", + customer_address="169.254.0.1/30", + amazon_address="169.254.0.2/30", + address_type="ipv4", + cidr=["10.88.0.0/30"], + virtual_gateway_id="xxxx", + direct_connect_gateway_id="yyyy", + ) changed, latest_state = directconnect_virtual_interface.ensure_state(client, module) assert changed is True assert latest_state is not None @@ -193,20 +198,22 @@ def test_create_public_vi(placeboify, maybe_sleep): def test_create_private_vi(placeboify, maybe_sleep): client = placeboify.client("directconnect") - module = FakeModule(state="present", - id_to_associate='dxcon-aaaaaaaa', - virtual_interface_id=None, - public=False, - name="aaaaaaaa", - vlan=1, - bgp_asn=123, - authentication_key="aaaa", - customer_address="169.254.0.1/30", - amazon_address="169.254.0.2/30", - address_type="ipv4", - cidr=["10.88.0.0/30"], - virtual_gateway_id="xxxx", - direct_connect_gateway_id="yyyy") + module = FakeModule( + state="present", + id_to_associate="dxcon-aaaaaaaa", + virtual_interface_id=None, + public=False, + name="aaaaaaaa", + vlan=1, + bgp_asn=123, + authentication_key="aaaa", + customer_address="169.254.0.1/30", + amazon_address="169.254.0.2/30", + address_type="ipv4", + cidr=["10.88.0.0/30"], + virtual_gateway_id="xxxx", + direct_connect_gateway_id="yyyy", + ) changed, latest_state = directconnect_virtual_interface.ensure_state(client, module) assert changed is True assert latest_state is not None @@ -214,20 +221,22 @@ def test_create_private_vi(placeboify, maybe_sleep): def test_delete_vi(placeboify, maybe_sleep): client = placeboify.client("directconnect") - module = FakeModule(state="absent", - id_to_associate='dxcon-aaaaaaaa', - virtual_interface_id='dxvif-aaaaaaaa', - public=False, - name="aaaaaaaa", - vlan=1, - bgp_asn=123, - authentication_key="aaaa", - customer_address="169.254.0.1/30", - amazon_address="169.254.0.2/30", - address_type="ipv4", - cidr=["10.88.0.0/30"], - virtual_gateway_id=None, - direct_connect_gateway_id="yyyy") + module = FakeModule( + state="absent", + id_to_associate="dxcon-aaaaaaaa", + virtual_interface_id="dxvif-aaaaaaaa", + public=False, + name="aaaaaaaa", + vlan=1, + bgp_asn=123, + authentication_key="aaaa", + customer_address="169.254.0.1/30", + amazon_address="169.254.0.2/30", + address_type="ipv4", + cidr=["10.88.0.0/30"], + virtual_gateway_id=None, + direct_connect_gateway_id="yyyy", + ) changed, latest_state = directconnect_virtual_interface.ensure_state(client, module) assert changed is True assert latest_state == {} diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py index 88a1aea83..2b5db4226 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_vpc_vpn.py @@ -1,21 +1,29 @@ # (c) 2017 Red Hat Inc. # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import os -import pytest -# Magic... Incorrectly identified by pylint as unused -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify # pylint: disable=unused-import -from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep # pylint: disable=unused-import +import pytest -import ansible_collections.amazon.aws.plugins.module_utils.modules as aws_modules import ansible_collections.amazon.aws.plugins.module_utils.retries as aws_retries -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_conn from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_tag_list_to_ansible_dict +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info + +# Magic... Incorrectly identified by pylint as unused +# isort: off +# pylint: disable=unused-import +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import maybe_sleep +from ansible_collections.amazon.aws.tests.unit.utils.amazon_placebo_fixtures import placeboify + +# pylint: enable=unused-import +# isort: on from ansible_collections.community.aws.plugins.modules import ec2_vpc_vpn @@ -31,12 +39,12 @@ class FakeModule(object): def fail_json_aws(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise FailException('FAIL') + raise FailException("FAIL") def fail_json(self, *args, **kwargs): self.exit_args = args self.exit_kwargs = kwargs - raise FailException('FAIL') + raise FailException("FAIL") def exit_json(self, *args, **kwargs): self.exit_args = args @@ -45,36 +53,44 @@ class FakeModule(object): def get_vgw(connection): # see if two vgw exist and return them if so - vgw = connection.describe_vpn_gateways(Filters=[{'Name': 'tag:Ansible_VPN', 'Values': ['Test']}]) - if len(vgw['VpnGateways']) >= 2: - return [vgw['VpnGateways'][0]['VpnGatewayId'], vgw['VpnGateways'][1]['VpnGatewayId']] + vgw = connection.describe_vpn_gateways(Filters=[{"Name": "tag:Ansible_VPN", "Values": ["Test"]}]) + if len(vgw["VpnGateways"]) >= 2: + return [vgw["VpnGateways"][0]["VpnGatewayId"], vgw["VpnGateways"][1]["VpnGatewayId"]] # otherwise create two and return them - vgw_1 = connection.create_vpn_gateway(Type='ipsec.1') - vgw_2 = connection.create_vpn_gateway(Type='ipsec.1') + vgw_1 = connection.create_vpn_gateway(Type="ipsec.1") + vgw_2 = connection.create_vpn_gateway(Type="ipsec.1") for resource in (vgw_1, vgw_2): - connection.create_tags(Resources=[resource['VpnGateway']['VpnGatewayId']], Tags=[{'Key': 'Ansible_VPN', 'Value': 'Test'}]) - return [vgw_1['VpnGateway']['VpnGatewayId'], vgw_2['VpnGateway']['VpnGatewayId']] + connection.create_tags( + Resources=[resource["VpnGateway"]["VpnGatewayId"]], Tags=[{"Key": "Ansible_VPN", "Value": "Test"}] + ) + return [vgw_1["VpnGateway"]["VpnGatewayId"], vgw_2["VpnGateway"]["VpnGatewayId"]] def get_cgw(connection): # see if two cgw exist and return them if so - cgw = connection.describe_customer_gateways(DryRun=False, Filters=[{'Name': 'state', 'Values': ['available']}, - {'Name': 'tag:Name', 'Values': ['Ansible-CGW']}]) - if len(cgw['CustomerGateways']) >= 2: - return [cgw['CustomerGateways'][0]['CustomerGatewayId'], cgw['CustomerGateways'][1]['CustomerGatewayId']] + cgw = connection.describe_customer_gateways( + DryRun=False, + Filters=[{"Name": "state", "Values": ["available"]}, {"Name": "tag:Name", "Values": ["Ansible-CGW"]}], + ) + if len(cgw["CustomerGateways"]) >= 2: + return [cgw["CustomerGateways"][0]["CustomerGatewayId"], cgw["CustomerGateways"][1]["CustomerGatewayId"]] # otherwise create and return them - cgw_1 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='9.8.7.6', BgpAsn=65000) - cgw_2 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='5.4.3.2', BgpAsn=65000) + cgw_1 = connection.create_customer_gateway(DryRun=False, Type="ipsec.1", PublicIp="9.8.7.6", BgpAsn=65000) + cgw_2 = connection.create_customer_gateway(DryRun=False, Type="ipsec.1", PublicIp="5.4.3.2", BgpAsn=65000) for resource in (cgw_1, cgw_2): - connection.create_tags(Resources=[resource['CustomerGateway']['CustomerGatewayId']], Tags=[{'Key': 'Ansible-CGW', 'Value': 'Test'}]) - return [cgw_1['CustomerGateway']['CustomerGatewayId'], cgw_2['CustomerGateway']['CustomerGatewayId']] + connection.create_tags( + Resources=[resource["CustomerGateway"]["CustomerGatewayId"]], Tags=[{"Key": "Ansible-CGW", "Value": "Test"}] + ) + return [cgw_1["CustomerGateway"]["CustomerGatewayId"], cgw_2["CustomerGateway"]["CustomerGatewayId"]] def get_dependencies(): - if os.getenv('PLACEBO_RECORD'): + if os.getenv("PLACEBO_RECORD"): module = FakeModule(**{}) region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True) - connection = boto3_conn(module, conn_type='client', resource='ec2', region=region, endpoint=ec2_url, **aws_connect_kwargs) + connection = boto3_conn( + module, conn_type="client", resource="ec2", region=region, endpoint=ec2_url, **aws_connect_kwargs + ) vgw = get_vgw(connection) cgw = get_cgw(connection) else: @@ -85,9 +101,9 @@ def get_dependencies(): def setup_mod_conn(placeboify, params): - conn = placeboify.client('ec2') + conn = placeboify.client("ec2") retry_decorator = aws_retries.AWSRetry.jittered_backoff() - wrapped_conn = aws_modules._RetryingBotoClientWrapper(conn, retry_decorator) + wrapped_conn = aws_retries.RetryingBotoClientWrapper(conn, retry_decorator) m = FakeModule(**params) return m, wrapped_conn @@ -97,23 +113,25 @@ def make_params(cgw, vgw, tags=None, filters=None, routes=None): filters = {} if filters is None else filters routes = [] if routes is None else routes - return {'customer_gateway_id': cgw, - 'static_only': True, - 'vpn_gateway_id': vgw, - 'connection_type': 'ipsec.1', - 'purge_tags': True, - 'tags': tags, - 'filters': filters, - 'routes': routes, - 'delay': 15, - 'wait_timeout': 600} + return { + "customer_gateway_id": cgw, + "static_only": True, + "vpn_gateway_id": vgw, + "connection_type": "ipsec.1", + "purge_tags": True, + "tags": tags, + "filters": filters, + "routes": routes, + "delay": 15, + "wait_timeout": 600, + } def make_conn(placeboify, module, connection): - customer_gateway_id = module.params['customer_gateway_id'] - static_only = module.params['static_only'] - vpn_gateway_id = module.params['vpn_gateway_id'] - connection_type = module.params['connection_type'] + customer_gateway_id = module.params["customer_gateway_id"] + static_only = module.params["static_only"] + vpn_gateway_id = module.params["vpn_gateway_id"] + connection_type = module.params["connection_type"] changed = True vpn = ec2_vpc_vpn.create_connection(connection, customer_gateway_id, static_only, vpn_gateway_id, connection_type) return changed, vpn @@ -124,7 +142,7 @@ def tear_down_conn(placeboify, connection, vpn_connection_id): def setup_req(placeboify, number_of_results=1): - ''' returns dependencies for VPN connections ''' + """returns dependencies for VPN connections""" assert number_of_results in (1, 2) results = [] cgw, vgw = get_dependencies() @@ -133,7 +151,7 @@ def setup_req(placeboify, number_of_results=1): m, conn = setup_mod_conn(placeboify, params) vpn = ec2_vpc_vpn.ensure_present(conn, params)[1] - results.append({'module': m, 'connection': conn, 'vpn': vpn, 'params': params}) + results.append({"module": m, "connection": conn, "vpn": vpn, "params": params}) if number_of_results == 1: return results[0] else: @@ -144,41 +162,44 @@ def test_find_connection_vpc_conn_id(placeboify, maybe_sleep): # setup dependencies for 2 vpn connections dependencies = setup_req(placeboify, 2) dep1, dep2 = dependencies[0], dependencies[1] - params1, vpn1, _m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection'] - _params2, vpn2, _m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection'] + params1, vpn1, _m1, conn1 = dep1["params"], dep1["vpn"], dep1["module"], dep1["connection"] + _params2, vpn2, _m2, conn2 = dep2["params"], dep2["vpn"], dep2["module"], dep2["connection"] # find the connection with a vpn_connection_id and assert it is the expected one - assert vpn1['VpnConnectionId'] == ec2_vpc_vpn.find_connection(conn1, params1, vpn1['VpnConnectionId'])['VpnConnectionId'] + assert ( + vpn1["VpnConnectionId"] + == ec2_vpc_vpn.find_connection(conn1, params1, vpn1["VpnConnectionId"])["VpnConnectionId"] + ) - tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId']) - tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId']) + tear_down_conn(placeboify, conn1, vpn1["VpnConnectionId"]) + tear_down_conn(placeboify, conn2, vpn2["VpnConnectionId"]) def test_find_connection_filters(placeboify, maybe_sleep): # setup dependencies for 2 vpn connections dependencies = setup_req(placeboify, 2) dep1, dep2 = dependencies[0], dependencies[1] - params1, vpn1, _m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection'] - params2, vpn2, _m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection'] + params1, vpn1, _m1, conn1 = dep1["params"], dep1["vpn"], dep1["module"], dep1["connection"] + params2, vpn2, _m2, conn2 = dep2["params"], dep2["vpn"], dep2["module"], dep2["connection"] # update to different tags - params1.update(tags={'Wrong': 'Tag'}) - params2.update(tags={'Correct': 'Tag'}) + params1.update(tags={"Wrong": "Tag"}) + params2.update(tags={"Correct": "Tag"}) ec2_vpc_vpn.ensure_present(conn1, params1) ec2_vpc_vpn.ensure_present(conn2, params2) # create some new parameters for a filter - params = {'filters': {'tags': {'Correct': 'Tag'}}} + params = {"filters": {"tags": {"Correct": "Tag"}}} # find the connection that has the parameters above found = ec2_vpc_vpn.find_connection(conn1, params) # assert the correct connection was found - assert found['VpnConnectionId'] == vpn2['VpnConnectionId'] + assert found["VpnConnectionId"] == vpn2["VpnConnectionId"] # delete the connections - tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId']) - tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId']) + tear_down_conn(placeboify, conn1, vpn1["VpnConnectionId"]) + tear_down_conn(placeboify, conn2, vpn2["VpnConnectionId"]) def test_find_connection_insufficient_filters(placeboify, maybe_sleep): @@ -186,15 +207,15 @@ def test_find_connection_insufficient_filters(placeboify, maybe_sleep): cgw, vgw = get_dependencies() # create two connections with the same tags - params = make_params(cgw[0], vgw[0], tags={'Correct': 'Tag'}) - params2 = make_params(cgw[1], vgw[1], tags={'Correct': 'Tag'}) + params = make_params(cgw[0], vgw[0], tags={"Correct": "Tag"}) + params2 = make_params(cgw[1], vgw[1], tags={"Correct": "Tag"}) m, conn = setup_mod_conn(placeboify, params) m2, conn2 = setup_mod_conn(placeboify, params2) vpn1 = ec2_vpc_vpn.ensure_present(conn, m.params)[1] vpn2 = ec2_vpc_vpn.ensure_present(conn2, m2.params)[1] # reset the parameters so only filtering by tags will occur - m.params = {'filters': {'tags': {'Correct': 'Tag'}}} + m.params = {"filters": {"tags": {"Correct": "Tag"}}} expected_message = "More than one matching VPN connection was found" # assert that multiple matching connections have been found @@ -202,13 +223,13 @@ def test_find_connection_insufficient_filters(placeboify, maybe_sleep): ec2_vpc_vpn.find_connection(conn, m.params) # delete the connections - tear_down_conn(placeboify, conn, vpn1['VpnConnectionId']) - tear_down_conn(placeboify, conn, vpn2['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn1["VpnConnectionId"]) + tear_down_conn(placeboify, conn, vpn2["VpnConnectionId"]) def test_find_connection_nonexistent(placeboify, maybe_sleep): # create parameters but don't create a connection with them - params = {'filters': {'tags': {'Correct': 'Tag'}}} + params = {"filters": {"tags": {"Correct": "Tag"}}} m, conn = setup_mod_conn(placeboify, params) # try to find a connection with matching parameters and assert None are found @@ -226,38 +247,48 @@ def test_create_connection(placeboify, maybe_sleep): # assert that changed is true and that there is a connection id assert changed is True - assert 'VpnConnectionId' in vpn + assert "VpnConnectionId" in vpn # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) def test_create_connection_that_exists(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] + params, vpn, _m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) # try to recreate the same connection changed, vpn2 = ec2_vpc_vpn.ensure_present(conn, params) # nothing should have changed assert changed is False - assert vpn['VpnConnectionId'] == vpn2['VpnConnectionId'] + assert vpn["VpnConnectionId"] == vpn2["VpnConnectionId"] # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) def test_modify_deleted_connection(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - _params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] + _params, vpn, m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) # delete it - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) # try to update the deleted connection - m.params.update(vpn_connection_id=vpn['VpnConnectionId']) + m.params.update(vpn_connection_id=vpn["VpnConnectionId"]) expected_message = "no VPN connection available or pending with that id" with pytest.raises(ec2_vpc_vpn.VPNConnectionException, match=expected_message): ec2_vpc_vpn.ensure_present(conn, m.params) @@ -266,7 +297,12 @@ def test_modify_deleted_connection(placeboify, maybe_sleep): def test_delete_connection(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - _params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] + _params, vpn, m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) # delete it changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params) @@ -277,7 +313,7 @@ def test_delete_connection(placeboify, maybe_sleep): def test_delete_nonexistent_connection(placeboify, maybe_sleep): # create parameters and ensure any connection matching (None) is deleted - params = {'filters': {'tags': {'ThisConnection': 'DoesntExist'}}, 'delay': 15, 'wait_timeout': 600} + params = {"filters": {"tags": {"ThisConnection": "DoesntExist"}}, "delay": 15, "wait_timeout": 600} m, conn = setup_mod_conn(placeboify, params) changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params) @@ -288,83 +324,112 @@ def test_delete_nonexistent_connection(placeboify, maybe_sleep): def test_check_for_update_tags(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - _params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] + _params, vpn, m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) # add and remove a number of tags - m.params['tags'] = {'One': 'one', 'Two': 'two'} + m.params["tags"] = {"One": "one", "Two": "two"} ec2_vpc_vpn.ensure_present(conn, m.params) - m.params['tags'] = {'Two': 'two', 'Three': 'three', 'Four': 'four'} - changes = ec2_vpc_vpn.check_for_update(conn, m.params, vpn['VpnConnectionId']) + m.params["tags"] = {"Two": "two", "Three": "three", "Four": "four"} + changes = ec2_vpc_vpn.check_for_update(conn, m.params, vpn["VpnConnectionId"]) - flat_dict_changes = boto3_tag_list_to_ansible_dict(changes['tags_to_add']) - correct_changes = boto3_tag_list_to_ansible_dict([{'Key': 'Three', 'Value': 'three'}, {'Key': 'Four', 'Value': 'four'}]) + flat_dict_changes = boto3_tag_list_to_ansible_dict(changes["tags_to_add"]) + correct_changes = boto3_tag_list_to_ansible_dict( + [{"Key": "Three", "Value": "three"}, {"Key": "Four", "Value": "four"}] + ) assert flat_dict_changes == correct_changes - assert changes['tags_to_remove'] == ['One'] + assert changes["tags_to_remove"] == ["One"] # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) def test_check_for_update_nonmodifiable_attr(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - current_vgw = params['vpn_gateway_id'] + params, vpn, m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) + current_vgw = params["vpn_gateway_id"] # update a parameter that isn't modifiable m.params.update(vpn_gateway_id="invalidchange") - expected_message = 'You cannot modify vpn_gateway_id, the current value of which is {0}. Modifiable VPN connection attributes are'.format(current_vgw) + expected_message = f"You cannot modify vpn_gateway_id, the current value of which is {current_vgw}. Modifiable VPN connection attributes are" with pytest.raises(ec2_vpc_vpn.VPNConnectionException, match=expected_message): - ec2_vpc_vpn.check_for_update(conn, m.params, vpn['VpnConnectionId']) + ec2_vpc_vpn.check_for_update(conn, m.params, vpn["VpnConnectionId"]) # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) def test_add_tags(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] + params, vpn, _m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) # add a tag to the connection - ec2_vpc_vpn.add_tags(conn, vpn['VpnConnectionId'], add=[{'Key': 'Ansible-Test', 'Value': 'VPN'}]) + ec2_vpc_vpn.add_tags(conn, vpn["VpnConnectionId"], add=[{"Key": "Ansible-Test", "Value": "VPN"}]) # assert tag is there current_vpn = ec2_vpc_vpn.find_connection(conn, params) - assert current_vpn['Tags'] == [{'Key': 'Ansible-Test', 'Value': 'VPN'}] + assert current_vpn["Tags"] == [{"Key": "Ansible-Test", "Value": "VPN"}] # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) def test_remove_tags(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] + params, vpn, _m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) # remove a tag from the connection - ec2_vpc_vpn.remove_tags(conn, vpn['VpnConnectionId'], remove=['Ansible-Test']) + ec2_vpc_vpn.remove_tags(conn, vpn["VpnConnectionId"], remove=["Ansible-Test"]) # assert the tag is gone current_vpn = ec2_vpc_vpn.find_connection(conn, params) - assert 'Tags' not in current_vpn + assert "Tags" not in current_vpn # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) def test_add_routes(placeboify, maybe_sleep): # setup dependencies for 1 vpn connection dependencies = setup_req(placeboify, 1) - params, vpn, _m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] + params, vpn, _m, conn = ( + dependencies["params"], + dependencies["vpn"], + dependencies["module"], + dependencies["connection"], + ) # create connection with a route - ec2_vpc_vpn.add_routes(conn, vpn['VpnConnectionId'], ['195.168.2.0/24', '196.168.2.0/24']) + ec2_vpc_vpn.add_routes(conn, vpn["VpnConnectionId"], ["195.168.2.0/24", "196.168.2.0/24"]) # assert both routes are there current_vpn = ec2_vpc_vpn.find_connection(conn, params) - assert set(each['DestinationCidrBlock'] for each in current_vpn['Routes']) == set(['195.168.2.0/24', '196.168.2.0/24']) + assert set(each["DestinationCidrBlock"] for each in current_vpn["Routes"]) == set( + ["195.168.2.0/24", "196.168.2.0/24"] + ) # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) + tear_down_conn(placeboify, conn, vpn["VpnConnectionId"]) diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py index 939620120..7f832aa71 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ec2_win_password.py @@ -1,8 +1,4 @@ -from __future__ import (absolute_import, division, print_function) - -__metaclass__ = type - -''' +""" Commands to encrypt a message that can be decrypted: from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import load_pem_private_key @@ -15,9 +11,11 @@ with open(path, 'r') as f: load_pem_public_key(rsa_public_key_pem = , default_backend()) base64_cipher = public_key.encrypt('Ansible_AWS_EC2_Win_Password', PKCS1v15()) string_cipher = base64.b64encode(base64_cipher) -''' +""" import datetime +from unittest.mock import patch + import pytest from ansible.module_utils._text import to_bytes @@ -25,52 +23,53 @@ from ansible.module_utils._text import to_text from ansible_collections.amazon.aws.plugins.module_utils.botocore import HAS_BOTO3 -from ansible_collections.community.aws.tests.unit.compat.mock import patch +from ansible_collections.community.aws.plugins.modules.ec2_win_password import ec2_win_password +from ansible_collections.community.aws.plugins.modules.ec2_win_password import setup_module_object from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleExitJson from ansible_collections.community.aws.tests.unit.plugins.modules.utils import ModuleTestCase from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args -from ansible_collections.community.aws.plugins.modules.ec2_win_password import setup_module_object -from ansible_collections.community.aws.plugins.modules.ec2_win_password import ec2_win_password - -fixture_prefix = 'tests/unit/plugins/modules/fixtures/certs' +fixture_prefix = "tests/unit/plugins/modules/fixtures/certs" if not HAS_BOTO3: pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules") class TestEc2WinPasswordModule(ModuleTestCase): - # Future: It would be good to generate this data on the fly and use a # temporary certificate and password. - PEM_PATH = fixture_prefix + '/ec2_win_password.pem' - UNENCRYPTED_DATA = 'Ansible_AWS_EC2_Win_Password' - ENCRYPTED_DATA = 'L2k1iFiu/TRrjGr6Rwco/T3C7xkWxUw4+YPYpGGOmP3KDdy3hT1' \ - '8RvdDJ2i0e+y7wUcH43DwbRYSlkSyALY/nzjSV9R5NChUyVs3W5' \ - '5oiVuyTKsk0lor8dFJ9z9unq14tScZHvyQ3Nx1ggOtS18S9Pk55q' \ - 'IaCXfx26ucH76VRho=' - INSTANCE_ID = 'i-12345' - - @patch('ansible_collections.community.aws.plugins.modules.s3_bucket_notification.AnsibleAWSModule.client') + PEM_PATH = fixture_prefix + "/ec2_win_password.pem" + UNENCRYPTED_DATA = "Ansible_AWS_EC2_Win_Password" + ENCRYPTED_DATA = ( + "L2k1iFiu/TRrjGr6Rwco/T3C7xkWxUw4+YPYpGGOmP3KDdy3hT1" + "8RvdDJ2i0e+y7wUcH43DwbRYSlkSyALY/nzjSV9R5NChUyVs3W5" + "5oiVuyTKsk0lor8dFJ9z9unq14tScZHvyQ3Nx1ggOtS18S9Pk55q" + "IaCXfx26ucH76VRho=" + ) + INSTANCE_ID = "i-12345" + + @patch("ansible_collections.community.aws.plugins.modules.s3_bucket_notification.AnsibleAWSModule.client") def test_decryption(self, mock_client): - path = self.PEM_PATH - with open(path, 'r') as f: + with open(path, "r") as f: pem = to_text(f.read()) with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({'instance_id': self.INSTANCE_ID, - 'key_data': pem, - }) + set_module_args( + { + "instance_id": self.INSTANCE_ID, + "key_data": pem, + } + ) module = setup_module_object() mock_client().get_password_data.return_value = { - 'InstanceId': self.INSTANCE_ID, - 'PasswordData': self.ENCRYPTED_DATA, - 'Timestamp': datetime.datetime.now(), + "InstanceId": self.INSTANCE_ID, + "PasswordData": self.ENCRYPTED_DATA, + "Timestamp": datetime.datetime.now(), } ec2_win_password(module) self.assertEqual( - exec_info.exception.args[0]['win_password'], + exec_info.exception.args[0]["win_password"], to_bytes(self.UNENCRYPTED_DATA), ) diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py deleted file mode 100644 index 11de7f477..000000000 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_iam_password_policy.py +++ /dev/null @@ -1,30 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import json -import pytest - -from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args - -from ansible_collections.community.aws.plugins.modules import iam_password_policy - - -def test_warn_if_state_not_specified(capsys): - set_module_args({ - "min_pw_length": "8", - "require_symbols": "false", - "require_numbers": "true", - "require_uppercase": "true", - "require_lowercase": "true", - "allow_pw_change": "true", - "pw_max_age": "60", - "pw_reuse_prevent": "5", - "pw_expire": "false" - }) - with pytest.raises(SystemExit): - iam_password_policy.main() - captured = capsys.readouterr() - - output = json.loads(captured.out) - assert 'missing required arguments' in output.get('msg', '') diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py index 836e2cf07..7dcd785c9 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_opensearch.py @@ -1,86 +1,85 @@ # Copyright: Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type import functools -from ansible_collections.community.aws.plugins.module_utils.opensearch import ( - compare_domain_versions, - parse_version, -) + +from ansible_collections.community.aws.plugins.module_utils.opensearch import compare_domain_versions +from ansible_collections.community.aws.plugins.module_utils.opensearch import parse_version def test_parse_version(): test_versions = [ - ['Elasticsearch_5.5', {'engine_type': 'Elasticsearch', 'major': 5, 'minor': 5}], - ['Elasticsearch_7.1', {'engine_type': 'Elasticsearch', 'major': 7, 'minor': 1}], - ['Elasticsearch_7.10', {'engine_type': 'Elasticsearch', 'major': 7, 'minor': 10}], - ['OpenSearch_1.0', {'engine_type': 'OpenSearch', 'major': 1, 'minor': 0}], - ['OpenSearch_1.1', {'engine_type': 'OpenSearch', 'major': 1, 'minor': 1}], - ['OpenSearch_a.b', None], - ['OpenSearch_1.b', None], - ['OpenSearch_1-1', None], - ['OpenSearch_1.1.2', None], - ['OpenSearch_foo_1.1', None], - ['OpenSearch_1', None], - ['OpenSearch-1.0', None], - ['Foo_1.0', None], + ["Elasticsearch_5.5", {"engine_type": "Elasticsearch", "major": 5, "minor": 5}], + ["Elasticsearch_7.1", {"engine_type": "Elasticsearch", "major": 7, "minor": 1}], + ["Elasticsearch_7.10", {"engine_type": "Elasticsearch", "major": 7, "minor": 10}], + ["OpenSearch_1.0", {"engine_type": "OpenSearch", "major": 1, "minor": 0}], + ["OpenSearch_1.1", {"engine_type": "OpenSearch", "major": 1, "minor": 1}], + ["OpenSearch_a.b", None], + ["OpenSearch_1.b", None], + ["OpenSearch_1-1", None], + ["OpenSearch_1.1.2", None], + ["OpenSearch_foo_1.1", None], + ["OpenSearch_1", None], + ["OpenSearch-1.0", None], + ["Foo_1.0", None], ] for expected in test_versions: ret = parse_version(expected[0]) if ret != expected[1]: - raise AssertionError( - f"parse_version({expected[0]} returned {ret}, expected {expected[1]}") + raise AssertionError(f"parse_version({expected[0]} returned {ret}, expected {expected[1]}") def test_version_compare(): test_versions = [ - ['Elasticsearch_5.5', 'Elasticsearch_5.5', 0], - ['Elasticsearch_5.5', 'Elasticsearch_7.1', -1], - ['Elasticsearch_7.1', 'Elasticsearch_7.1', 0], - ['Elasticsearch_7.1', 'Elasticsearch_7.2', -1], - ['Elasticsearch_7.1', 'Elasticsearch_7.10', -1], - ['Elasticsearch_7.2', 'Elasticsearch_7.10', -1], - ['Elasticsearch_7.10', 'Elasticsearch_7.2', 1], - ['Elasticsearch_7.2', 'Elasticsearch_5.5', 1], - ['Elasticsearch_7.2', 'OpenSearch_1.0', -1], - ['Elasticsearch_7.2', 'OpenSearch_1.1', -1], - ['OpenSearch_1.1', 'OpenSearch_1.1', 0], - ['OpenSearch_1.0', 'OpenSearch_1.1', -1], - ['OpenSearch_1.1', 'OpenSearch_1.0', 1], - ['foo_1.1', 'OpenSearch_1.0', -1], - ['Elasticsearch_5.5', 'foo_1.0', 1], + ["Elasticsearch_5.5", "Elasticsearch_5.5", 0], + ["Elasticsearch_5.5", "Elasticsearch_7.1", -1], + ["Elasticsearch_7.1", "Elasticsearch_7.1", 0], + ["Elasticsearch_7.1", "Elasticsearch_7.2", -1], + ["Elasticsearch_7.1", "Elasticsearch_7.10", -1], + ["Elasticsearch_7.2", "Elasticsearch_7.10", -1], + ["Elasticsearch_7.10", "Elasticsearch_7.2", 1], + ["Elasticsearch_7.2", "Elasticsearch_5.5", 1], + ["Elasticsearch_7.2", "OpenSearch_1.0", -1], + ["Elasticsearch_7.2", "OpenSearch_1.1", -1], + ["OpenSearch_1.1", "OpenSearch_1.1", 0], + ["OpenSearch_1.0", "OpenSearch_1.1", -1], + ["OpenSearch_1.1", "OpenSearch_1.0", 1], + ["foo_1.1", "OpenSearch_1.0", -1], + ["Elasticsearch_5.5", "foo_1.0", 1], ] for v in test_versions: ret = compare_domain_versions(v[0], v[1]) if ret != v[2]: - raise AssertionError( - f"compare({v[0]}, {v[1]} returned {ret}, expected {v[2]}") + raise AssertionError(f"compare({v[0]}, {v[1]} returned {ret}, expected {v[2]}") def test_sort_versions(): input_versions = [ - 'Elasticsearch_5.6', - 'Elasticsearch_5.5', - 'Elasticsearch_7.10', - 'Elasticsearch_7.2', - 'foo_10.5', - 'OpenSearch_1.1', - 'OpenSearch_1.0', - 'Elasticsearch_7.3', + "Elasticsearch_5.6", + "Elasticsearch_5.5", + "Elasticsearch_7.10", + "Elasticsearch_7.2", + "foo_10.5", + "OpenSearch_1.1", + "OpenSearch_1.0", + "Elasticsearch_7.3", ] expected_versions = [ - 'foo_10.5', - 'Elasticsearch_5.5', - 'Elasticsearch_5.6', - 'Elasticsearch_7.2', - 'Elasticsearch_7.3', - 'Elasticsearch_7.10', - 'OpenSearch_1.0', - 'OpenSearch_1.1', + "foo_10.5", + "Elasticsearch_5.5", + "Elasticsearch_5.6", + "Elasticsearch_7.2", + "Elasticsearch_7.3", + "Elasticsearch_7.10", + "OpenSearch_1.0", + "OpenSearch_1.1", ] input_versions = sorted(input_versions, key=functools.cmp_to_key(compare_domain_versions)) if input_versions != expected_versions: - raise AssertionError( - f"Expected {expected_versions}, got {input_versions}") + raise AssertionError(f"Expected {expected_versions}, got {input_versions}") diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py index 7b22d5b00..1342a8d58 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_redshift_cross_region_snapshots.py @@ -1,40 +1,41 @@ # Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + __metaclass__ = type from ansible_collections.community.aws.plugins.modules import redshift_cross_region_snapshots as rcrs mock_status_enabled = { - 'SnapshotCopyGrantName': 'snapshot-us-east-1-to-us-west-2', - 'DestinationRegion': 'us-west-2', - 'RetentionPeriod': 1, + "SnapshotCopyGrantName": "snapshot-us-east-1-to-us-west-2", + "DestinationRegion": "us-west-2", + "RetentionPeriod": 1, } mock_status_disabled = {} mock_request_illegal = { - 'snapshot_copy_grant': 'changed', - 'destination_region': 'us-west-2', - 'snapshot_retention_period': 1 + "snapshot_copy_grant": "changed", + "destination_region": "us-west-2", + "snapshot_retention_period": 1, } mock_request_update = { - 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2', - 'destination_region': 'us-west-2', - 'snapshot_retention_period': 3 + "snapshot_copy_grant": "snapshot-us-east-1-to-us-west-2", + "destination_region": "us-west-2", + "snapshot_retention_period": 3, } mock_request_no_update = { - 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2', - 'destination_region': 'us-west-2', - 'snapshot_retention_period': 1 + "snapshot_copy_grant": "snapshot-us-east-1-to-us-west-2", + "destination_region": "us-west-2", + "snapshot_retention_period": 1, } def test_fail_at_unsupported_operations(): - response = rcrs.requesting_unsupported_modifications( - mock_status_enabled, mock_request_illegal - ) + response = rcrs.requesting_unsupported_modifications(mock_status_enabled, mock_request_illegal) assert response is True @@ -44,9 +45,7 @@ def test_needs_update_true(): def test_no_change(): - response = rcrs.requesting_unsupported_modifications( - mock_status_enabled, mock_request_no_update - ) + response = rcrs.requesting_unsupported_modifications(mock_status_enabled, mock_request_no_update) needs_update_response = rcrs.needs_update(mock_status_enabled, mock_request_no_update) assert response is False assert needs_update_response is False diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py new file mode 100644 index 000000000..57ed705c5 --- /dev/null +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_route53_wait.py @@ -0,0 +1,240 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2023, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import pytest + +from ansible_collections.community.aws.plugins.modules.route53_wait import detect_task_results + +_SINGLE_RESULT_SUCCESS = { + "changed": True, + "diff": {}, + "failed": False, + "wait_id": None, +} + +_SINGLE_RESULT_FAILED = { + "changed": False, + "failed": True, + "msg": "value of type must be one of: A, AAAA, CAA, CNAME, MX, NS, PTR, SOA, SPF, SRV, TXT, got: bar", +} + +_MULTI_RESULT_SUCCESS = { + "ansible_loop_var": "item", + "changed": True, + "diff": {}, + "failed": False, + "invocation": { + "module_args": { + "access_key": "asdf", + "alias": None, + "alias_evaluate_target_health": False, + "alias_hosted_zone_id": None, + "aws_access_key": "asdf", + "aws_ca_bundle": None, + "aws_config": None, + "aws_secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER", + "debug_botocore_endpoint_logs": False, + "endpoint_url": None, + "failover": None, + "geo_location": None, + "health_check": None, + "hosted_zone_id": None, + "identifier": None, + "overwrite": True, + "private_zone": False, + "profile": None, + "record": "foo.example.org", + "region": None, + "retry_interval": 500, + "secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER", + "session_token": None, + "state": "present", + "ttl": 300, + "type": "TXT", + "validate_certs": True, + "value": ["foo"], + "vpc_id": None, + "wait": False, + "wait_timeout": 300, + "weight": None, + "zone": "example.org", + }, + }, + "item": {"record": "foo.example.org", "value": "foo"}, + "wait_id": None, +} + +_MULTI_RESULT_FAILED = { + "ansible_loop_var": "item", + "changed": False, + "failed": True, + "invocation": { + "module_args": { + "access_key": "asdf", + "alias": None, + "alias_evaluate_target_health": False, + "alias_hosted_zone_id": None, + "aws_access_key": "asdf", + "aws_ca_bundle": None, + "aws_config": None, + "aws_secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER", + "debug_botocore_endpoint_logs": False, + "endpoint_url": None, + "failover": None, + "geo_location": None, + "health_check": None, + "hosted_zone_id": None, + "identifier": None, + "overwrite": True, + "private_zone": False, + "profile": None, + "record": "foo.example.org", + "region": None, + "retry_interval": 500, + "secret_key": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER", + "session_token": None, + "state": "present", + "ttl": 300, + "type": "bar", + "validate_certs": True, + "value": ["foo"], + "vpc_id": None, + "wait": False, + "wait_timeout": 300, + "weight": None, + "zone": "example.org", + }, + }, + "item": {"record": "foo.example.org", "value": "foo"}, + "msg": "value of type must be one of: A, AAAA, CAA, CNAME, MX, NS, PTR, SOA, SPF, SRV, TXT, got: bar", +} + + +DETECT_TASK_RESULTS_DATA = [ + [ + _SINGLE_RESULT_SUCCESS, + [ + ( + "", + _SINGLE_RESULT_SUCCESS, + ), + ], + ], + [ + { + "changed": True, + "msg": "All items completed", + "results": [ + _MULTI_RESULT_SUCCESS, + ], + "skipped": False, + }, + [ + ( + " for result #1", + _MULTI_RESULT_SUCCESS, + ), + ], + ], + [ + _SINGLE_RESULT_FAILED, + [ + ( + "", + _SINGLE_RESULT_FAILED, + ), + ], + ], + [ + { + "changed": False, + "failed": True, + "msg": "One or more items failed", + "results": [ + _MULTI_RESULT_FAILED, + ], + "skipped": False, + }, + [ + ( + " for result #1", + _MULTI_RESULT_FAILED, + ), + ], + ], +] + + +@pytest.mark.parametrize( + "input, expected", + DETECT_TASK_RESULTS_DATA, +) +def test_detect_task_results(input, expected): + assert list(detect_task_results(input)) == expected + + +DETECT_TASK_RESULTS_FAIL_DATA = [ + [ + {}, + "missing changed key", + [], + ], + [ + {"changed": True}, + "missing failed key", + [], + ], + [ + {"results": None}, + "missing changed key", + [], + ], + [ + {"results": None, "changed": True, "msg": "foo"}, + "missing skipped key", + [], + ], + [ + {"results": None, "changed": True, "msg": "foo", "skipped": False}, + "results is present, but not a list", + [], + ], + [ + {"results": [None], "changed": True, "msg": "foo", "skipped": False}, + "result 1 is not a dictionary", + [], + ], + [ + {"results": [{}], "changed": True, "msg": "foo", "skipped": False}, + "missing changed key for result 1", + [], + ], + [ + { + "results": [{"changed": True, "failed": False, "ansible_loop_var": "item", "invocation": {}}, {}], + "changed": True, + "msg": "foo", + "skipped": False, + }, + "missing changed key for result 2", + [(" for result #1", {"changed": True, "failed": False, "ansible_loop_var": "item", "invocation": {}})], + ], +] + + +@pytest.mark.parametrize( + "input, expected_exc, expected_result", + DETECT_TASK_RESULTS_FAIL_DATA, +) +def test_detect_task_fail_results(input, expected_exc, expected_result): + result = [] + with pytest.raises(ValueError) as exc: + for res in detect_task_results(input): + result.append(res) + + print(exc.value.args[0]) + assert expected_exc == exc.value.args[0] + print(result) + assert expected_result == result diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py new file mode 100644 index 000000000..518a11a3b --- /dev/null +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/test_ssm_inventory_info.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- + +# Copyright: Contributors to the Ansible project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from botocore.exceptions import BotoCoreError + +from ansible_collections.community.aws.plugins.modules.ssm_inventory_info import SsmInventoryInfoFailure +from ansible_collections.community.aws.plugins.modules.ssm_inventory_info import execute_module +from ansible_collections.community.aws.plugins.modules.ssm_inventory_info import get_ssm_inventory + + +def test_get_ssm_inventory(): + connection = MagicMock() + inventory_response = MagicMock() + connection.get_inventory.return_value = inventory_response + filters = MagicMock() + + assert get_ssm_inventory(connection, filters) == inventory_response + connection.get_inventory.assert_called_once_with(Filters=filters) + + +def test_get_ssm_inventory_failure(): + connection = MagicMock() + connection.get_inventory.side_effect = BotoCoreError(error="failed", operation="get_ssm_inventory") + filters = MagicMock() + + with pytest.raises(SsmInventoryInfoFailure): + get_ssm_inventory(connection, filters) + + +@patch("ansible_collections.community.aws.plugins.modules.ssm_inventory_info.get_ssm_inventory") +def test_execute_module(m_get_ssm_inventory): + instance_id = "i-0202020202020202" + aws_inventory = { + "AgentType": "amazon-ssm-agent", + "AgentVersion": "3.2.582.0", + "ComputerName": "ip-172-31-44-166.ec2.internal", + "InstanceId": "i-039eb9b1f55934ab6", + "InstanceStatus": "Active", + "IpAddress": "172.31.44.166", + "PlatformName": "Fedora Linux", + "PlatformType": "Linux", + "PlatformVersion": "37", + "ResourceType": "EC2Instance", + } + + ansible_inventory = { + "agent_type": "amazon-ssm-agent", + "agent_version": "3.2.582.0", + "computer_name": "ip-172-31-44-166.ec2.internal", + "instance_id": "i-039eb9b1f55934ab6", + "instance_status": "Active", + "ip_address": "172.31.44.166", + "platform_name": "Fedora Linux", + "platform_type": "Linux", + "platform_version": "37", + "resource_type": "EC2Instance", + } + + m_get_ssm_inventory.return_value = { + "Entities": [{"Id": instance_id, "Data": {"AWS:InstanceInformation": {"Content": [aws_inventory]}}}], + "Status": 200, + } + + connection = MagicMock() + module = MagicMock() + module.params = dict(instance_id=instance_id) + module.exit_json.side_effect = SystemExit(1) + module.fail_json_aws.side_effect = SystemError(2) + + with pytest.raises(SystemExit): + execute_module(module, connection) + + module.exit_json.assert_called_once_with(changed=False, ssm_inventory=ansible_inventory) + + +@patch("ansible_collections.community.aws.plugins.modules.ssm_inventory_info.get_ssm_inventory") +def test_execute_module_no_data(m_get_ssm_inventory): + instance_id = "i-0202020202020202" + + m_get_ssm_inventory.return_value = { + "Entities": [{"Id": instance_id, "Data": {}}], + } + + connection = MagicMock() + module = MagicMock() + module.params = dict(instance_id=instance_id) + module.exit_json.side_effect = SystemExit(1) + module.fail_json_aws.side_effect = SystemError(2) + + with pytest.raises(SystemExit): + execute_module(module, connection) + + module.exit_json.assert_called_once_with(changed=False, ssm_inventory={}) + + +@patch("ansible_collections.community.aws.plugins.modules.ssm_inventory_info.get_ssm_inventory") +def test_execute_module_failure(m_get_ssm_inventory): + instance_id = "i-0202020202020202" + + m_get_ssm_inventory.side_effect = SsmInventoryInfoFailure( + exc=BotoCoreError(error="failed", operation="get_ssm_inventory"), msg="get_ssm_inventory() failed." + ) + + connection = MagicMock() + module = MagicMock() + module.params = dict(instance_id=instance_id) + module.exit_json.side_effect = SystemExit(1) + module.fail_json_aws.side_effect = SystemError(2) + + with pytest.raises(SystemError): + execute_module(module, connection) diff --git a/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py b/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py index 026bf2549..a3d9e31db 100644 --- a/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py +++ b/ansible_collections/community/aws/tests/unit/plugins/modules/utils.py @@ -1,23 +1,20 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - import json +import unittest +from unittest.mock import patch -from ansible_collections.community.aws.tests.unit.compat import unittest -from ansible_collections.community.aws.tests.unit.compat.mock import patch from ansible.module_utils import basic from ansible.module_utils._text import to_bytes def set_module_args(args): - if '_ansible_remote_tmp' not in args: - args['_ansible_remote_tmp'] = '/tmp' - if '_ansible_keep_remote_files' not in args: - args['_ansible_keep_remote_files'] = False + if "_ansible_remote_tmp" not in args: + args["_ansible_remote_tmp"] = "/tmp" + if "_ansible_keep_remote_files" not in args: + args["_ansible_keep_remote_files"] = False - args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + args = json.dumps({"ANSIBLE_MODULE_ARGS": args}) basic._ANSIBLE_ARGS = to_bytes(args) @@ -30,22 +27,21 @@ class AnsibleFailJson(Exception): def exit_json(*args, **kwargs): - if 'changed' not in kwargs: - kwargs['changed'] = False + if "changed" not in kwargs: + kwargs["changed"] = False raise AnsibleExitJson(kwargs) def fail_json(*args, **kwargs): - kwargs['failed'] = True + kwargs["failed"] = True raise AnsibleFailJson(kwargs) class ModuleTestCase(unittest.TestCase): - def setUp(self): self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json) self.mock_module.start() - self.mock_sleep = patch('time.sleep') + self.mock_sleep = patch("time.sleep") self.mock_sleep.start() set_module_args({}) self.addCleanup(self.mock_module.stop) diff --git a/ansible_collections/community/aws/tests/unit/requirements.yml b/ansible_collections/community/aws/tests/unit/requirements.yml new file mode 100644 index 000000000..99ce82a1b --- /dev/null +++ b/ansible_collections/community/aws/tests/unit/requirements.yml @@ -0,0 +1,5 @@ +--- +collections: + - name: https://github.com/ansible-collections/amazon.aws.git + type: git + version: main -- cgit v1.2.3