diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
commit | 975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch) | |
tree | 89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/community/windows/tests/unit | |
parent | Initial commit. (diff) | |
download | ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip |
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/windows/tests/unit')
17 files changed, 1130 insertions, 0 deletions
diff --git a/ansible_collections/community/windows/tests/unit/__init__.py b/ansible_collections/community/windows/tests/unit/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/__init__.py diff --git a/ansible_collections/community/windows/tests/unit/compat/__init__.py b/ansible_collections/community/windows/tests/unit/compat/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/compat/__init__.py diff --git a/ansible_collections/community/windows/tests/unit/compat/mock.py b/ansible_collections/community/windows/tests/unit/compat/mock.py new file mode 100644 index 000000000..3dcd2687f --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/compat/mock.py @@ -0,0 +1,42 @@ +# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +''' +Compat module for Python3.x's unittest.mock module +''' + +# Python 2.7 + +# Note: Could use the pypi mock library on python3.x as well as python2.x. It +# is the same as the python3 stdlib mock library + +try: + # Allow wildcard import because we really do want to import all of mock's + # symbols into this compat shim + # pylint: disable=wildcard-import,unused-wildcard-import + from unittest.mock import * +except ImportError: + # Python 2 + # pylint: disable=wildcard-import,unused-wildcard-import + try: + from mock import * + except ImportError: + print('You need the mock library installed on python2.x to run tests') diff --git a/ansible_collections/community/windows/tests/unit/conftest.py b/ansible_collections/community/windows/tests/unit/conftest.py new file mode 100644 index 000000000..e3f2ec4a0 --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/conftest.py @@ -0,0 +1,43 @@ +"""Enable unit testing of Ansible collections. PYTEST_DONT_REWRITE""" +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import os.path + +from ansible.utils.collection_loader._collection_finder import _AnsibleCollectionFinder + + +ANSIBLE_COLLECTIONS_PATH = os.path.abspath(os.path.join(__file__, '..', '..', '..', '..', '..')) + + +# this monkeypatch to _pytest.pathlib.resolve_package_path fixes PEP420 resolution for collections in pytest >= 6.0.0 +def collection_resolve_package_path(path): + """Configure the Python package path so that pytest can find our collections.""" + for parent in path.parents: + if str(parent) == ANSIBLE_COLLECTIONS_PATH: + return parent + + raise Exception('File "%s" not found in collection path "%s".' % (path, ANSIBLE_COLLECTIONS_PATH)) + + +def pytest_configure(): + """Configure this pytest plugin.""" + + try: + if pytest_configure.executed: + return + except AttributeError: + pytest_configure.executed = True + + # allow unit tests to import code from collections + + # noinspection PyProtectedMember + _AnsibleCollectionFinder(paths=[os.path.dirname(ANSIBLE_COLLECTIONS_PATH)])._install() # pylint: disable=protected-access + + # noinspection PyProtectedMember + from _pytest import pathlib as pytest_pathlib + pytest_pathlib.resolve_package_path = collection_resolve_package_path + + +pytest_configure() diff --git a/ansible_collections/community/windows/tests/unit/mock/__init__.py b/ansible_collections/community/windows/tests/unit/mock/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/mock/__init__.py diff --git a/ansible_collections/community/windows/tests/unit/mock/loader.py b/ansible_collections/community/windows/tests/unit/mock/loader.py new file mode 100644 index 000000000..e5dff78c1 --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/mock/loader.py @@ -0,0 +1,116 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +from ansible.errors import AnsibleParserError +from ansible.parsing.dataloader import DataLoader +from ansible.module_utils._text import to_bytes, to_text + + +class DictDataLoader(DataLoader): + + def __init__(self, file_mapping=None): + file_mapping = {} if file_mapping is None else file_mapping + assert type(file_mapping) == dict + + super(DictDataLoader, self).__init__() + + self._file_mapping = file_mapping + self._build_known_directories() + self._vault_secrets = None + + def load_from_file(self, path, cache=True, unsafe=False): + path = to_text(path) + if path in self._file_mapping: + return self.load(self._file_mapping[path], path) + return None + + # TODO: the real _get_file_contents returns a bytestring, so we actually convert the + # unicode/text it's created with to utf-8 + def _get_file_contents(self, file_name): + path = to_text(file_name) + if path in self._file_mapping: + return (to_bytes(self._file_mapping[path]), False) + else: + raise AnsibleParserError("file not found: %s" % path) + + def path_exists(self, path): + path = to_text(path) + return path in self._file_mapping or path in self._known_directories + + def is_file(self, path): + path = to_text(path) + return path in self._file_mapping + + def is_directory(self, path): + path = to_text(path) + return path in self._known_directories + + def list_directory(self, path): + ret = [] + path = to_text(path) + for x in (list(self._file_mapping.keys()) + self._known_directories): + if x.startswith(path): + if os.path.dirname(x) == path: + ret.append(os.path.basename(x)) + return ret + + def is_executable(self, path): + # FIXME: figure out a way to make paths return true for this + return False + + def _add_known_directory(self, directory): + if directory not in self._known_directories: + self._known_directories.append(directory) + + def _build_known_directories(self): + self._known_directories = [] + for path in self._file_mapping: + dirname = os.path.dirname(path) + while dirname not in ('/', ''): + self._add_known_directory(dirname) + dirname = os.path.dirname(dirname) + + def push(self, path, content): + rebuild_dirs = False + if path not in self._file_mapping: + rebuild_dirs = True + + self._file_mapping[path] = content + + if rebuild_dirs: + self._build_known_directories() + + def pop(self, path): + if path in self._file_mapping: + del self._file_mapping[path] + self._build_known_directories() + + def clear(self): + self._file_mapping = dict() + self._known_directories = [] + + def get_basedir(self): + return os.getcwd() + + def set_vault_secrets(self, vault_secrets): + self._vault_secrets = vault_secrets diff --git a/ansible_collections/community/windows/tests/unit/mock/path.py b/ansible_collections/community/windows/tests/unit/mock/path.py new file mode 100644 index 000000000..54858b13d --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/mock/path.py @@ -0,0 +1,8 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible_collections.ansible.windows.tests.unit.compat.mock import MagicMock +from ansible.utils.path import unfrackpath + + +mock_unfrackpath_noop = MagicMock(spec_set=unfrackpath, side_effect=lambda x, *args, **kwargs: x) diff --git a/ansible_collections/community/windows/tests/unit/mock/procenv.py b/ansible_collections/community/windows/tests/unit/mock/procenv.py new file mode 100644 index 000000000..3cb1b5b2f --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/mock/procenv.py @@ -0,0 +1,90 @@ +# (c) 2016, Matt Davis <mdavis@ansible.com> +# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import sys +import json + +from contextlib import contextmanager +from io import BytesIO, StringIO +from ansible_collections.community.windows.tests.unit.compat import unittest +from ansible.module_utils.six import PY3 +from ansible.module_utils._text import to_bytes + + +@contextmanager +def swap_stdin_and_argv(stdin_data='', argv_data=tuple()): + """ + context manager that temporarily masks the test runner's values for stdin and argv + """ + real_stdin = sys.stdin + real_argv = sys.argv + + if PY3: + fake_stream = StringIO(stdin_data) + fake_stream.buffer = BytesIO(to_bytes(stdin_data)) + else: + fake_stream = BytesIO(to_bytes(stdin_data)) + + try: + sys.stdin = fake_stream + sys.argv = argv_data + + yield + finally: + sys.stdin = real_stdin + sys.argv = real_argv + + +@contextmanager +def swap_stdout(): + """ + context manager that temporarily replaces stdout for tests that need to verify output + """ + old_stdout = sys.stdout + + if PY3: + fake_stream = StringIO() + else: + fake_stream = BytesIO() + + try: + sys.stdout = fake_stream + + yield fake_stream + finally: + sys.stdout = old_stdout + + +class ModuleTestCase(unittest.TestCase): + def setUp(self, module_args=None): + if module_args is None: + module_args = {'_ansible_remote_tmp': '/tmp', '_ansible_keep_remote_files': False} + + args = json.dumps(dict(ANSIBLE_MODULE_ARGS=module_args)) + + # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually + self.stdin_swap = swap_stdin_and_argv(stdin_data=args) + self.stdin_swap.__enter__() + + def tearDown(self): + # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually + self.stdin_swap.__exit__(None, None, None) diff --git a/ansible_collections/community/windows/tests/unit/mock/vault_helper.py b/ansible_collections/community/windows/tests/unit/mock/vault_helper.py new file mode 100644 index 000000000..dcce9c784 --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/mock/vault_helper.py @@ -0,0 +1,39 @@ +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.module_utils._text import to_bytes + +from ansible.parsing.vault import VaultSecret + + +class TextVaultSecret(VaultSecret): + '''A secret piece of text. ie, a password. Tracks text encoding. + + The text encoding of the text may not be the default text encoding so + we keep track of the encoding so we encode it to the same bytes.''' + + def __init__(self, text, encoding=None, errors=None, _bytes=None): + super(TextVaultSecret, self).__init__() + self.text = text + self.encoding = encoding or 'utf-8' + self._bytes = _bytes + self.errors = errors or 'strict' + + @property + def bytes(self): + '''The text encoded with encoding, unless we specifically set _bytes.''' + return self._bytes or to_bytes(self.text, encoding=self.encoding, errors=self.errors) diff --git a/ansible_collections/community/windows/tests/unit/mock/yaml_helper.py b/ansible_collections/community/windows/tests/unit/mock/yaml_helper.py new file mode 100644 index 000000000..1ef172159 --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/mock/yaml_helper.py @@ -0,0 +1,124 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import io +import yaml + +from ansible.module_utils.six import PY3 +from ansible.parsing.yaml.loader import AnsibleLoader +from ansible.parsing.yaml.dumper import AnsibleDumper + + +class YamlTestUtils(object): + """Mixin class to combine with a unittest.TestCase subclass.""" + def _loader(self, stream): + """Vault related tests will want to override this. + + Vault cases should setup a AnsibleLoader that has the vault password.""" + return AnsibleLoader(stream) + + def _dump_stream(self, obj, stream, dumper=None): + """Dump to a py2-unicode or py3-string stream.""" + if PY3: + return yaml.dump(obj, stream, Dumper=dumper) + else: + return yaml.dump(obj, stream, Dumper=dumper, encoding=None) + + def _dump_string(self, obj, dumper=None): + """Dump to a py2-unicode or py3-string""" + if PY3: + return yaml.dump(obj, Dumper=dumper) + else: + return yaml.dump(obj, Dumper=dumper, encoding=None) + + def _dump_load_cycle(self, obj): + # Each pass though a dump or load revs the 'generation' + # obj to yaml string + string_from_object_dump = self._dump_string(obj, dumper=AnsibleDumper) + + # wrap a stream/file like StringIO around that yaml + stream_from_object_dump = io.StringIO(string_from_object_dump) + loader = self._loader(stream_from_object_dump) + # load the yaml stream to create a new instance of the object (gen 2) + obj_2 = loader.get_data() + + # dump the gen 2 objects directory to strings + string_from_object_dump_2 = self._dump_string(obj_2, + dumper=AnsibleDumper) + + # The gen 1 and gen 2 yaml strings + self.assertEqual(string_from_object_dump, string_from_object_dump_2) + # the gen 1 (orig) and gen 2 py object + self.assertEqual(obj, obj_2) + + # again! gen 3... load strings into py objects + stream_3 = io.StringIO(string_from_object_dump_2) + loader_3 = self._loader(stream_3) + obj_3 = loader_3.get_data() + + string_from_object_dump_3 = self._dump_string(obj_3, dumper=AnsibleDumper) + + self.assertEqual(obj, obj_3) + # should be transitive, but... + self.assertEqual(obj_2, obj_3) + self.assertEqual(string_from_object_dump, string_from_object_dump_3) + + def _old_dump_load_cycle(self, obj): + '''Dump the passed in object to yaml, load it back up, dump again, compare.''' + stream = io.StringIO() + + yaml_string = self._dump_string(obj, dumper=AnsibleDumper) + self._dump_stream(obj, stream, dumper=AnsibleDumper) + + yaml_string_from_stream = stream.getvalue() + + # reset stream + stream.seek(0) + + loader = self._loader(stream) + # loader = AnsibleLoader(stream, vault_password=self.vault_password) + obj_from_stream = loader.get_data() + + stream_from_string = io.StringIO(yaml_string) + loader2 = self._loader(stream_from_string) + # loader2 = AnsibleLoader(stream_from_string, vault_password=self.vault_password) + obj_from_string = loader2.get_data() + + stream_obj_from_stream = io.StringIO() + stream_obj_from_string = io.StringIO() + + if PY3: + yaml.dump(obj_from_stream, stream_obj_from_stream, Dumper=AnsibleDumper) + yaml.dump(obj_from_stream, stream_obj_from_string, Dumper=AnsibleDumper) + else: + yaml.dump(obj_from_stream, stream_obj_from_stream, Dumper=AnsibleDumper, encoding=None) + yaml.dump(obj_from_stream, stream_obj_from_string, Dumper=AnsibleDumper, encoding=None) + + yaml_string_stream_obj_from_stream = stream_obj_from_stream.getvalue() + yaml_string_stream_obj_from_string = stream_obj_from_string.getvalue() + + stream_obj_from_stream.seek(0) + stream_obj_from_string.seek(0) + + if PY3: + yaml_string_obj_from_stream = yaml.dump(obj_from_stream, Dumper=AnsibleDumper) + yaml_string_obj_from_string = yaml.dump(obj_from_string, Dumper=AnsibleDumper) + else: + yaml_string_obj_from_stream = yaml.dump(obj_from_stream, Dumper=AnsibleDumper, encoding=None) + yaml_string_obj_from_string = yaml.dump(obj_from_string, Dumper=AnsibleDumper, encoding=None) + + assert yaml_string == yaml_string_obj_from_stream + assert yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string + assert (yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string == yaml_string_stream_obj_from_stream == + yaml_string_stream_obj_from_string) + assert obj == obj_from_stream + assert obj == obj_from_string + assert obj == yaml_string_obj_from_stream + assert obj == yaml_string_obj_from_string + assert obj == obj_from_stream == obj_from_string == yaml_string_obj_from_stream == yaml_string_obj_from_string + return {'obj': obj, + 'yaml_string': yaml_string, + 'yaml_string_from_stream': yaml_string_from_stream, + 'obj_from_stream': obj_from_stream, + 'obj_from_string': obj_from_string, + 'yaml_string_obj_from_string': yaml_string_obj_from_string} diff --git a/ansible_collections/community/windows/tests/unit/modules/__init__.py b/ansible_collections/community/windows/tests/unit/modules/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/modules/__init__.py diff --git a/ansible_collections/community/windows/tests/unit/modules/utils.py b/ansible_collections/community/windows/tests/unit/modules/utils.py new file mode 100644 index 000000000..bc627df64 --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/modules/utils.py @@ -0,0 +1,50 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +from ansible_collections.community.windows.tests.unit.compat import unittest +from ansible_collections.community.windows.tests.unit.compat.mock import patch +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes + + +def set_module_args(args): + if '_ansible_remote_tmp' not in args: + args['_ansible_remote_tmp'] = '/tmp' + if '_ansible_keep_remote_files' not in args: + args['_ansible_keep_remote_files'] = False + + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) + + +class AnsibleExitJson(Exception): + pass + + +class AnsibleFailJson(Exception): + pass + + +def exit_json(*args, **kwargs): + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class ModuleTestCase(unittest.TestCase): + + def setUp(self): + self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json) + self.mock_module.start() + self.mock_sleep = patch('time.sleep') + self.mock_sleep.start() + set_module_args({}) + self.addCleanup(self.mock_module.stop) + self.addCleanup(self.mock_sleep.stop) diff --git a/ansible_collections/community/windows/tests/unit/plugins/__init__.py b/ansible_collections/community/windows/tests/unit/plugins/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/plugins/__init__.py diff --git a/ansible_collections/community/windows/tests/unit/plugins/lookup/__init__.py b/ansible_collections/community/windows/tests/unit/plugins/lookup/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/plugins/lookup/__init__.py diff --git a/ansible_collections/community/windows/tests/unit/plugins/lookup/fixtures/avi.json b/ansible_collections/community/windows/tests/unit/plugins/lookup/fixtures/avi.json new file mode 100644 index 000000000..ae89ca689 --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/plugins/lookup/fixtures/avi.json @@ -0,0 +1,104 @@ +{ + "mock_single_obj": { + "_last_modified": "", + "cloud_ref": "https://192.0.2.132/api/cloud/cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "dhcp_enabled": true, + "exclude_discovered_subnets": false, + "name": "PG-123", + "synced_from_se": true, + "tenant_ref": "https://192.0.2.132/api/tenant/admin", + "url": "https://192.0.2.132/api/network/dvportgroup-2084-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "uuid": "dvportgroup-2084-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vcenter_dvs": true, + "vimgrnw_ref": "https://192.0.2.132/api/vimgrnwruntime/dvportgroup-2084-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vrf_context_ref": "https://192.0.2.132/api/vrfcontext/vrfcontext-31f1b55f-319c-44eb-862f-69d79ffdf295" + }, + "mock_multiple_obj": { + "results": [ + { + "_last_modified": "", + "cloud_ref": "https://192.0.2.132/api/cloud/cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "dhcp_enabled": true, + "exclude_discovered_subnets": false, + "name": "J-PG-0682", + "synced_from_se": true, + "tenant_ref": "https://192.0.2.132/api/tenant/admin", + "url": "https://192.0.2.132/api/network/dvportgroup-2084-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "uuid": "dvportgroup-2084-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vcenter_dvs": true, + "vimgrnw_ref": "https://192.0.2.132/api/vimgrnwruntime/dvportgroup-2084-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vrf_context_ref": "https://192.0.2.132/api/vrfcontext/vrfcontext-31f1b55f-319c-44eb-862f-69d79ffdf295" + }, + { + "_last_modified": "", + "cloud_ref": "https://192.0.2.132/api/cloud/cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "dhcp_enabled": true, + "exclude_discovered_subnets": false, + "name": "J-PG-0231", + "synced_from_se": true, + "tenant_ref": "https://192.0.2.132/api/tenant/admin", + "url": "https://192.0.2.132/api/network/dvportgroup-1627-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "uuid": "dvportgroup-1627-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vcenter_dvs": true, + "vimgrnw_ref": "https://192.0.2.132/api/vimgrnwruntime/dvportgroup-1627-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vrf_context_ref": "https://192.0.2.132/api/vrfcontext/vrfcontext-31f1b55f-319c-44eb-862f-69d79ffdf295" + }, + { + "_last_modified": "", + "cloud_ref": "https://192.0.2.132/api/cloud/cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "dhcp_enabled": true, + "exclude_discovered_subnets": false, + "name": "J-PG-0535", + "synced_from_se": true, + "tenant_ref": "https://192.0.2.132/api/tenant/admin", + "url": "https://192.0.2.132/api/network/dvportgroup-1934-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "uuid": "dvportgroup-1934-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vcenter_dvs": true, + "vimgrnw_ref": "https://192.0.2.132/api/vimgrnwruntime/dvportgroup-1934-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vrf_context_ref": "https://192.0.2.132/api/vrfcontext/vrfcontext-31f1b55f-319c-44eb-862f-69d79ffdf295" + }, + { + "_last_modified": "", + "cloud_ref": "https://192.0.2.132/api/cloud/cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "dhcp_enabled": true, + "exclude_discovered_subnets": false, + "name": "J-PG-0094", + "synced_from_se": true, + "tenant_ref": "https://192.0.2.132/api/tenant/admin", + "url": "https://192.0.2.132/api/network/dvportgroup-1458-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "uuid": "dvportgroup-1458-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vcenter_dvs": true, + "vimgrnw_ref": "https://192.0.2.132/api/vimgrnwruntime/dvportgroup-1458-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vrf_context_ref": "https://192.0.2.132/api/vrfcontext/vrfcontext-31f1b55f-319c-44eb-862f-69d79ffdf295" + }, + { + "_last_modified": "", + "cloud_ref": "https://192.0.2.132/api/cloud/cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "dhcp_enabled": true, + "exclude_discovered_subnets": false, + "name": "J-PG-0437", + "synced_from_se": true, + "tenant_ref": "https://192.0.2.132/api/tenant/admin", + "url": "https://192.0.2.132/api/network/dvportgroup-1836-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "uuid": "dvportgroup-1836-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vcenter_dvs": true, + "vimgrnw_ref": "https://192.0.2.132/api/vimgrnwruntime/dvportgroup-1836-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vrf_context_ref": "https://192.0.2.132/api/vrfcontext/vrfcontext-31f1b55f-319c-44eb-862f-69d79ffdf295" + }, + { + "_last_modified": "", + "cloud_ref": "https://192.0.2.132/api/cloud/cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "dhcp_enabled": true, + "exclude_discovered_subnets": false, + "name": "J-PG-0673", + "synced_from_se": true, + "tenant_ref": "https://192.0.2.132/api/tenant/admin", + "url": "https://192.0.2.132/api/network/dvportgroup-2075-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "uuid": "dvportgroup-2075-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vcenter_dvs": true, + "vimgrnw_ref": "https://192.0.2.132/api/vimgrnwruntime/dvportgroup-2075-cloud-4d063be1-99c2-44cf-8b28-977bd970524c", + "vrf_context_ref": "https://192.0.2.132/api/vrfcontext/vrfcontext-31f1b55f-319c-44eb-862f-69d79ffdf295" + } + ] + } +} diff --git a/ansible_collections/community/windows/tests/unit/plugins/lookup/test_laps_password.py b/ansible_collections/community/windows/tests/unit/plugins/lookup/test_laps_password.py new file mode 100644 index 000000000..29e2b938a --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/plugins/lookup/test_laps_password.py @@ -0,0 +1,511 @@ +# -*- coding: utf-8 -*- +# (c) 2019, Jordan Borean <jborean@redhat.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import platform +import pytest +import sys + +from ansible_collections.community.windows.tests.unit.compat.mock import MagicMock + +from ansible.errors import AnsibleLookupError +from ansible.plugins.loader import lookup_loader + + +class FakeLDAPError(Exception): + pass + + +class FakeLDAPAuthUnknownError(Exception): + pass + + +class FakeLdap(object): + SASL_AVAIL = 1 + TLS_AVAIL = 1 + + SCOPE_SUBTREE = 2 + + OPT_PROTOCOL_VERSION = 17 + OPT_REFERRALS = 8 + + OPT_X_TLS_NEVER = 0 + OPT_X_TLS_DEMAND = 2 + OPT_X_TLS_ALLOW = 3 + OPT_X_TLS_TRY = 4 + + OPT_X_TLS_CACERTFILE = 24578 + OPT_X_TLS_REQUIRE_CERT = 24582 + + LDAPError = FakeLDAPError + AUTH_UNKNOWN = FakeLDAPAuthUnknownError + + @staticmethod + def initialize(uri, bytes_mode=None, **kwargs): + return MagicMock() + + @staticmethod + def set_option(option, invalue): + pass + + +class FakeLdapUrl(object): + + def __init__(self, ldapUrl=None, urlscheme='ldap', hostport='', **kwargs): + url = ldapUrl if ldapUrl else "%s://%s" % (urlscheme, hostport) + self.urlscheme = url.split('://', 2)[0].lower() + self._url = url + + def initializeUrl(self): + return self._url + + +def fake_is_ldap_url(s): + s_lower = s.lower() + return s_lower.startswith("ldap://") or s_lower.startswith("ldaps://") or s_lower.startswith("ldapi://") + + +@pytest.fixture(autouse=True) +def laps_password(): + """Imports and the laps_password lookup with a mocks laps module for testing""" + + # Build the fake ldap and ldapurl Python modules + fake_ldap_obj = FakeLdap() + fake_ldap_url_obj = MagicMock() + fake_ldap_url_obj.isLDAPUrl.side_effect = fake_is_ldap_url + fake_ldap_url_obj.LDAPUrl.side_effect = FakeLdapUrl + + # Take a snapshot of sys.modules before we manipulate it + orig_modules = sys.modules.copy() + try: + sys.modules["ldap"] = fake_ldap_obj + sys.modules["ldapurl"] = fake_ldap_url_obj + + from ansible_collections.community.windows.plugins.lookup import laps_password as lookup_obj + + # ensure the changes to these globals aren't persisted after each test + orig_has_ldap = lookup_obj.HAS_LDAP + orig_ldap_imp_err = lookup_obj.LDAP_IMP_ERR + + yield lookup_obj + + lookup_obj.HAS_LDAP = orig_has_ldap + lookup_obj.LDAP_IMP_ERR = orig_ldap_imp_err + finally: + # Restore sys.modules back to our pre-shenanigans + sys.modules = orig_modules + + +def test_missing_ldap(laps_password): + laps_password.HAS_LDAP = False + laps_password.LDAP_IMP_ERR = "no import for you!" + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run(["host"], domain="test") + + assert str(err.value).startswith( + "Failed to import the required Python library (python-ldap) on %s's Python %s. See " + "https://pypi.org/project/python-ldap/ for more info. Please " + % (platform.node(), sys.executable) + ) + assert str(err.value).endswith(". Import Error: no import for you!") + + +def test_gssapi_without_sasl(monkeypatch, ): + monkeypatch.setattr("ldap.SASL_AVAIL", 0) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run(["host"], domain="test") + + assert str(err.value) == "Cannot use auth=gssapi when SASL is not configured with the local LDAP install" + + +def test_simple_auth_without_credentials(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run(["host"], domain="test", auth="simple") + + assert str(err.value) == "The username and password values are required when auth=simple" + + +def test_gssapi_auth_with_credentials(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run(["host"], domain="test", auth="gssapi", username="u", + password="p") + + assert str(err.value) == "Explicit credentials are not supported when auth='gssapi'. Call kinit outside of Ansible" + + +def test_not_encrypted_without_override(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run(["host"], domain="dc01", auth="simple", + username="test", password="test") + + assert str(err.value) == "Current configuration will result in plaintext traffic exposing credentials. Set " \ + "auth=gssapi, scheme=ldaps, start_tls=True, or allow_plaintext=True to continue" + + +def test_ldaps_without_tls(monkeypatch, ): + monkeypatch.setattr("ldap.TLS_AVAIL", 0) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run(["host"], domain="dc01", scheme="ldaps") + + assert str(err.value) == "Cannot use TLS as the local LDAP installed has not been configured to support it" + + +def test_start_tls_without_tls(monkeypatch, ): + monkeypatch.setattr("ldap.TLS_AVAIL", 0) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run(["host"], domain="dc01", start_tls=True) + + assert str(err.value) == "Cannot use TLS as the local LDAP installed has not been configured to support it" + + +def test_normal_run(monkeypatch, laps_password): + def get_laps_password(conn, cn, search_base): + return "CN=%s,%s" % (cn, search_base) + + mock_ldap = MagicMock() + mock_ldap.return_value.read_rootdse_s.return_value = {"defaultNamingContext": ["DC=domain,DC=com"]} + monkeypatch.setattr("ldap.initialize", mock_ldap) + + mock_get_laps_password = MagicMock(side_effect=get_laps_password) + monkeypatch.setattr(laps_password, "get_laps_password", mock_get_laps_password) + + actual = lookup_loader.get('community.windows.laps_password').run(["host1", "host2"], domain="dc01") + assert actual == ["CN=host1,DC=domain,DC=com", "CN=host2,DC=domain,DC=com"] + + # Verify the call count to get_laps_password + assert mock_get_laps_password.call_count == 2 + + # Verify the initialize() method call + assert mock_ldap.call_count == 1 + assert mock_ldap.call_args[0] == ("ldap://dc01:389",) + assert mock_ldap.call_args[1] == {"bytes_mode": False} + + # Verify the number of calls made to the mocked LDAP object + assert mock_ldap.mock_calls[1][0] == "().set_option" + assert mock_ldap.mock_calls[1][1] == (FakeLdap.OPT_PROTOCOL_VERSION, 3) + + assert mock_ldap.mock_calls[2][0] == "().set_option" + assert mock_ldap.mock_calls[2][1] == (FakeLdap.OPT_REFERRALS, 0) + + assert mock_ldap.mock_calls[3][0] == '().sasl_gssapi_bind_s' + assert mock_ldap.mock_calls[3][1] == () + + assert mock_ldap.mock_calls[4][0] == "().read_rootdse_s" + assert mock_ldap.mock_calls[4][1] == () + + assert mock_ldap.mock_calls[5][0] == "().unbind_s" + assert mock_ldap.mock_calls[5][1] == () + + +def test_run_with_simple_auth_and_search_base(monkeypatch, laps_password): + def get_laps_password(conn, cn, search_base): + return "CN=%s,%s" % (cn, search_base) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + mock_get_laps_password = MagicMock(side_effect=get_laps_password) + monkeypatch.setattr(laps_password, "get_laps_password", mock_get_laps_password) + + actual = lookup_loader.get('community.windows.laps_password').run(["host1", "host2"], domain="dc01", auth="simple", + username="user", password="pass", + allow_plaintext=True, + search_base="OU=Workstations,DC=domain,DC=com") + assert actual == ["CN=host1,OU=Workstations,DC=domain,DC=com", "CN=host2,OU=Workstations,DC=domain,DC=com"] + + # Verify the call count to get_laps_password + assert mock_get_laps_password.call_count == 2 + + # Verify the initialize() method call + assert mock_ldap.call_count == 1 + assert mock_ldap.call_args[0] == ("ldap://dc01:389",) + assert mock_ldap.call_args[1] == {"bytes_mode": False} + + # Verify the number of calls made to the mocked LDAP object + assert mock_ldap.mock_calls[1][0] == "().set_option" + assert mock_ldap.mock_calls[1][1] == (FakeLdap.OPT_PROTOCOL_VERSION, 3) + + assert mock_ldap.mock_calls[2][0] == "().set_option" + assert mock_ldap.mock_calls[2][1] == (FakeLdap.OPT_REFERRALS, 0) + + assert mock_ldap.mock_calls[3][0] == '().bind_s' + assert mock_ldap.mock_calls[3][1] == (u"user", u"pass") + + assert mock_ldap.mock_calls[4][0] == "().unbind_s" + assert mock_ldap.mock_calls[4][1] == () + + +@pytest.mark.parametrize("kwargs, expected", [ + [{"domain": "dc01"}, "ldap://dc01:389"], + [{"domain": "dc02", "port": 1234}, "ldap://dc02:1234"], + [{"domain": "dc03", "scheme": "ldaps"}, "ldaps://dc03:636"], + # Verifies that an explicit URI ignores port and scheme + [{"domain": "ldap://dc04", "port": 1234, "scheme": "ldaps"}, "ldap://dc04"], +]) +def test_uri_options(monkeypatch, kwargs, expected): + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('community.windows.laps_password').run([], **kwargs) + + assert mock_ldap.call_count == 1 + assert mock_ldap.call_args[0] == (expected,) + assert mock_ldap.call_args[1] == {"bytes_mode": False} + + +@pytest.mark.parametrize("validate, expected", [ + ["never", FakeLdap.OPT_X_TLS_NEVER], + ["allow", FakeLdap.OPT_X_TLS_ALLOW], + ["try", FakeLdap.OPT_X_TLS_TRY], + ["demand", FakeLdap.OPT_X_TLS_DEMAND], +]) +def test_certificate_validation(monkeypatch, validate, expected): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('community.windows.laps_password').run([], domain="dc01", start_tls=True, + validate_certs=validate) + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, expected) + + assert mock_ldap.mock_calls[3][0] == "().start_tls_s" + assert mock_ldap.mock_calls[3][1] == () + + assert mock_ldap.mock_calls[4][0] == "().sasl_gssapi_bind_s" + assert mock_ldap.mock_calls[4][1] == () + + +def test_certificate_validate_with_custom_cacert(monkeypatch): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + monkeypatch.setattr(os.path, 'exists', lambda x: True) + + lookup_loader.get('community.windows.laps_password').run([], domain="dc01", scheme="ldaps", + cacert_file="cacert.pem") + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, FakeLdap.OPT_X_TLS_DEMAND) + assert mock_ldap_option.mock_calls[1][1] == (FakeLdap.OPT_X_TLS_CACERTFILE, u"cacert.pem") + + assert mock_ldap.mock_calls[3][0] == "().sasl_gssapi_bind_s" + assert mock_ldap.mock_calls[3][1] == () + + +def test_certificate_validate_with_custom_cacert_fail(monkeypatch): + def set_option(self, key, value): + if key == FakeLdap.OPT_X_TLS_CACERTFILE: + raise ValueError("set_option() failed") + + monkeypatch.setattr(FakeLdap, "set_option", set_option) + monkeypatch.setattr(os.path, 'exists', lambda x: True) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run([], domain="dc01", scheme="ldaps", + cacert_file="cacert.pem") + + assert str(err.value) == "Failed to set path to cacert file, this is a known issue with older OpenLDAP " \ + "libraries on the host. Update OpenLDAP and reinstall python-ldap to continue" + + +@pytest.mark.parametrize("path", [ + "cacert.pem", + "~/.certs/cacert.pem", + "~/.certs/$USER/cacert.pem", +]) +def test_certificate_invalid_path(monkeypatch, path): + lookup = lookup_loader.get('community.windows.laps_password') + monkeypatch.setattr(os.path, 'exists', lambda x: False) + expected_path = os.path.expanduser(os.path.expandvars(path)) + + with pytest.raises(AnsibleLookupError) as err: + lookup.run([], domain="dc01", scheme="ldaps", cacert_file=path) + + assert str(err.value) == "The cacert_file specified '%s' does not exist" % expected_path + + +def test_simple_auth_with_ldaps(monkeypatch): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('community.windows.laps_password').run([], domain="dc01", scheme="ldaps", auth="simple", + username="user", password="pass") + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, FakeLdap.OPT_X_TLS_DEMAND) + + assert mock_ldap.mock_calls[3][0] == '().bind_s' + assert mock_ldap.mock_calls[3][1] == (u"user", u"pass") + + assert mock_ldap.mock_calls[4][0] == "().read_rootdse_s" + assert mock_ldap.mock_calls[4][1] == () + + +def test_simple_auth_with_start_tls(monkeypatch): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('community.windows.laps_password').run([], domain="dc01", start_tls=True, auth="simple", + username="user", password="pass") + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, FakeLdap.OPT_X_TLS_DEMAND) + + assert mock_ldap.mock_calls[3][0] == "().start_tls_s" + assert mock_ldap.mock_calls[3][1] == () + + assert mock_ldap.mock_calls[4][0] == '().bind_s' + assert mock_ldap.mock_calls[4][1] == (u"user", u"pass") + + assert mock_ldap.mock_calls[5][0] == "().read_rootdse_s" + assert mock_ldap.mock_calls[5][1] == () + + +def test_start_tls_ldap_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.start_tls_s.side_effect = FakeLDAPError("fake error") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run([], domain="dc01", start_tls=True) + + assert str(err.value) == "Failed to send StartTLS to LDAP host 'ldap://dc01:389': fake error" + + +def test_simple_bind_ldap_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.bind_s.side_effect = FakeLDAPError("fake error") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run([], domain="dc01", auth="simple", username="user", + password="pass", allow_plaintext=True) + + assert str(err.value) == "Failed to simple bind against LDAP host 'ldap://dc01:389': fake error" + + +def test_sasl_bind_ldap_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.sasl_gssapi_bind_s.side_effect = FakeLDAPError("fake error") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run([], domain="dc01") + + assert str(err.value) == "Failed to do a sasl bind against LDAP host 'ldap://dc01:389': fake error" + + +def test_sasl_bind_ldap_no_mechs_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.sasl_gssapi_bind_s.side_effect = FakeLDAPAuthUnknownError("no mechs") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('community.windows.laps_password').run([], domain="dc01") + + assert str(err.value) == "Failed to do a sasl bind against LDAP host 'ldap://dc01:389', the GSSAPI mech is " \ + "not installed: no mechs" + + +def test_get_password_valid(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + ("CN=server,DC=domain,DC=local", + {"ms-Mcs-AdmPwd": ["pass"], "distinguishedName": ["CN=server,DC=domain,DC=local"]}), + # Replicates the 3 extra entries AD returns that aren't server objects + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + actual = laps_password.get_laps_password(mock_conn, "server", "DC=domain,DC=local") + assert actual == "pass" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=domain,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]} + + +def test_get_password_laps_not_configured(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + ("CN=server,DC=domain,DC=local", {"distinguishedName": ["CN=server,DC=domain,DC=local"]}), + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + with pytest.raises(AnsibleLookupError) as err: + laps_password.get_laps_password(mock_conn, "server2", "DC=test,DC=local") + assert str(err.value) == \ + "The server 'CN=server,DC=domain,DC=local' did not have the LAPS attribute 'ms-Mcs-AdmPwd'" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=test,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server2))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]} + + +def test_get_password_no_results(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + with pytest.raises(AnsibleLookupError) as err: + laps_password.get_laps_password(mock_conn, "server", "DC=domain,DC=local") + assert str(err.value) == "Failed to find the server 'server' in the base 'DC=domain,DC=local'" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=domain,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]} + + +def test_get_password_multiple_results(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + ("CN=server,OU=Workstations,DC=domain,DC=local", + {"ms-Mcs-AdmPwd": ["pass"], "distinguishedName": ["CN=server,OU=Workstations,DC=domain,DC=local"]}), + ("CN=server,OU=Servers,DC=domain,DC=local", + {"ms-Mcs-AdmPwd": ["pass"], "distinguishedName": ["CN=server,OU=Servers,DC=domain,DC=local"]}), + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + with pytest.raises(AnsibleLookupError) as err: + laps_password.get_laps_password(mock_conn, "server", "DC=domain,DC=local") + assert str(err.value) == \ + "Found too many results for the server 'server' in the base 'DC=domain,DC=local'. Specify a more explicit " \ + "search base for the server required. Found servers 'CN=server,OU=Workstations,DC=domain,DC=local', " \ + "'CN=server,OU=Servers,DC=domain,DC=local'" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=domain,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]} diff --git a/ansible_collections/community/windows/tests/unit/requirements.txt b/ansible_collections/community/windows/tests/unit/requirements.txt new file mode 100644 index 000000000..c4e4edb59 --- /dev/null +++ b/ansible_collections/community/windows/tests/unit/requirements.txt @@ -0,0 +1,3 @@ +setuptools > 0.6 # pytest-xdist installed via requirements does not work with very old setuptools (sanity_ok) +unittest2 ; python_version < '2.7' +importlib ; python_version < '2.7' |