diff options
Diffstat (limited to 'ansible_collections/cisco/aci/tests/unit')
23 files changed, 1087 insertions, 0 deletions
diff --git a/ansible_collections/cisco/aci/tests/unit/__init__.py b/ansible_collections/cisco/aci/tests/unit/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/__init__.py diff --git a/ansible_collections/cisco/aci/tests/unit/compat/__init__.py b/ansible_collections/cisco/aci/tests/unit/compat/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/compat/__init__.py diff --git a/ansible_collections/cisco/aci/tests/unit/compat/builtins.py b/ansible_collections/cisco/aci/tests/unit/compat/builtins.py new file mode 100644 index 000000000..bfc8adfbe --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/compat/builtins.py @@ -0,0 +1,34 @@ +# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +# +# Compat for python2.7 +# + +# One unittest needs to import builtins via __import__() so we need to have +# the string that represents it +try: + import __builtin__ +except ImportError: + BUILTINS = "builtins" +else: + BUILTINS = "__builtin__" diff --git a/ansible_collections/cisco/aci/tests/unit/compat/mock.py b/ansible_collections/cisco/aci/tests/unit/compat/mock.py new file mode 100644 index 000000000..54c1d20e7 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/compat/mock.py @@ -0,0 +1,125 @@ +# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +""" +Compat module for Python3.x's unittest.mock module +""" +import sys + +# Python 2.7 + +# Note: Could use the pypi mock library on python3.x as well as python2.x. It +# is the same as the python3 stdlib mock library + +try: + # Allow wildcard import because we really do want to import all of mock's + # symbols into this compat shim + # pylint: disable=wildcard-import,unused-wildcard-import + from unittest.mock import * +except ImportError: + # Python 2 + # pylint: disable=wildcard-import,unused-wildcard-import + try: + from mock import * + except ImportError: + print("You need the mock library installed on python2.x to run tests") + + +# Prior to 3.4.4, mock_open cannot handle binary read_data +if sys.version_info >= (3,) and sys.version_info < (3, 4, 4): + file_spec = None + + def _iterate_read_data(read_data): + # Helper for mock_open: + # Retrieve lines from read_data via a generator so that separate calls to + # readline, read, and readlines are properly interleaved + sep = b"\n" if isinstance(read_data, bytes) else "\n" + data_as_list = [l + sep for l in read_data.split(sep)] + + if data_as_list[-1] == sep: + # If the last line ended in a newline, the list comprehension will have an + # extra entry that's just a newline. Remove this. + data_as_list = data_as_list[:-1] + else: + # If there wasn't an extra newline by itself, then the file being + # emulated doesn't have a newline to end the last line remove the + # newline that our naive format() added + data_as_list[-1] = data_as_list[-1][:-1] + + for line in data_as_list: + yield line + + def mock_open(mock=None, read_data=""): + """ + A helper function to create a mock to replace the use of `open`. It works + for `open` called directly or used as a context manager. + + The `mock` argument is the mock object to configure. If `None` (the + default) then a `MagicMock` will be created for you, with the API limited + to methods or attributes available on standard file handles. + + `read_data` is a string for the `read` methoddline`, and `readlines` of the + file handle to return. This is an empty string by default. + """ + + def _readlines_side_effect(*args, **kwargs): + if handle.readlines.return_value is not None: + return handle.readlines.return_value + return list(_data) + + def _read_side_effect(*args, **kwargs): + if handle.read.return_value is not None: + return handle.read.return_value + return type(read_data)().join(_data) + + def _readline_side_effect(): + if handle.readline.return_value is not None: + while True: + yield handle.readline.return_value + for line in _data: + yield line + + global file_spec + if file_spec is None: + import _io + + file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) + + if mock is None: + mock = MagicMock(name="open", spec=open) + + handle = MagicMock(spec=file_spec) + handle.__enter__.return_value = handle + + _data = _iterate_read_data(read_data) + + handle.write.return_value = None + handle.read.return_value = None + handle.readline.return_value = None + handle.readlines.return_value = None + + handle.read.side_effect = _read_side_effect + handle.readline.side_effect = _readline_side_effect() + handle.readlines.side_effect = _readlines_side_effect + + mock.return_value = handle + return mock diff --git a/ansible_collections/cisco/aci/tests/unit/compat/unittest.py b/ansible_collections/cisco/aci/tests/unit/compat/unittest.py new file mode 100644 index 000000000..df3379b82 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/compat/unittest.py @@ -0,0 +1,39 @@ +# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +""" +Compat module for Python2.7's unittest module +""" + +import sys + +# Allow wildcard import because we really do want to import all of +# unittests's symbols into this compat shim +# pylint: disable=wildcard-import,unused-wildcard-import +if sys.version_info < (2, 7): + try: + # Need unittest2 on python2.6 + from unittest2 import * + except ImportError: + print("You need unittest2 installed on python2.6.x to run tests") +else: + from unittest import * diff --git a/ansible_collections/cisco/aci/tests/unit/mock/__init__.py b/ansible_collections/cisco/aci/tests/unit/mock/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/mock/__init__.py diff --git a/ansible_collections/cisco/aci/tests/unit/mock/loader.py b/ansible_collections/cisco/aci/tests/unit/mock/loader.py new file mode 100644 index 000000000..524870cfa --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/mock/loader.py @@ -0,0 +1,116 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import os + +from ansible.errors import AnsibleParserError +from ansible.parsing.dataloader import DataLoader +from ansible.module_utils._text import to_bytes, to_text + + +class DictDataLoader(DataLoader): + def __init__(self, file_mapping=None): + file_mapping = {} if file_mapping is None else file_mapping + assert type(file_mapping) == dict + + super(DictDataLoader, self).__init__() + + self._file_mapping = file_mapping + self._build_known_directories() + self._vault_secrets = None + + def load_from_file(self, path, cache=True, unsafe=False): + path = to_text(path) + if path in self._file_mapping: + return self.load(self._file_mapping[path], path) + return None + + # TODO: the real _get_file_contents returns a bytestring, so we actually convert the + # unicode/text it's created with to utf-8 + def _get_file_contents(self, file_name): + file_name = to_text(file_name) + if file_name in self._file_mapping: + return (to_bytes(self._file_mapping[file_name]), False) + else: + raise AnsibleParserError("file not found: %s" % file_name) + + def path_exists(self, path): + path = to_text(path) + return path in self._file_mapping or path in self._known_directories + + def is_file(self, path): + path = to_text(path) + return path in self._file_mapping + + def is_directory(self, path): + path = to_text(path) + return path in self._known_directories + + def list_directory(self, path): + ret = [] + path = to_text(path) + for x in list(self._file_mapping.keys()) + self._known_directories: + if x.startswith(path): + if os.path.dirname(x) == path: + ret.append(os.path.basename(x)) + return ret + + def is_executable(self, path): + # FIXME: figure out a way to make paths return true for this + return False + + def _add_known_directory(self, directory): + if directory not in self._known_directories: + self._known_directories.append(directory) + + def _build_known_directories(self): + self._known_directories = [] + for path in self._file_mapping: + dirname = os.path.dirname(path) + while dirname not in ("/", ""): + self._add_known_directory(dirname) + dirname = os.path.dirname(dirname) + + def push(self, path, content): + rebuild_dirs = False + if path not in self._file_mapping: + rebuild_dirs = True + + self._file_mapping[path] = content + + if rebuild_dirs: + self._build_known_directories() + + def pop(self, path): + if path in self._file_mapping: + del self._file_mapping[path] + self._build_known_directories() + + def clear(self): + self._file_mapping = dict() + self._known_directories = [] + + def get_basedir(self): + return os.getcwd() + + def set_vault_secrets(self, vault_secrets): + self._vault_secrets = vault_secrets diff --git a/ansible_collections/cisco/aci/tests/unit/mock/path.py b/ansible_collections/cisco/aci/tests/unit/mock/path.py new file mode 100644 index 000000000..1e9f6663e --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/mock/path.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from ansible_collections.cisco.aci.tests.unit.compat.mock import MagicMock +from ansible.utils.path import unfrackpath + + +mock_unfrackpath_noop = MagicMock(spec_set=unfrackpath, side_effect=lambda x, *args, **kwargs: x) diff --git a/ansible_collections/cisco/aci/tests/unit/mock/procenv.py b/ansible_collections/cisco/aci/tests/unit/mock/procenv.py new file mode 100644 index 000000000..58d94b7a2 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/mock/procenv.py @@ -0,0 +1,91 @@ +# (c) 2016, Matt Davis <mdavis@ansible.com> +# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import sys +import json + +from contextlib import contextmanager +from io import BytesIO, StringIO +from ansible_collections.cisco.aci.tests.unit.compat import unittest +from ansible.module_utils.six import PY3 +from ansible.module_utils._text import to_bytes + + +@contextmanager +def swap_stdin_and_argv(stdin_data="", argv_data=tuple()): + """ + context manager that temporarily masks the test runner's values for stdin and argv + """ + real_stdin = sys.stdin + real_argv = sys.argv + + if PY3: + fake_stream = StringIO(stdin_data) + fake_stream.buffer = BytesIO(to_bytes(stdin_data)) + else: + fake_stream = BytesIO(to_bytes(stdin_data)) + + try: + sys.stdin = fake_stream + sys.argv = argv_data + + yield + finally: + sys.stdin = real_stdin + sys.argv = real_argv + + +@contextmanager +def swap_stdout(): + """ + context manager that temporarily replaces stdout for tests that need to verify output + """ + old_stdout = sys.stdout + + if PY3: + fake_stream = StringIO() + else: + fake_stream = BytesIO() + + try: + sys.stdout = fake_stream + + yield fake_stream + finally: + sys.stdout = old_stdout + + +class ModuleTestCase(unittest.TestCase): + def setUp(self, module_args=None): + if module_args is None: + module_args = {"_ansible_remote_tmp": "/tmp", "_ansible_keep_remote_files": False} + + args = json.dumps(dict(ANSIBLE_MODULE_ARGS=module_args)) + + # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually + self.stdin_swap = swap_stdin_and_argv(stdin_data=args) + self.stdin_swap.__enter__() + + def tearDown(self): + # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually + self.stdin_swap.__exit__(None, None, None) diff --git a/ansible_collections/cisco/aci/tests/unit/mock/vault_helper.py b/ansible_collections/cisco/aci/tests/unit/mock/vault_helper.py new file mode 100644 index 000000000..b1e79b0b3 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/mock/vault_helper.py @@ -0,0 +1,40 @@ +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from ansible.module_utils._text import to_bytes + +from ansible.parsing.vault import VaultSecret + + +class TextVaultSecret(VaultSecret): + """A secret piece of text. ie, a password. Tracks text encoding. + + The text encoding of the text may not be the default text encoding so + we keep track of the encoding so we encode it to the same bytes.""" + + def __init__(self, text, encoding=None, errors=None, _bytes=None): + super(TextVaultSecret, self).__init__() + self.text = text + self.encoding = encoding or "utf-8" + self._bytes = _bytes + self.errors = errors or "strict" + + @property + def bytes(self): + """The text encoded with encoding, unless we specifically set _bytes.""" + return self._bytes or to_bytes(self.text, encoding=self.encoding, errors=self.errors) diff --git a/ansible_collections/cisco/aci/tests/unit/mock/yaml_helper.py b/ansible_collections/cisco/aci/tests/unit/mock/yaml_helper.py new file mode 100644 index 000000000..e8cc55955 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/mock/yaml_helper.py @@ -0,0 +1,132 @@ +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import io +import yaml + +from ansible.module_utils.six import PY3 +from ansible.parsing.yaml.loader import AnsibleLoader +from ansible.parsing.yaml.dumper import AnsibleDumper + + +class YamlTestUtils(object): + """Mixin class to combine with a unittest.TestCase subclass.""" + + def _loader(self, stream): + """Vault related tests will want to override this. + + Vault cases should setup a AnsibleLoader that has the vault password.""" + return AnsibleLoader(stream) + + def _dump_stream(self, obj, stream, dumper=None): + """Dump to a py2-unicode or py3-string stream.""" + if PY3: + return yaml.dump(obj, stream, Dumper=dumper) + else: + return yaml.dump(obj, stream, Dumper=dumper, encoding=None) + + def _dump_string(self, obj, dumper=None): + """Dump to a py2-unicode or py3-string""" + if PY3: + return yaml.dump(obj, Dumper=dumper) + else: + return yaml.dump(obj, Dumper=dumper, encoding=None) + + def _dump_load_cycle(self, obj): + # Each pass though a dump or load revs the 'generation' + # obj to yaml string + string_from_object_dump = self._dump_string(obj, dumper=AnsibleDumper) + + # wrap a stream/file like StringIO around that yaml + stream_from_object_dump = io.StringIO(string_from_object_dump) + loader = self._loader(stream_from_object_dump) + # load the yaml stream to create a new instance of the object (gen 2) + obj_2 = loader.get_data() + + # dump the gen 2 objects directory to strings + string_from_object_dump_2 = self._dump_string(obj_2, dumper=AnsibleDumper) + + # The gen 1 and gen 2 yaml strings + self.assertEqual(string_from_object_dump, string_from_object_dump_2) + # the gen 1 (orig) and gen 2 py object + self.assertEqual(obj, obj_2) + + # again! gen 3... load strings into py objects + stream_3 = io.StringIO(string_from_object_dump_2) + loader_3 = self._loader(stream_3) + obj_3 = loader_3.get_data() + + string_from_object_dump_3 = self._dump_string(obj_3, dumper=AnsibleDumper) + + self.assertEqual(obj, obj_3) + # should be transitive, but... + self.assertEqual(obj_2, obj_3) + self.assertEqual(string_from_object_dump, string_from_object_dump_3) + + def _old_dump_load_cycle(self, obj): + """Dump the passed in object to yaml, load it back up, dump again, compare.""" + stream = io.StringIO() + + yaml_string = self._dump_string(obj, dumper=AnsibleDumper) + self._dump_stream(obj, stream, dumper=AnsibleDumper) + + yaml_string_from_stream = stream.getvalue() + + # reset stream + stream.seek(0) + + loader = self._loader(stream) + # loader = AnsibleLoader(stream, vault_password=self.vault_password) + obj_from_stream = loader.get_data() + + stream_from_string = io.StringIO(yaml_string) + loader2 = self._loader(stream_from_string) + # loader2 = AnsibleLoader(stream_from_string, vault_password=self.vault_password) + obj_from_string = loader2.get_data() + + stream_obj_from_stream = io.StringIO() + stream_obj_from_string = io.StringIO() + + if PY3: + yaml.dump(obj_from_stream, stream_obj_from_stream, Dumper=AnsibleDumper) + yaml.dump(obj_from_stream, stream_obj_from_string, Dumper=AnsibleDumper) + else: + yaml.dump(obj_from_stream, stream_obj_from_stream, Dumper=AnsibleDumper, encoding=None) + yaml.dump(obj_from_stream, stream_obj_from_string, Dumper=AnsibleDumper, encoding=None) + + yaml_string_stream_obj_from_stream = stream_obj_from_stream.getvalue() + yaml_string_stream_obj_from_string = stream_obj_from_string.getvalue() + + stream_obj_from_stream.seek(0) + stream_obj_from_string.seek(0) + + if PY3: + yaml_string_obj_from_stream = yaml.dump(obj_from_stream, Dumper=AnsibleDumper) + yaml_string_obj_from_string = yaml.dump(obj_from_string, Dumper=AnsibleDumper) + else: + yaml_string_obj_from_stream = yaml.dump(obj_from_stream, Dumper=AnsibleDumper, encoding=None) + yaml_string_obj_from_string = yaml.dump(obj_from_string, Dumper=AnsibleDumper, encoding=None) + + assert yaml_string == yaml_string_obj_from_stream + assert yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string + assert ( + yaml_string + == yaml_string_obj_from_stream + == yaml_string_obj_from_string + == yaml_string_stream_obj_from_stream + == yaml_string_stream_obj_from_string + ) + assert obj == obj_from_stream + assert obj == obj_from_string + assert obj == yaml_string_obj_from_stream + assert obj == yaml_string_obj_from_string + assert obj == obj_from_stream == obj_from_string == yaml_string_obj_from_stream == yaml_string_obj_from_string + return { + "obj": obj, + "yaml_string": yaml_string, + "yaml_string_from_stream": yaml_string_from_stream, + "obj_from_stream": obj_from_stream, + "obj_from_string": obj_from_string, + "yaml_string_obj_from_string": yaml_string_obj_from_string, + } diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/.gitignore b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/.gitignore new file mode 100644 index 000000000..bc1a1f616 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/.gitignore @@ -0,0 +1,2 @@ +# Created by pytest automatically. +* diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/CACHEDIR.TAG b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/CACHEDIR.TAG new file mode 100644 index 000000000..381f03a59 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/CACHEDIR.TAG @@ -0,0 +1,4 @@ +Signature: 8a477f597d28d172789f06886806bc55 +# This file is a cache directory tag created by pytest. +# For information about cache directory tags, see: +# http://www.bford.info/cachedir/spec.html diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/README.md b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/README.md new file mode 100644 index 000000000..bb78ba07e --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/README.md @@ -0,0 +1,8 @@ +# pytest cache directory # + +This directory contains data from the pytest's cache plugin, +which provides the `--lf` and `--ff` options, as well as the `cache` fixture. + +**Do not** commit this to version control. + +See [the docs](https://docs.pytest.org/en/latest/cache.html) for more information. diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/lastfailed b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/lastfailed new file mode 100644 index 000000000..49c16432f --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/lastfailed @@ -0,0 +1,3 @@ +{ + "test_aci.py": true +}
\ No newline at end of file diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/nodeids b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/nodeids new file mode 100644 index 000000000..0637a088a --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/nodeids @@ -0,0 +1 @@ +[]
\ No newline at end of file diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/stepwise b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/stepwise new file mode 100644 index 000000000..0637a088a --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/.pytest_cache/v/cache/stepwise @@ -0,0 +1 @@ +[]
\ No newline at end of file diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/__init__.py b/ansible_collections/cisco/aci/tests/unit/module_utils/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/__init__.py diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/conftest.py b/ansible_collections/cisco/aci/tests/unit/module_utils/conftest.py new file mode 100644 index 000000000..c4a2d92ec --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/conftest.py @@ -0,0 +1,74 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ +# see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import json +import sys +from io import BytesIO + +import pytest + +import ansible.module_utils.basic +from ansible.module_utils.six import PY3, string_types +from ansible.module_utils._text import to_bytes +from ansible.module_utils.common._collections_compat import MutableMapping + + +@pytest.fixture +def stdin(mocker, request): + old_args = ansible.module_utils.basic._ANSIBLE_ARGS + ansible.module_utils.basic._ANSIBLE_ARGS = None + old_argv = sys.argv + sys.argv = ["ansible_unittest"] + + if isinstance(request.param, string_types): + args = request.param + elif isinstance(request.param, MutableMapping): + if "ANSIBLE_MODULE_ARGS" not in request.param: + request.param = {"ANSIBLE_MODULE_ARGS": request.param} + if "_ansible_remote_tmp" not in request.param["ANSIBLE_MODULE_ARGS"]: + request.param["ANSIBLE_MODULE_ARGS"]["_ansible_remote_tmp"] = "/tmp" + if "_ansible_keep_remote_files" not in request.param["ANSIBLE_MODULE_ARGS"]: + request.param["ANSIBLE_MODULE_ARGS"]["_ansible_keep_remote_files"] = False + args = json.dumps(request.param) + else: + raise Exception("Malformed data to the stdin pytest fixture") + + fake_stdin = BytesIO(to_bytes(args, errors="surrogate_or_strict")) + if PY3: + mocker.patch("ansible.module_utils.basic.sys.stdin", mocker.MagicMock()) + mocker.patch("ansible.module_utils.basic.sys.stdin.buffer", fake_stdin) + else: + mocker.patch("ansible.module_utils.basic.sys.stdin", fake_stdin) + + yield fake_stdin + + ansible.module_utils.basic._ANSIBLE_ARGS = old_args + sys.argv = old_argv + + +@pytest.fixture +def am(stdin, request): + old_args = ansible.module_utils.basic._ANSIBLE_ARGS + ansible.module_utils.basic._ANSIBLE_ARGS = None + old_argv = sys.argv + sys.argv = ["ansible_unittest"] + + argspec = {} + if hasattr(request, "param"): + if isinstance(request.param, dict): + argspec = request.param + + am = ansible.module_utils.basic.AnsibleModule( + argument_spec=argspec, + ) + am._name = "ansible_unittest" + + yield am + + ansible.module_utils.basic._ANSIBLE_ARGS = old_args + sys.argv = old_argv diff --git a/ansible_collections/cisco/aci/tests/unit/module_utils/test_aci.py b/ansible_collections/cisco/aci/tests/unit/module_utils/test_aci.py new file mode 100644 index 000000000..a1e07ed24 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/module_utils/test_aci.py @@ -0,0 +1,345 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2017, Dag Wieers (@dagwieers) <dag@wieers.com> +# GNU General Public License v3.0+ +# see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import sys + +from ansible_collections.cisco.aci.tests.unit.compat import unittest +from ansible_collections.cisco.aci.plugins.module_utils.aci import ACIModule +from ansible.module_utils.six import PY2 +from ansible.module_utils._text import to_native + +import pytest + + +class AltModule: + params = dict( + hostname="dummy", + port=123, + protocol="https", + state="present", + ) + + +class AltACIModule(ACIModule): + def __init__(self): + self.result = dict(changed=False) + self.module = AltModule + self.params = self.module.params + + +aci = AltACIModule() + + +try: + from lxml import etree + + if sys.version_info >= (2, 7): + from xmljson import cobra +except ImportError: + pytestmark = pytest.mark.skip("ACI Ansible modules require the lxml and xmljson Python libraries") + + +class AciRest(unittest.TestCase): + def test_invalid_aci_login(self): + self.maxDiff = None + + error = dict( + code="401", + text=("Username or password is incorrect - " "FAILED local authentication"), + ) + + imdata = [ + { + "error": { + "attributes": { + "code": "401", + "text": ("Username or password is incorrect - " "FAILED local authentication"), + }, + }, + } + ] + + totalCount = 1 + + json_response = '{"totalCount":"1","imdata":[{"error":{"attributes":{"code":"401","text":"Username or password is incorrect - FAILED local authentication"}}}]}' # NOQA + aci.response_json(json_response) + self.assertEqual(aci.error, error) + self.assertEqual(aci.imdata, imdata) + self.assertEqual(aci.totalCount, totalCount) + + # Python 2.7+ is needed for xmljson + if sys.version_info < (2, 7): + return + + xml_response = """<?xml version="1.0" encoding="UTF-8"?> + <imdata totalCount="1"> + <error + code="401" + text="Username or password is incorrect - FAILED local +authentication" + /> + </imdata> + """ + aci.response_xml(xml_response) + self.assertEqual(aci.error, error) + self.assertEqual(aci.imdata, imdata) + self.assertEqual(aci.totalCount, totalCount) + + def test_valid_aci_login(self): + self.maxDiff = None + + imdata = [ + { + "aaaLogin": { + "attributes": { + "token": "ZldYAsoO9d0FfAQM8xaEVWvQPSOYwpnqzhwpIC1r4MaToknJjlIuAt9+TvXqrZ8lWYIGPj6VnZkWiS8nJfaiaX/AyrdD35jsSxiP3zydh+849xym7ALCw/fFNsc7b5ik1HaMuSUtdrN8fmCEUy7Pq/QNpGEqkE8m7HaxAuHpmvXgtdW1bA+KKJu2zY1c/tem", # NOQA + "siteFingerprint": "NdxD72K/uXaUK0wn", + "refreshTimeoutSeconds": "600", + "maximumLifetimeSeconds": "86400", + "guiIdleTimeoutSeconds": "1200", + "restTimeoutSeconds": "90", + "creationTime": "1500134817", + "firstLoginTime": "1500134817", + "userName": "admin", + "remoteUser": "false", + "unixUserId": "15374", + "sessionId": "o7hObsqNTfCmDGcZI5c4ng==", + "lastName": "", + "firstName": "", + "version": "2.0(2f)", + "buildTime": "Sat Aug 20 23:07:07 PDT 2016", + "node": "topology/pod-1/node-1", + }, + "children": [ + { + "aaaUserDomain": { + "attributes": { + "name": "all", + "rolesR": "admin", + "rolesW": "admin", + }, + "children": [ + { + "aaaReadRoles": { + "attributes": {}, + }, + }, + { + "aaaWriteRoles": { + "attributes": {}, + "children": [ + { + "role": { + "attributes": { + "name": "admin", + }, + }, + } + ], + }, + }, + ], + }, + }, + { + "DnDomainMapEntry": { + "attributes": { + "dn": "uni/tn-common", + "readPrivileges": "admin", + "writePrivileges": "admin", + }, + }, + }, + { + "DnDomainMapEntry": { + "attributes": { + "dn": "uni/tn-infra", + "readPrivileges": "admin", + "writePrivileges": "admin", + }, + }, + }, + { + "DnDomainMapEntry": { + "attributes": { + "dn": "uni/tn-mgmt", + "readPrivileges": "admin", + "writePrivileges": "admin", + }, + }, + }, + ], + }, + } + ] + + totalCount = 1 + + json_response = '{"totalCount":"1","imdata":[{"aaaLogin":{"attributes":{"token":"ZldYAsoO9d0FfAQM8xaEVWvQPSOYwpnqzhwpIC1r4MaToknJjlIuAt9+TvXqrZ8lWYIGPj6VnZkWiS8nJfaiaX/AyrdD35jsSxiP3zydh+849xym7ALCw/fFNsc7b5ik1HaMuSUtdrN8fmCEUy7Pq/QNpGEqkE8m7HaxAuHpmvXgtdW1bA+KKJu2zY1c/tem","siteFingerprint":"NdxD72K/uXaUK0wn","refreshTimeoutSeconds":"600","maximumLifetimeSeconds":"86400","guiIdleTimeoutSeconds":"1200","restTimeoutSeconds":"90","creationTime":"1500134817","firstLoginTime":"1500134817","userName":"admin","remoteUser":"false","unixUserId":"15374","sessionId":"o7hObsqNTfCmDGcZI5c4ng==","lastName":"","firstName":"","version":"2.0(2f)","buildTime":"Sat Aug 20 23:07:07 PDT 2016","node":"topology/pod-1/node-1"},"children":[{"aaaUserDomain":{"attributes":{"name":"all","rolesR":"admin","rolesW":"admin"},"children":[{"aaaReadRoles":{"attributes":{}}},{"aaaWriteRoles":{"attributes":{},"children":[{"role":{"attributes":{"name":"admin"}}}]}}]}},{"DnDomainMapEntry":{"attributes":{"dn":"uni/tn-common","readPrivileges":"admin","writePrivileges":"admin"}}},{"DnDomainMapEntry":{"attributes":{"dn":"uni/tn-infra","readPrivileges":"admin","writePrivileges":"admin"}}},{"DnDomainMapEntry":{"attributes":{"dn":"uni/tn-mgmt","readPrivileges":"admin","writePrivileges":"admin"}}}]}}]}' # NOQA + aci.response_json(json_response) + self.assertEqual(aci.imdata, imdata) + self.assertEqual(aci.totalCount, totalCount) + + # Python 2.7+ is needed for xmljson + if sys.version_info < (2, 7): + return + + xml_response = '<?xml version="1.0" encoding="UTF-8"?><imdata totalCount="1">\n<aaaLogin token="ZldYAsoO9d0FfAQM8xaEVWvQPSOYwpnqzhwpIC1r4MaToknJjlIuAt9+TvXqrZ8lWYIGPj6VnZkWiS8nJfaiaX/AyrdD35jsSxiP3zydh+849xym7ALCw/fFNsc7b5ik1HaMuSUtdrN8fmCEUy7Pq/QNpGEqkE8m7HaxAuHpmvXgtdW1bA+KKJu2zY1c/tem" siteFingerprint="NdxD72K/uXaUK0wn" refreshTimeoutSeconds="600" maximumLifetimeSeconds="86400" guiIdleTimeoutSeconds="1200" restTimeoutSeconds="90" creationTime="1500134817" firstLoginTime="1500134817" userName="admin" remoteUser="false" unixUserId="15374" sessionId="o7hObsqNTfCmDGcZI5c4ng==" lastName="" firstName="" version="2.0(2f)" buildTime="Sat Aug 20 23:07:07 PDT 2016" node="topology/pod-1/node-1">\n<aaaUserDomain name="all" rolesR="admin" rolesW="admin">\n<aaaReadRoles/>\n<aaaWriteRoles>\n<role name="admin"/>\n</aaaWriteRoles>\n</aaaUserDomain>\n<DnDomainMapEntry dn="uni/tn-common" readPrivileges="admin" writePrivileges="admin"/>\n<DnDomainMapEntry dn="uni/tn-infra" readPrivileges="admin" writePrivileges="admin"/>\n<DnDomainMapEntry dn="uni/tn-mgmt" readPrivileges="admin" writePrivileges="admin"/>\n</aaaLogin></imdata>\n' # NOQA + aci.response_xml(xml_response) + self.assertEqual(aci.imdata, imdata) + self.assertEqual(aci.totalCount, totalCount) + + def test_invalid_input(self): + self.maxDiff = None + + error = dict( + code="401", + text=("Username or password is incorrect - " "FAILED local authentication"), + ) + + imdata = [ + { + "error": { + "attributes": { + "code": "401", + "text": ("Username or password is incorrect - " "FAILED local authentication"), + }, + }, + } + ] + + totalCount = 1 + + json_response = '{"totalCount":"1","imdata":[{"error":{"attributes":{"code":"401","text":"Username or password is incorrect - FAILED local authentication"}}}]}' # NOQA + aci.response_json(json_response) + self.assertEqual(aci.error, error) + self.assertEqual(aci.imdata, imdata) + self.assertEqual(aci.totalCount, totalCount) + + # Python 2.7+ is needed for xmljson + if sys.version_info < (2, 7): + return + + xml_response = """<?xml version="1.0" encoding="UTF-8"?><imdata totalCount="1"> + <error + code="401" + text="Username or password is incorrect - FAILED local +authentication" + /> + </imdata> + """ + aci.response_xml(xml_response) + self.assertEqual(aci.error, error) + self.assertEqual(aci.imdata, imdata) + self.assertEqual(aci.totalCount, totalCount) + + def test_empty_response(self): + self.maxDiffi = None + + if PY2: + error_text = "Unable to parse output as JSON, see 'raw' output. No JSON object could be decoded" + else: + error_text = "Unable to parse output as JSON, see 'raw' output. Expecting value: line 1 column 1 (char 0)" + + error = dict( + code=-1, + text=error_text, + ) + raw = "" + + json_response = "" + aci.response_json(json_response) + self.assertEqual(aci.error, error) + self.assertEqual(aci.result["raw"], raw) + + # Python 2.7+ is needed for xmljson + if sys.version_info < (2, 7): + return + + elif etree.LXML_VERSION < (3, 3, 0, 0): + error_text = "Unable to parse output as XML, see 'raw' output. None" + elif etree.LXML_VERSION < (4, 0, 0, 0): + error_text = to_native("Unable to parse output as XML, see 'raw' output. None (line 0)", errors="surrogate_or_strict") + elif PY2: + error_text = "Unable to parse output as XML, see 'raw' output. Document is empty, line 1, column 1 (line 1)" + else: + error_text = None + + xml_response = "" + aci.response_xml(xml_response) + + if error_text is None: + # errors vary on Python 3.8+ for unknown reasons + # accept any of the following error messages + errors = ( + ("Unable to parse output as XML, see 'raw' output. None (line 0)"), + ("Unable to parse output as XML, see 'raw' output. Document is empty, line 1, column 1 (<string>, line 1)"), + ) + + for error in errors: + if error in aci.error["text"]: + error_text = error + break + + error = dict( + code=-1, + text=error_text, + ) + + raw = "" + + self.assertEqual(aci.error, error) + self.assertEqual(aci.result["raw"], raw) + + def test_invalid_response(self): + self.maxDiff = None + + if sys.version_info < (2, 7): + error_text = "Unable to parse output as JSON, see 'raw' output. Expecting object: line 1 column 8 (char 8)" + elif PY2: + error_text = "Unable to parse output as JSON, see 'raw' output. No JSON object could be decoded" + else: + error_text = "Unable to parse output as JSON, see 'raw' output. Expecting value: line 1 column 9 (char 8)" + + error = dict( + code=-1, + text=error_text, + ) + + raw = '{ "aaa":' + + json_response = '{ "aaa":' + aci.response_json(json_response) + self.assertEqual(aci.error, error) + self.assertEqual(aci.result["raw"], raw) + + # Python 2.7+ is needed for xmljson + if sys.version_info < (2, 7): + return + + elif etree.LXML_VERSION < (3, 3, 0, 0): + error_text = "Unable to parse output as XML, see 'raw' output. Couldn't find end of Start Tag aaa line 1, line 1, column 5" + elif PY2: + error_text = "Unable to parse output as XML, see 'raw' output. Couldn't find end of Start Tag aaa line 1, line 1, column 6 (line 1)" + + else: + error_text = "Unable to parse output as XML, see 'raw' output. Couldn't find end of Start Tag aaa line 1, line 1, column 6 (<string>, line 1)" + + error = dict( + code=-1, + text=error_text, + ) + + raw = "<aaa " + + xml_response = "<aaa " + aci.response_xml(xml_response) + self.assertEqual(aci.error, error) + self.assertEqual(aci.result["raw"], raw) diff --git a/ansible_collections/cisco/aci/tests/unit/modules/__init__.py b/ansible_collections/cisco/aci/tests/unit/modules/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/modules/__init__.py diff --git a/ansible_collections/cisco/aci/tests/unit/modules/utils.py b/ansible_collections/cisco/aci/tests/unit/modules/utils.py new file mode 100644 index 000000000..80a77fdc1 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/modules/utils.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import json + +from ansible_collections.cisco.aci.tests.unit.compat import unittest +from ansible_collections.cisco.aci.tests.unit.compat.mock import patch +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes + + +def set_module_args(args): + if "_ansible_remote_tmp" not in args: + args["_ansible_remote_tmp"] = "/tmp" + if "_ansible_keep_remote_files" not in args: + args["_ansible_keep_remote_files"] = False + + args = json.dumps({"ANSIBLE_MODULE_ARGS": args}) + basic._ANSIBLE_ARGS = to_bytes(args) + + +class AnsibleExitJson(Exception): + pass + + +class AnsibleFailJson(Exception): + pass + + +def exit_json(*args, **kwargs): + if "changed" not in kwargs: + kwargs["changed"] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): + kwargs["failed"] = True + raise AnsibleFailJson(kwargs) + + +class ModuleTestCase(unittest.TestCase): + def setUp(self): + self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json) + self.mock_module.start() + self.mock_sleep = patch("time.sleep") + self.mock_sleep.start() + set_module_args({}) + self.addCleanup(self.mock_module.stop) + self.addCleanup(self.mock_sleep.stop) diff --git a/ansible_collections/cisco/aci/tests/unit/requirements.txt b/ansible_collections/cisco/aci/tests/unit/requirements.txt new file mode 100644 index 000000000..4c76ff2c0 --- /dev/null +++ b/ansible_collections/cisco/aci/tests/unit/requirements.txt @@ -0,0 +1,13 @@ +requests +setuptools > 0.6 # pytest-xdist installed via requirements does not work with very old setuptools (sanity_ok) +unittest2 ; python_version < '2.7' +importlib ; python_version < '2.7' +netaddr +ipaddress + +# requirement for aci_rest module +xmljson +lxml + +# requirement for aci_aaa_user module +python-dateutil
\ No newline at end of file |