diff options
Diffstat (limited to '')
32 files changed, 2454 insertions, 0 deletions
diff --git a/test/units/utils/__init__.py b/test/units/utils/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/utils/__init__.py diff --git a/test/units/utils/collection_loader/__init__.py b/test/units/utils/collection_loader/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/utils/collection_loader/__init__.py diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/ansible/builtin/plugins/modules/shouldnotload.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/ansible/builtin/plugins/modules/shouldnotload.py new file mode 100644 index 00000000..4041a338 --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/ansible/builtin/plugins/modules/shouldnotload.py @@ -0,0 +1,4 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +raise Exception('this module should never be loaded') diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/meta/runtime.yml b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/meta/runtime.yml new file mode 100644 index 00000000..f2e2fdec --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/meta/runtime.yml @@ -0,0 +1,4 @@ +plugin_routing: + modules: + rerouted_module: + redirect: ansible.builtin.ping diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/action/my_action.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/action/my_action.py new file mode 100644 index 00000000..9d30580f --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/action/my_action.py @@ -0,0 +1,8 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ..module_utils.my_util import question + + +def action_code(): + return "hello from my_action.py" diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/__init__.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/__init__.py diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_other_util.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_other_util.py new file mode 100644 index 00000000..35e1381b --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_other_util.py @@ -0,0 +1,4 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from .my_util import question diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_util.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_util.py new file mode 100644 index 00000000..c431c34c --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_util.py @@ -0,0 +1,6 @@ +# WARNING: Changing line numbers of code in this file will break collection tests that use tracing to check paths and line numbers. +# Also, do not import division from __future__ as this will break detection of __future__ inheritance on Python 2. + + +def question(): + return 3 / 2 diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/__init__.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/__init__.py new file mode 100644 index 00000000..6d697034 --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/__init__.py @@ -0,0 +1,5 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +raise Exception('this should never run') diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/amodule.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/amodule.py new file mode 100644 index 00000000..99320a0c --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/amodule.py @@ -0,0 +1,6 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +def module_code(): + return "hello from amodule.py" diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/roles/some_role/.gitkeep b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/roles/some_role/.gitkeep new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/roles/some_role/.gitkeep diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/__init__.py new file mode 100644 index 00000000..6068ac1a --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/__init__.py @@ -0,0 +1,5 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +raise Exception('this code should never execute') diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/ansible/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/ansible/__init__.py new file mode 100644 index 00000000..6068ac1a --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/ansible/__init__.py @@ -0,0 +1,5 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +raise Exception('this code should never execute') diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/__init__.py new file mode 100644 index 00000000..6068ac1a --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/__init__.py @@ -0,0 +1,5 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +raise Exception('this code should never execute') diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/testcoll/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/testcoll/__init__.py new file mode 100644 index 00000000..6068ac1a --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/testcoll/__init__.py @@ -0,0 +1,5 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +raise Exception('this code should never execute') diff --git a/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/ansible/playbook_adj_other/.gitkeep b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/ansible/playbook_adj_other/.gitkeep new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/ansible/playbook_adj_other/.gitkeep diff --git a/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/freshns/playbook_adj_other/.gitkeep b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/freshns/playbook_adj_other/.gitkeep new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/freshns/playbook_adj_other/.gitkeep diff --git a/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/testns/playbook_adj_other/.gitkeep b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/testns/playbook_adj_other/.gitkeep new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/testns/playbook_adj_other/.gitkeep diff --git a/test/units/utils/collection_loader/test_collection_loader.py b/test/units/utils/collection_loader/test_collection_loader.py new file mode 100644 index 00000000..6488188c --- /dev/null +++ b/test/units/utils/collection_loader/test_collection_loader.py @@ -0,0 +1,834 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import pkgutil +import pytest +import re +import sys + +from ansible.module_utils.six import PY3, string_types +from ansible.module_utils.compat.importlib import import_module +from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef +from ansible.utils.collection_loader._collection_finder import ( + _AnsibleCollectionFinder, _AnsibleCollectionLoader, _AnsibleCollectionNSPkgLoader, _AnsibleCollectionPkgLoader, + _AnsibleCollectionPkgLoaderBase, _AnsibleCollectionRootPkgLoader, _AnsiblePathHookFinder, + _get_collection_name_from_path, _get_collection_role_path, _get_collection_metadata, _iter_modules_impl +) +from ansible.utils.collection_loader._collection_config import _EventSource +from units.compat.mock import MagicMock, NonCallableMagicMock, patch + + +# fixture to ensure we always clean up the import stuff when we're done +@pytest.fixture(autouse=True, scope='function') +def teardown(*args, **kwargs): + yield + reset_collections_loader_state() + +# BEGIN STANDALONE TESTS - these exercise behaviors of the individual components without the import machinery + + +def test_finder_setup(): + # ensure scalar path is listified + f = _AnsibleCollectionFinder(paths='/bogus/bogus') + assert isinstance(f._n_collection_paths, list) + + # ensure sys.path paths that have an ansible_collections dir are added to the end of the collections paths + with patch.object(sys, 'path', ['/bogus', default_test_collection_paths[1], '/morebogus', default_test_collection_paths[0]]): + f = _AnsibleCollectionFinder(paths=['/explicit', '/other']) + assert f._n_collection_paths == ['/explicit', '/other', default_test_collection_paths[1], default_test_collection_paths[0]] + + configured_paths = ['/bogus'] + playbook_paths = ['/playbookdir'] + f = _AnsibleCollectionFinder(paths=configured_paths) + assert f._n_collection_paths == configured_paths + f.set_playbook_paths(playbook_paths) + assert f._n_collection_paths == extend_paths(playbook_paths, 'collections') + configured_paths + + # ensure scalar playbook_paths gets listified + f.set_playbook_paths(playbook_paths[0]) + assert f._n_collection_paths == extend_paths(playbook_paths, 'collections') + configured_paths + + +def test_finder_not_interested(): + f = get_default_finder() + assert f.find_module('nothanks') is None + assert f.find_module('nothanks.sub', path=['/bogus/dir']) is None + + +def test_finder_ns(): + # ensure we can still load ansible_collections and ansible_collections.ansible when they don't exist on disk + f = _AnsibleCollectionFinder(paths=['/bogus/bogus']) + loader = f.find_module('ansible_collections') + assert isinstance(loader, _AnsibleCollectionRootPkgLoader) + + loader = f.find_module('ansible_collections.ansible', path=['/bogus/bogus']) + assert isinstance(loader, _AnsibleCollectionNSPkgLoader) + + f = get_default_finder() + loader = f.find_module('ansible_collections') + assert isinstance(loader, _AnsibleCollectionRootPkgLoader) + + # path is not allowed for top-level + with pytest.raises(ValueError): + f.find_module('ansible_collections', path=['whatever']) + + # path is required for subpackages + with pytest.raises(ValueError): + f.find_module('ansible_collections.whatever', path=None) + + paths = [os.path.join(p, 'ansible_collections/nonexistns') for p in default_test_collection_paths] + + # test missing + loader = f.find_module('ansible_collections.nonexistns', paths) + assert loader is None + + +# keep these up top to make sure the loader install/remove are working, since we rely on them heavily in the tests +def test_loader_remove(): + fake_mp = [MagicMock(), _AnsibleCollectionFinder(), MagicMock(), _AnsibleCollectionFinder()] + fake_ph = [MagicMock().m1, MagicMock().m2, _AnsibleCollectionFinder()._ansible_collection_path_hook, NonCallableMagicMock] + # must nest until 2.6 compilation is totally donezo + with patch.object(sys, 'meta_path', fake_mp): + with patch.object(sys, 'path_hooks', fake_ph): + _AnsibleCollectionFinder()._remove() + assert len(sys.meta_path) == 2 + # no AnsibleCollectionFinders on the meta path after remove is called + assert all((not isinstance(mpf, _AnsibleCollectionFinder) for mpf in sys.meta_path)) + assert len(sys.path_hooks) == 3 + # none of the remaining path hooks should point at an AnsibleCollectionFinder + assert all((not isinstance(ph.__self__, _AnsibleCollectionFinder) for ph in sys.path_hooks if hasattr(ph, '__self__'))) + assert AnsibleCollectionConfig.collection_finder is None + + +def test_loader_install(): + fake_mp = [MagicMock(), _AnsibleCollectionFinder(), MagicMock(), _AnsibleCollectionFinder()] + fake_ph = [MagicMock().m1, MagicMock().m2, _AnsibleCollectionFinder()._ansible_collection_path_hook, NonCallableMagicMock] + # must nest until 2.6 compilation is totally donezo + with patch.object(sys, 'meta_path', fake_mp): + with patch.object(sys, 'path_hooks', fake_ph): + f = _AnsibleCollectionFinder() + f._install() + assert len(sys.meta_path) == 3 # should have removed the existing ACFs and installed a new one + assert sys.meta_path[0] is f # at the front + # the rest of the meta_path should not be AnsibleCollectionFinders + assert all((not isinstance(mpf, _AnsibleCollectionFinder) for mpf in sys.meta_path[1:])) + assert len(sys.path_hooks) == 4 # should have removed the existing ACF path hooks and installed a new one + # the first path hook should be ours, make sure it's pointing at the right instance + assert hasattr(sys.path_hooks[0], '__self__') and sys.path_hooks[0].__self__ is f + # the rest of the path_hooks should not point at an AnsibleCollectionFinder + assert all((not isinstance(ph.__self__, _AnsibleCollectionFinder) for ph in sys.path_hooks[1:] if hasattr(ph, '__self__'))) + assert AnsibleCollectionConfig.collection_finder is f + with pytest.raises(ValueError): + AnsibleCollectionConfig.collection_finder = f + + +def test_finder_coll(): + f = get_default_finder() + + tests = [ + {'name': 'ansible_collections.testns.testcoll', 'test_paths': [default_test_collection_paths]}, + {'name': 'ansible_collections.ansible.builtin', 'test_paths': [['/bogus'], default_test_collection_paths]}, + ] + # ensure finder works for legit paths and bogus paths + for test_dict in tests: + # splat the dict values to our locals + globals().update(test_dict) + parent_pkg = name.rpartition('.')[0] + for paths in test_paths: + paths = [os.path.join(p, parent_pkg.replace('.', '/')) for p in paths] + loader = f.find_module(name, path=paths) + assert isinstance(loader, _AnsibleCollectionPkgLoader) + + +def test_root_loader_not_interested(): + with pytest.raises(ImportError): + _AnsibleCollectionRootPkgLoader('not_ansible_collections_toplevel', path_list=[]) + + with pytest.raises(ImportError): + _AnsibleCollectionRootPkgLoader('ansible_collections.somens', path_list=['/bogus']) + + +def test_root_loader(): + name = 'ansible_collections' + # ensure this works even when ansible_collections doesn't exist on disk + for paths in [], default_test_collection_paths: + if name in sys.modules: + del sys.modules[name] + loader = _AnsibleCollectionRootPkgLoader(name, paths) + assert repr(loader).startswith('_AnsibleCollectionRootPkgLoader(path=') + module = loader.load_module(name) + assert module.__name__ == name + assert module.__path__ == [p for p in extend_paths(paths, name) if os.path.isdir(p)] + # even if the dir exists somewhere, this loader doesn't support get_data, so make __file__ a non-file + assert module.__file__ == '<ansible_synthetic_collection_package>' + assert module.__package__ == name + assert sys.modules.get(name) == module + + +def test_nspkg_loader_not_interested(): + with pytest.raises(ImportError): + _AnsibleCollectionNSPkgLoader('not_ansible_collections_toplevel.something', path_list=[]) + + with pytest.raises(ImportError): + _AnsibleCollectionNSPkgLoader('ansible_collections.somens.somecoll', path_list=[]) + + +def test_nspkg_loader_load_module(): + # ensure the loader behaves on the toplevel and ansible packages for both legit and missing/bogus paths + for name in ['ansible_collections.ansible', 'ansible_collections.testns']: + parent_pkg = name.partition('.')[0] + module_to_load = name.rpartition('.')[2] + paths = extend_paths(default_test_collection_paths, parent_pkg) + existing_child_paths = [p for p in extend_paths(paths, module_to_load) if os.path.exists(p)] + if name in sys.modules: + del sys.modules[name] + loader = _AnsibleCollectionNSPkgLoader(name, path_list=paths) + assert repr(loader).startswith('_AnsibleCollectionNSPkgLoader(path=') + module = loader.load_module(name) + assert module.__name__ == name + assert isinstance(module.__loader__, _AnsibleCollectionNSPkgLoader) + assert module.__path__ == existing_child_paths + assert module.__package__ == name + assert module.__file__ == '<ansible_synthetic_collection_package>' + assert sys.modules.get(name) == module + + +def test_collpkg_loader_not_interested(): + with pytest.raises(ImportError): + _AnsibleCollectionPkgLoader('not_ansible_collections', path_list=[]) + + with pytest.raises(ImportError): + _AnsibleCollectionPkgLoader('ansible_collections.ns', path_list=['/bogus/bogus']) + + +def test_collpkg_loader_load_module(): + reset_collections_loader_state() + with patch('ansible.utils.collection_loader.AnsibleCollectionConfig') as p: + for name in ['ansible_collections.ansible.builtin', 'ansible_collections.testns.testcoll']: + parent_pkg = name.rpartition('.')[0] + module_to_load = name.rpartition('.')[2] + paths = extend_paths(default_test_collection_paths, parent_pkg) + existing_child_paths = [p for p in extend_paths(paths, module_to_load) if os.path.exists(p)] + is_builtin = 'ansible.builtin' in name + if name in sys.modules: + del sys.modules[name] + loader = _AnsibleCollectionPkgLoader(name, path_list=paths) + assert repr(loader).startswith('_AnsibleCollectionPkgLoader(path=') + module = loader.load_module(name) + assert module.__name__ == name + assert isinstance(module.__loader__, _AnsibleCollectionPkgLoader) + if is_builtin: + assert module.__path__ == [] + else: + assert module.__path__ == [existing_child_paths[0]] + + assert module.__package__ == name + if is_builtin: + assert module.__file__ == '<ansible_synthetic_collection_package>' + else: + assert module.__file__.endswith('__synthetic__') and os.path.isdir(os.path.dirname(module.__file__)) + assert sys.modules.get(name) == module + + assert hasattr(module, '_collection_meta') and isinstance(module._collection_meta, dict) + + # FIXME: validate _collection_meta contents match what's on disk (or not) + + # if the module has metadata, try loading it with busted metadata + if module._collection_meta: + _collection_finder = import_module('ansible.utils.collection_loader._collection_finder') + with patch.object(_collection_finder, '_meta_yml_to_dict', side_effect=Exception('bang')): + with pytest.raises(Exception) as ex: + _AnsibleCollectionPkgLoader(name, path_list=paths).load_module(name) + assert 'error parsing collection metadata' in str(ex.value) + + +def test_coll_loader(): + with patch('ansible.utils.collection_loader.AnsibleCollectionConfig'): + with pytest.raises(ValueError): + # not a collection + _AnsibleCollectionLoader('ansible_collections') + + with pytest.raises(ValueError): + # bogus paths + _AnsibleCollectionLoader('ansible_collections.testns.testcoll', path_list=[]) + + # FIXME: more + + +def test_path_hook_setup(): + with patch.object(sys, 'path_hooks', []): + found_hook = None + pathhook_exc = None + try: + found_hook = _AnsiblePathHookFinder._get_filefinder_path_hook() + except Exception as phe: + pathhook_exc = phe + + if PY3: + assert str(pathhook_exc) == 'need exactly one FileFinder import hook (found 0)' + else: + assert found_hook is None + + assert repr(_AnsiblePathHookFinder(object(), '/bogus/path')) == "_AnsiblePathHookFinder(path='/bogus/path')" + + +def test_path_hook_importerror(): + # ensure that AnsiblePathHookFinder.find_module swallows ImportError from path hook delegation on Py3, eg if the delegated + # path hook gets passed a file on sys.path (python36.zip) + reset_collections_loader_state() + path_to_a_file = os.path.join(default_test_collection_paths[0], 'ansible_collections/testns/testcoll/plugins/action/my_action.py') + # it's a bug if the following pops an ImportError... + assert _AnsiblePathHookFinder(_AnsibleCollectionFinder(), path_to_a_file).find_module('foo.bar.my_action') is None + + +def test_new_or_existing_module(): + module_name = 'blar.test.module' + pkg_name = module_name.rpartition('.')[0] + + # create new module case + nuke_module_prefix(module_name) + with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name, __package__=pkg_name) as new_module: + # the module we just created should now exist in sys.modules + assert sys.modules.get(module_name) is new_module + assert new_module.__name__ == module_name + + # the module should stick since we didn't raise an exception in the contextmgr + assert sys.modules.get(module_name) is new_module + + # reuse existing module case + with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name, __attr1__=42, blar='yo') as existing_module: + assert sys.modules.get(module_name) is new_module # should be the same module we created earlier + assert hasattr(existing_module, '__package__') and existing_module.__package__ == pkg_name + assert hasattr(existing_module, '__attr1__') and existing_module.__attr1__ == 42 + assert hasattr(existing_module, 'blar') and existing_module.blar == 'yo' + + # exception during update existing shouldn't zap existing module from sys.modules + with pytest.raises(ValueError) as ve: + with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name) as existing_module: + err_to_raise = ValueError('bang') + raise err_to_raise + # make sure we got our error + assert ve.value is err_to_raise + # and that the module still exists + assert sys.modules.get(module_name) is existing_module + + # test module removal after exception during creation + nuke_module_prefix(module_name) + with pytest.raises(ValueError) as ve: + with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name) as new_module: + err_to_raise = ValueError('bang') + raise err_to_raise + # make sure we got our error + assert ve.value is err_to_raise + # and that the module was removed + assert sys.modules.get(module_name) is None + + +def test_iter_modules_impl(): + modules_trailer = 'ansible_collections/testns/testcoll/plugins' + modules_pkg_prefix = modules_trailer.replace('/', '.') + '.' + modules_path = os.path.join(default_test_collection_paths[0], modules_trailer) + modules = list(_iter_modules_impl([modules_path], modules_pkg_prefix)) + + assert modules + assert set([('ansible_collections.testns.testcoll.plugins.action', True), + ('ansible_collections.testns.testcoll.plugins.module_utils', True), + ('ansible_collections.testns.testcoll.plugins.modules', True)]) == set(modules) + + modules_trailer = 'ansible_collections/testns/testcoll/plugins/modules' + modules_pkg_prefix = modules_trailer.replace('/', '.') + '.' + modules_path = os.path.join(default_test_collection_paths[0], modules_trailer) + modules = list(_iter_modules_impl([modules_path], modules_pkg_prefix)) + + assert modules + assert len(modules) == 1 + assert modules[0][0] == 'ansible_collections.testns.testcoll.plugins.modules.amodule' # name + assert modules[0][1] is False # is_pkg + + # FIXME: more + + +# BEGIN IN-CIRCUIT TESTS - these exercise behaviors of the loader when wired up to the import machinery + + +def test_import_from_collection(monkeypatch): + collection_root = os.path.join(os.path.dirname(__file__), 'fixtures', 'collections') + collection_path = os.path.join(collection_root, 'ansible_collections/testns/testcoll/plugins/module_utils/my_util.py') + + # THIS IS UNSTABLE UNDER A DEBUGGER + # the trace we're expecting to be generated when running the code below: + # answer = question() + expected_trace_log = [ + (collection_path, 5, 'call'), + (collection_path, 6, 'line'), + (collection_path, 6, 'return'), + ] + + # define the collection root before any ansible code has been loaded + # otherwise config will have already been loaded and changing the environment will have no effect + monkeypatch.setenv('ANSIBLE_COLLECTIONS_PATH', collection_root) + + finder = _AnsibleCollectionFinder(paths=[collection_root]) + reset_collections_loader_state(finder) + + from ansible_collections.testns.testcoll.plugins.module_utils.my_util import question + + original_trace_function = sys.gettrace() + trace_log = [] + + if original_trace_function: + # enable tracing while preserving the existing trace function (coverage) + def my_trace_function(frame, event, arg): + trace_log.append((frame.f_code.co_filename, frame.f_lineno, event)) + + # the original trace function expects to have itself set as the trace function + sys.settrace(original_trace_function) + # call the original trace function + original_trace_function(frame, event, arg) + # restore our trace function + sys.settrace(my_trace_function) + + return my_trace_function + else: + # no existing trace function, so our trace function is much simpler + def my_trace_function(frame, event, arg): + trace_log.append((frame.f_code.co_filename, frame.f_lineno, event)) + + return my_trace_function + + sys.settrace(my_trace_function) + + try: + # run a minimal amount of code while the trace is running + # adding more code here, including use of a context manager, will add more to our trace + answer = question() + finally: + sys.settrace(original_trace_function) + + # make sure 'import ... as ...' works on builtin synthetic collections + # the following import is not supported (it tries to find module_utils in ansible.plugins) + # import ansible_collections.ansible.builtin.plugins.module_utils as c1 + import ansible_collections.ansible.builtin.plugins.action as c2 + import ansible_collections.ansible.builtin.plugins as c3 + import ansible_collections.ansible.builtin as c4 + import ansible_collections.ansible as c5 + import ansible_collections as c6 + + # make sure 'import ...' works on builtin synthetic collections + import ansible_collections.ansible.builtin.plugins.module_utils + + import ansible_collections.ansible.builtin.plugins.action + assert ansible_collections.ansible.builtin.plugins.action == c3.action == c2 + + import ansible_collections.ansible.builtin.plugins + assert ansible_collections.ansible.builtin.plugins == c4.plugins == c3 + + import ansible_collections.ansible.builtin + assert ansible_collections.ansible.builtin == c5.builtin == c4 + + import ansible_collections.ansible + assert ansible_collections.ansible == c6.ansible == c5 + + import ansible_collections + assert ansible_collections == c6 + + # make sure 'from ... import ...' works on builtin synthetic collections + from ansible_collections.ansible import builtin + from ansible_collections.ansible.builtin import plugins + assert builtin.plugins == plugins + + from ansible_collections.ansible.builtin.plugins import action + from ansible_collections.ansible.builtin.plugins.action import command + assert action.command == command + + from ansible_collections.ansible.builtin.plugins.module_utils import basic + from ansible_collections.ansible.builtin.plugins.module_utils.basic import AnsibleModule + assert basic.AnsibleModule == AnsibleModule + + # make sure relative imports work from collections code + # these require __package__ to be set correctly + import ansible_collections.testns.testcoll.plugins.module_utils.my_other_util + import ansible_collections.testns.testcoll.plugins.action.my_action + + # verify that code loaded from a collection does not inherit __future__ statements from the collection loader + if sys.version_info[0] == 2: + # if the collection code inherits the division future feature from the collection loader this will fail + assert answer == 1 + else: + assert answer == 1.5 + + # verify that the filename and line number reported by the trace is correct + # this makes sure that collection loading preserves file paths and line numbers + assert trace_log == expected_trace_log + + +def test_eventsource(): + es = _EventSource() + # fire when empty should succeed + es.fire(42) + handler1 = MagicMock() + handler2 = MagicMock() + es += handler1 + es.fire(99, my_kwarg='blah') + handler1.assert_called_with(99, my_kwarg='blah') + es += handler2 + es.fire(123, foo='bar') + handler1.assert_called_with(123, foo='bar') + handler2.assert_called_with(123, foo='bar') + es -= handler2 + handler1.reset_mock() + handler2.reset_mock() + es.fire(123, foo='bar') + handler1.assert_called_with(123, foo='bar') + handler2.assert_not_called() + es -= handler1 + handler1.reset_mock() + es.fire('blah', kwarg=None) + handler1.assert_not_called() + handler2.assert_not_called() + es -= handler1 # should succeed silently + handler_bang = MagicMock(side_effect=Exception('bang')) + es += handler_bang + with pytest.raises(Exception) as ex: + es.fire(123) + assert 'bang' in str(ex.value) + handler_bang.assert_called_with(123) + with pytest.raises(ValueError): + es += 42 + + +def test_on_collection_load(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + load_handler = MagicMock() + AnsibleCollectionConfig.on_collection_load += load_handler + + m = import_module('ansible_collections.testns.testcoll') + load_handler.assert_called_once_with(collection_name='testns.testcoll', collection_path=os.path.dirname(m.__file__)) + + _meta = _get_collection_metadata('testns.testcoll') + assert _meta + # FIXME: compare to disk + + finder = get_default_finder() + reset_collections_loader_state(finder) + + AnsibleCollectionConfig.on_collection_load += MagicMock(side_effect=Exception('bang')) + with pytest.raises(Exception) as ex: + import_module('ansible_collections.testns.testcoll') + assert 'bang' in str(ex.value) + + +def test_default_collection_config(): + finder = get_default_finder() + reset_collections_loader_state(finder) + assert AnsibleCollectionConfig.default_collection is None + AnsibleCollectionConfig.default_collection = 'foo.bar' + assert AnsibleCollectionConfig.default_collection == 'foo.bar' + with pytest.raises(ValueError): + AnsibleCollectionConfig.default_collection = 'bar.baz' + + +def test_default_collection_detection(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + # we're clearly not under a collection path + assert _get_collection_name_from_path('/') is None + + # something that looks like a collection path but isn't importable by our finder + assert _get_collection_name_from_path('/foo/ansible_collections/bogusns/boguscoll/bar') is None + + # legit, at the top of the collection + live_collection_path = os.path.join(os.path.dirname(__file__), 'fixtures/collections/ansible_collections/testns/testcoll') + assert _get_collection_name_from_path(live_collection_path) == 'testns.testcoll' + + # legit, deeper inside the collection + live_collection_deep_path = os.path.join(live_collection_path, 'plugins/modules') + assert _get_collection_name_from_path(live_collection_deep_path) == 'testns.testcoll' + + # this one should be hidden by the real testns.testcoll, so should not resolve + masked_collection_path = os.path.join(os.path.dirname(__file__), 'fixtures/collections_masked/ansible_collections/testns/testcoll') + assert _get_collection_name_from_path(masked_collection_path) is None + + +@pytest.mark.parametrize( + 'role_name,collection_list,expected_collection_name,expected_path_suffix', + [ + ('some_role', ['testns.testcoll', 'ansible.bogus'], 'testns.testcoll', 'testns/testcoll/roles/some_role'), + ('testns.testcoll.some_role', ['ansible.bogus', 'testns.testcoll'], 'testns.testcoll', 'testns/testcoll/roles/some_role'), + ('testns.testcoll.some_role', [], 'testns.testcoll', 'testns/testcoll/roles/some_role'), + ('testns.testcoll.some_role', None, 'testns.testcoll', 'testns/testcoll/roles/some_role'), + ('some_role', [], None, None), + ('some_role', None, None, None), + ]) +def test_collection_role_name_location(role_name, collection_list, expected_collection_name, expected_path_suffix): + finder = get_default_finder() + reset_collections_loader_state(finder) + + expected_path = None + if expected_path_suffix: + expected_path = os.path.join(os.path.dirname(__file__), 'fixtures/collections/ansible_collections', expected_path_suffix) + + found = _get_collection_role_path(role_name, collection_list) + + if found: + assert found[0] == role_name.rpartition('.')[2] + assert found[1] == expected_path + assert found[2] == expected_collection_name + else: + assert expected_collection_name is None and expected_path_suffix is None + + +def test_bogus_imports(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + # ensure ImportError on known-bogus imports + bogus_imports = ['bogus_toplevel', 'ansible_collections.bogusns', 'ansible_collections.testns.boguscoll', + 'ansible_collections.testns.testcoll.bogussub', 'ansible_collections.ansible.builtin.bogussub'] + for bogus_import in bogus_imports: + with pytest.raises(ImportError): + import_module(bogus_import) + + +def test_empty_vs_no_code(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + from ansible_collections.testns import testcoll # synthetic package with no code on disk + from ansible_collections.testns.testcoll.plugins import module_utils # real package with empty code file + + # ensure synthetic packages have no code object at all (prevent bogus coverage entries) + assert testcoll.__loader__.get_source(testcoll.__name__) is None + assert testcoll.__loader__.get_code(testcoll.__name__) is None + + # ensure empty package inits do have a code object + assert module_utils.__loader__.get_source(module_utils.__name__) == b'' + assert module_utils.__loader__.get_code(module_utils.__name__) is not None + + +def test_finder_playbook_paths(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + import ansible_collections + import ansible_collections.ansible + import ansible_collections.testns + + # ensure the package modules look like we expect + assert hasattr(ansible_collections, '__path__') and len(ansible_collections.__path__) > 0 + assert hasattr(ansible_collections.ansible, '__path__') and len(ansible_collections.ansible.__path__) > 0 + assert hasattr(ansible_collections.testns, '__path__') and len(ansible_collections.testns.__path__) > 0 + + # these shouldn't be visible yet, since we haven't added the playbook dir + with pytest.raises(ImportError): + import ansible_collections.ansible.playbook_adj_other + + with pytest.raises(ImportError): + import ansible_collections.testns.playbook_adj_other + + assert AnsibleCollectionConfig.playbook_paths == [] + playbook_path_fixture_dir = os.path.join(os.path.dirname(__file__), 'fixtures/playbook_path') + + # configure the playbook paths + AnsibleCollectionConfig.playbook_paths = [playbook_path_fixture_dir] + + # playbook paths go to the front of the line + assert AnsibleCollectionConfig.collection_paths[0] == os.path.join(playbook_path_fixture_dir, 'collections') + + # playbook paths should be updated on the existing root ansible_collections path, as well as on the 'ansible' namespace (but no others!) + assert ansible_collections.__path__[0] == os.path.join(playbook_path_fixture_dir, 'collections/ansible_collections') + assert ansible_collections.ansible.__path__[0] == os.path.join(playbook_path_fixture_dir, 'collections/ansible_collections/ansible') + assert all('playbook_path' not in p for p in ansible_collections.testns.__path__) + + # should succeed since we fixed up the package path + import ansible_collections.ansible.playbook_adj_other + # should succeed since we didn't import freshns before hacking in the path + import ansible_collections.freshns.playbook_adj_other + # should fail since we've already imported something from this path and didn't fix up its package path + with pytest.raises(ImportError): + import ansible_collections.testns.playbook_adj_other + + +def test_toplevel_iter_modules(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + modules = list(pkgutil.iter_modules(default_test_collection_paths, '')) + assert len(modules) == 1 + assert modules[0][1] == 'ansible_collections' + + +def test_iter_modules_namespaces(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + paths = extend_paths(default_test_collection_paths, 'ansible_collections') + modules = list(pkgutil.iter_modules(paths, 'ansible_collections.')) + assert len(modules) == 2 + assert all(m[2] is True for m in modules) + assert all(isinstance(m[0], _AnsiblePathHookFinder) for m in modules) + assert set(['ansible_collections.testns', 'ansible_collections.ansible']) == set(m[1] for m in modules) + + +def test_collection_get_data(): + finder = get_default_finder() + reset_collections_loader_state(finder) + + # something that's there + d = pkgutil.get_data('ansible_collections.testns.testcoll', 'plugins/action/my_action.py') + assert b'hello from my_action.py' in d + + # something that's not there + d = pkgutil.get_data('ansible_collections.testns.testcoll', 'bogus/bogus') + assert d is None + + with pytest.raises(ValueError): + plugins_pkg = import_module('ansible_collections.ansible.builtin') + assert not os.path.exists(os.path.dirname(plugins_pkg.__file__)) + d = pkgutil.get_data('ansible_collections.ansible.builtin', 'plugins/connection/local.py') + + +@pytest.mark.parametrize( + 'ref,ref_type,expected_collection,expected_subdirs,expected_resource,expected_python_pkg_name', + [ + ('ns.coll.myaction', 'action', 'ns.coll', '', 'myaction', 'ansible_collections.ns.coll.plugins.action'), + ('ns.coll.subdir1.subdir2.myaction', 'action', 'ns.coll', 'subdir1.subdir2', 'myaction', 'ansible_collections.ns.coll.plugins.action.subdir1.subdir2'), + ('ns.coll.myrole', 'role', 'ns.coll', '', 'myrole', 'ansible_collections.ns.coll.roles.myrole'), + ('ns.coll.subdir1.subdir2.myrole', 'role', 'ns.coll', 'subdir1.subdir2', 'myrole', 'ansible_collections.ns.coll.roles.subdir1.subdir2.myrole'), + ]) +def test_fqcr_parsing_valid(ref, ref_type, expected_collection, + expected_subdirs, expected_resource, expected_python_pkg_name): + assert AnsibleCollectionRef.is_valid_fqcr(ref, ref_type) + + r = AnsibleCollectionRef.from_fqcr(ref, ref_type) + assert r.collection == expected_collection + assert r.subdirs == expected_subdirs + assert r.resource == expected_resource + assert r.n_python_package_name == expected_python_pkg_name + + r = AnsibleCollectionRef.try_parse_fqcr(ref, ref_type) + assert r.collection == expected_collection + assert r.subdirs == expected_subdirs + assert r.resource == expected_resource + assert r.n_python_package_name == expected_python_pkg_name + + +@pytest.mark.parametrize( + 'ref,ref_type,expected_error_type,expected_error_expression', + [ + ('no_dots_at_all_action', 'action', ValueError, 'is not a valid collection reference'), + ('no_nscoll.myaction', 'action', ValueError, 'is not a valid collection reference'), + ('ns.coll.myaction', 'bogus', ValueError, 'invalid collection ref_type'), + ]) +def test_fqcr_parsing_invalid(ref, ref_type, expected_error_type, expected_error_expression): + assert not AnsibleCollectionRef.is_valid_fqcr(ref, ref_type) + + with pytest.raises(expected_error_type) as curerr: + AnsibleCollectionRef.from_fqcr(ref, ref_type) + + assert re.search(expected_error_expression, str(curerr.value)) + + r = AnsibleCollectionRef.try_parse_fqcr(ref, ref_type) + assert r is None + + +@pytest.mark.parametrize( + 'name,subdirs,resource,ref_type,python_pkg_name', + [ + ('ns.coll', None, 'res', 'doc_fragments', 'ansible_collections.ns.coll.plugins.doc_fragments'), + ('ns.coll', 'subdir1', 'res', 'doc_fragments', 'ansible_collections.ns.coll.plugins.doc_fragments.subdir1'), + ('ns.coll', 'subdir1.subdir2', 'res', 'action', 'ansible_collections.ns.coll.plugins.action.subdir1.subdir2'), + ]) +def test_collectionref_components_valid(name, subdirs, resource, ref_type, python_pkg_name): + x = AnsibleCollectionRef(name, subdirs, resource, ref_type) + + assert x.collection == name + if subdirs: + assert x.subdirs == subdirs + else: + assert x.subdirs == '' + + assert x.resource == resource + assert x.ref_type == ref_type + assert x.n_python_package_name == python_pkg_name + + +@pytest.mark.parametrize( + 'dirname,expected_result', + [ + ('become_plugins', 'become'), + ('cache_plugins', 'cache'), + ('connection_plugins', 'connection'), + ('library', 'modules'), + ('filter_plugins', 'filter'), + ('bogus_plugins', ValueError), + (None, ValueError) + ] +) +def test_legacy_plugin_dir_to_plugin_type(dirname, expected_result): + if isinstance(expected_result, string_types): + assert AnsibleCollectionRef.legacy_plugin_dir_to_plugin_type(dirname) == expected_result + else: + with pytest.raises(expected_result): + AnsibleCollectionRef.legacy_plugin_dir_to_plugin_type(dirname) + + +@pytest.mark.parametrize( + 'name,subdirs,resource,ref_type,expected_error_type,expected_error_expression', + [ + ('bad_ns', '', 'resource', 'action', ValueError, 'invalid collection name'), + ('ns.coll.', '', 'resource', 'action', ValueError, 'invalid collection name'), + ('ns.coll', 'badsubdir#', 'resource', 'action', ValueError, 'invalid subdirs entry'), + ('ns.coll', 'badsubdir.', 'resource', 'action', ValueError, 'invalid subdirs entry'), + ('ns.coll', '.badsubdir', 'resource', 'action', ValueError, 'invalid subdirs entry'), + ('ns.coll', '', 'resource', 'bogus', ValueError, 'invalid collection ref_type'), + ]) +def test_collectionref_components_invalid(name, subdirs, resource, ref_type, expected_error_type, expected_error_expression): + with pytest.raises(expected_error_type) as curerr: + AnsibleCollectionRef(name, subdirs, resource, ref_type) + + assert re.search(expected_error_expression, str(curerr.value)) + + +# BEGIN TEST SUPPORT + +default_test_collection_paths = [ + os.path.join(os.path.dirname(__file__), 'fixtures', 'collections'), + os.path.join(os.path.dirname(__file__), 'fixtures', 'collections_masked'), + '/bogus/bogussub' +] + + +def get_default_finder(): + return _AnsibleCollectionFinder(paths=default_test_collection_paths) + + +def extend_paths(path_list, suffix): + suffix = suffix.replace('.', '/') + return [os.path.join(p, suffix) for p in path_list] + + +def nuke_module_prefix(prefix): + for module_to_nuke in [m for m in sys.modules if m.startswith(prefix)]: + sys.modules.pop(module_to_nuke) + + +def reset_collections_loader_state(metapath_finder=None): + _AnsibleCollectionFinder._remove() + + nuke_module_prefix('ansible_collections') + nuke_module_prefix('ansible.modules') + nuke_module_prefix('ansible.plugins') + + # FIXME: better to move this someplace else that gets cleaned up automatically? + _AnsibleCollectionLoader._redirected_package_map = {} + + AnsibleCollectionConfig._default_collection = None + AnsibleCollectionConfig._on_collection_load = _EventSource() + + if metapath_finder: + metapath_finder._install() diff --git a/test/units/utils/display/test_display.py b/test/units/utils/display/test_display.py new file mode 100644 index 00000000..cdeb4966 --- /dev/null +++ b/test/units/utils/display/test_display.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2020 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +from ansible.utils.display import Display + + +def test_display_basic_message(capsys, mocker): + # Disable logging + mocker.patch('ansible.utils.display.logger', return_value=None) + + d = Display() + d.display(u'Some displayed message') + out, err = capsys.readouterr() + assert out == 'Some displayed message\n' + assert err == '' diff --git a/test/units/utils/display/test_logger.py b/test/units/utils/display/test_logger.py new file mode 100644 index 00000000..ed69393b --- /dev/null +++ b/test/units/utils/display/test_logger.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2020 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import logging +import sys + + +def test_logger(): + ''' + Avoid CVE-2019-14846 as 3rd party libs will disclose secrets when + logging is set to DEBUG + ''' + + # clear loaded modules to have unadultered test. + for loaded in list(sys.modules.keys()): + if 'ansible' in loaded: + del sys.modules[loaded] + + # force logger to exist via config + from ansible import constants as C + C.DEFAULT_LOG_PATH = '/dev/null' + + # initialize logger + from ansible.utils.display import logger + + assert logger.root.level != logging.DEBUG diff --git a/test/units/utils/display/test_warning.py b/test/units/utils/display/test_warning.py new file mode 100644 index 00000000..be63c348 --- /dev/null +++ b/test/units/utils/display/test_warning.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2020 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import pytest + +from ansible.utils.display import Display + + +@pytest.fixture +def warning_message(): + warning_message = 'bad things will happen' + expected_warning_message = '[WARNING]: {0}\n'.format(warning_message) + return warning_message, expected_warning_message + + +def test_warning(capsys, mocker, warning_message): + warning_message, expected_warning_message = warning_message + + mocker.patch('ansible.utils.color.ANSIBLE_COLOR', True) + mocker.patch('ansible.utils.color.parsecolor', return_value=u'1;35') # value for 'bright purple' + + d = Display() + d.warning(warning_message) + out, err = capsys.readouterr() + assert d._warns == {expected_warning_message: 1} + assert err == '\x1b[1;35m{0}\x1b[0m\n'.format(expected_warning_message.rstrip('\n')) + + +def test_warning_no_color(capsys, mocker, warning_message): + warning_message, expected_warning_message = warning_message + + mocker.patch('ansible.utils.color.ANSIBLE_COLOR', False) + + d = Display() + d.warning(warning_message) + out, err = capsys.readouterr() + assert d._warns == {expected_warning_message: 1} + assert err == expected_warning_message diff --git a/test/units/utils/test_cleanup_tmp_file.py b/test/units/utils/test_cleanup_tmp_file.py new file mode 100644 index 00000000..2a44a55b --- /dev/null +++ b/test/units/utils/test_cleanup_tmp_file.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2019, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import pytest +import tempfile + +from ansible.utils.path import cleanup_tmp_file + + +def raise_error(): + raise OSError + + +def test_cleanup_tmp_file_file(): + tmp_fd, tmp = tempfile.mkstemp() + cleanup_tmp_file(tmp) + + assert not os.path.exists(tmp) + + +def test_cleanup_tmp_file_dir(): + tmp = tempfile.mkdtemp() + cleanup_tmp_file(tmp) + + assert not os.path.exists(tmp) + + +def test_cleanup_tmp_file_nonexistant(): + assert None is cleanup_tmp_file('nope') + + +def test_cleanup_tmp_file_failure(mocker): + tmp = tempfile.mkdtemp() + with pytest.raises(Exception): + mocker.patch('shutil.rmtree', side_effect=raise_error()) + cleanup_tmp_file(tmp) + + +def test_cleanup_tmp_file_failure_warning(mocker, capsys): + tmp = tempfile.mkdtemp() + with pytest.raises(Exception): + mocker.patch('shutil.rmtree', side_effect=raise_error()) + cleanup_tmp_file(tmp, warn=True) diff --git a/test/units/utils/test_context_objects.py b/test/units/utils/test_context_objects.py new file mode 100644 index 00000000..c56a41d0 --- /dev/null +++ b/test/units/utils/test_context_objects.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2018, Toshio Kuratomi <tkuratomi@ansible.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import argparse + +import pytest + +from ansible.module_utils.common.collections import ImmutableDict +from ansible.utils import context_objects as co + + +MAKE_IMMUTABLE_DATA = ((u'くらとみ', u'くらとみ'), + (42, 42), + ({u'café': u'くらとみ'}, ImmutableDict({u'café': u'くらとみ'})), + ([1, u'café', u'くらとみ'], (1, u'café', u'くらとみ')), + (set((1, u'café', u'くらとみ')), frozenset((1, u'café', u'くらとみ'))), + ({u'café': [1, set(u'ñ')]}, + ImmutableDict({u'café': (1, frozenset(u'ñ'))})), + ([set((1, 2)), {u'くらとみ': 3}], + (frozenset((1, 2)), ImmutableDict({u'くらとみ': 3}))), + ) + + +@pytest.mark.parametrize('data, expected', MAKE_IMMUTABLE_DATA) +def test_make_immutable(data, expected): + assert co._make_immutable(data) == expected + + +def test_cliargs_from_dict(): + old_dict = {'tags': [u'production', u'webservers'], + 'check_mode': True, + 'start_at_task': u'Start with くらとみ'} + expected = frozenset((('tags', (u'production', u'webservers')), + ('check_mode', True), + ('start_at_task', u'Start with くらとみ'))) + + assert frozenset(co.CLIArgs(old_dict).items()) == expected + + +def test_cliargs(): + class FakeOptions: + pass + options = FakeOptions() + options.tags = [u'production', u'webservers'] + options.check_mode = True + options.start_at_task = u'Start with くらとみ' + + expected = frozenset((('tags', (u'production', u'webservers')), + ('check_mode', True), + ('start_at_task', u'Start with くらとみ'))) + + assert frozenset(co.CLIArgs.from_options(options).items()) == expected + + +def test_cliargs_argparse(): + parser = argparse.ArgumentParser(description='Process some integers.') + parser.add_argument('integers', metavar='N', type=int, nargs='+', + help='an integer for the accumulator') + parser.add_argument('--sum', dest='accumulate', action='store_const', + const=sum, default=max, + help='sum the integers (default: find the max)') + args = parser.parse_args([u'--sum', u'1', u'2']) + + expected = frozenset((('accumulate', sum), ('integers', (1, 2)))) + + assert frozenset(co.CLIArgs.from_options(args).items()) == expected diff --git a/test/units/utils/test_encrypt.py b/test/units/utils/test_encrypt.py new file mode 100644 index 00000000..2cbe828a --- /dev/null +++ b/test/units/utils/test_encrypt.py @@ -0,0 +1,168 @@ +# (c) 2018, Matthias Fuchs <matthias.s.fuchs@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import sys + +import pytest + +from ansible.errors import AnsibleError, AnsibleFilterError +from ansible.plugins.filter.core import get_encrypted_password +from ansible.utils import encrypt + + +class passlib_off(object): + def __init__(self): + self.orig = encrypt.PASSLIB_AVAILABLE + + def __enter__(self): + encrypt.PASSLIB_AVAILABLE = False + return self + + def __exit__(self, exception_type, exception_value, traceback): + encrypt.PASSLIB_AVAILABLE = self.orig + + +def assert_hash(expected, secret, algorithm, **settings): + + if encrypt.PASSLIB_AVAILABLE: + assert encrypt.passlib_or_crypt(secret, algorithm, **settings) == expected + assert encrypt.PasslibHash(algorithm).hash(secret, **settings) == expected + else: + assert encrypt.passlib_or_crypt(secret, algorithm, **settings) == expected + with pytest.raises(AnsibleError) as excinfo: + encrypt.PasslibHash(algorithm).hash(secret, **settings) + assert excinfo.value.args[0] == "passlib must be installed to hash with '%s'" % algorithm + + +@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib') +def test_encrypt_with_rounds_no_passlib(): + with passlib_off(): + assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7", + secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000) + assert_hash("$5$rounds=10000$12345678$JBinliYMFEcBeAXKZnLjenhgEhTmJBvZn3aR8l70Oy/", + secret="123", algorithm="sha256_crypt", salt="12345678", rounds=10000) + assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.", + secret="123", algorithm="sha512_crypt", salt="12345678", rounds=5000) + + +# If passlib is not installed. this is identical to the test_encrypt_with_rounds_no_passlib() test +@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test') +def test_encrypt_with_rounds(): + assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7", + secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000) + assert_hash("$5$rounds=10000$12345678$JBinliYMFEcBeAXKZnLjenhgEhTmJBvZn3aR8l70Oy/", + secret="123", algorithm="sha256_crypt", salt="12345678", rounds=10000) + assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.", + secret="123", algorithm="sha512_crypt", salt="12345678", rounds=5000) + + +@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib') +def test_encrypt_default_rounds_no_passlib(): + with passlib_off(): + assert_hash("$1$12345678$tRy4cXc3kmcfRZVj4iFXr/", + secret="123", algorithm="md5_crypt", salt="12345678") + assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7", + secret="123", algorithm="sha256_crypt", salt="12345678") + assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.", + secret="123", algorithm="sha512_crypt", salt="12345678") + + assert encrypt.CryptHash("md5_crypt").hash("123") + + +# If passlib is not installed. this is identical to the test_encrypt_default_rounds_no_passlib() test +@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test') +def test_encrypt_default_rounds(): + assert_hash("$1$12345678$tRy4cXc3kmcfRZVj4iFXr/", + secret="123", algorithm="md5_crypt", salt="12345678") + assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7", + secret="123", algorithm="sha256_crypt", salt="12345678") + assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.", + secret="123", algorithm="sha512_crypt", salt="12345678") + + assert encrypt.PasslibHash("md5_crypt").hash("123") + + +@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib') +def test_password_hash_filter_no_passlib(): + with passlib_off(): + assert not encrypt.PASSLIB_AVAILABLE + assert get_encrypted_password("123", "md5", salt="12345678") == "$1$12345678$tRy4cXc3kmcfRZVj4iFXr/" + + with pytest.raises(AnsibleFilterError): + get_encrypted_password("123", "crypt16", salt="12") + + +def test_password_hash_filter_passlib(): + if not encrypt.PASSLIB_AVAILABLE: + pytest.skip("passlib not available") + + with pytest.raises(AnsibleFilterError): + get_encrypted_password("123", "sha257", salt="12345678") + + # Uses 5000 rounds by default for sha256 matching crypt behaviour + assert get_encrypted_password("123", "sha256", salt="12345678") == "$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7" + assert get_encrypted_password("123", "sha256", salt="12345678", rounds=5000) == "$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7" + + assert (get_encrypted_password("123", "sha256", salt="12345678", rounds=10000) == + "$5$rounds=10000$12345678$JBinliYMFEcBeAXKZnLjenhgEhTmJBvZn3aR8l70Oy/") + + assert (get_encrypted_password("123", "sha512", salt="12345678", rounds=6000) == + "$6$rounds=6000$12345678$l/fC67BdJwZrJ7qneKGP1b6PcatfBr0dI7W6JLBrsv8P1wnv/0pu4WJsWq5p6WiXgZ2gt9Aoir3MeORJxg4.Z/") + + assert (get_encrypted_password("123", "sha512", salt="12345678", rounds=5000) == + "$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.") + + assert get_encrypted_password("123", "crypt16", salt="12") == "12pELHK2ME3McUFlHxel6uMM" + + # Try algorithm that uses a raw salt + assert get_encrypted_password("123", "pbkdf2_sha256") + + +@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib') +def test_do_encrypt_no_passlib(): + with passlib_off(): + assert not encrypt.PASSLIB_AVAILABLE + assert encrypt.do_encrypt("123", "md5_crypt", salt="12345678") == "$1$12345678$tRy4cXc3kmcfRZVj4iFXr/" + + with pytest.raises(AnsibleError): + encrypt.do_encrypt("123", "crypt16", salt="12") + + +def test_do_encrypt_passlib(): + if not encrypt.PASSLIB_AVAILABLE: + pytest.skip("passlib not available") + + with pytest.raises(AnsibleError): + encrypt.do_encrypt("123", "sha257_crypt", salt="12345678") + + # Uses 5000 rounds by default for sha256 matching crypt behaviour. + assert encrypt.do_encrypt("123", "sha256_crypt", salt="12345678") == "$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7" + + assert encrypt.do_encrypt("123", "md5_crypt", salt="12345678") == "$1$12345678$tRy4cXc3kmcfRZVj4iFXr/" + + assert encrypt.do_encrypt("123", "crypt16", salt="12") == "12pELHK2ME3McUFlHxel6uMM" + + +def test_random_salt(): + res = encrypt.random_salt() + expected_salt_candidate_chars = u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789./' + assert len(res) == 8 + for res_char in res: + assert res_char in expected_salt_candidate_chars diff --git a/test/units/utils/test_helpers.py b/test/units/utils/test_helpers.py new file mode 100644 index 00000000..ec37b39b --- /dev/null +++ b/test/units/utils/test_helpers.py @@ -0,0 +1,34 @@ +# (c) 2015, Marius Gedminas <marius@gedmin.as> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest + +from ansible.utils.helpers import pct_to_int + + +class TestHelpers(unittest.TestCase): + + def test_pct_to_int(self): + self.assertEqual(pct_to_int(1, 100), 1) + self.assertEqual(pct_to_int(-1, 100), -1) + self.assertEqual(pct_to_int("1%", 10), 1) + self.assertEqual(pct_to_int("1%", 10, 0), 0) + self.assertEqual(pct_to_int("1", 100), 1) + self.assertEqual(pct_to_int("10%", 100), 10) diff --git a/test/units/utils/test_isidentifier.py b/test/units/utils/test_isidentifier.py new file mode 100644 index 00000000..de6de642 --- /dev/null +++ b/test/units/utils/test_isidentifier.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2020 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible.utils.vars import isidentifier + + +# Originally posted at: http://stackoverflow.com/a/29586366 + + +@pytest.mark.parametrize( + "identifier", [ + "foo", "foo1_23", + ] +) +def test_valid_identifier(identifier): + assert isidentifier(identifier) + + +@pytest.mark.parametrize( + "identifier", [ + "pass", "foo ", " foo", "1234", "1234abc", "", " ", "foo bar", "no-dashed-names-for-you", + ] +) +def test_invalid_identifier(identifier): + assert not isidentifier(identifier) + + +def test_keywords_not_in_PY2(): + """In Python 2 ("True", "False", "None") are not keywords. The isidentifier + method ensures that those are treated as keywords on both Python 2 and 3. + """ + assert not isidentifier("True") + assert not isidentifier("False") + assert not isidentifier("None") + + +def test_non_ascii(): + """In Python 3 non-ascii characters are allowed as opposed to Python 2. The + isidentifier method ensures that those are treated as keywords on both + Python 2 and 3. + """ + assert not isidentifier("křížek") diff --git a/test/units/utils/test_plugin_docs.py b/test/units/utils/test_plugin_docs.py new file mode 100644 index 00000000..ff973b1e --- /dev/null +++ b/test/units/utils/test_plugin_docs.py @@ -0,0 +1,333 @@ +# -*- coding: utf-8 -*- +# (c) 2020 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import copy + +import pytest + +from ansible.utils.plugin_docs import ( + add_collection_to_versions_and_dates, +) + + +ADD_TESTS = [ + ( + # Module options + True, + False, + { + 'author': 'x', + 'version_added': '1.0.0', + 'deprecated': { + 'removed_in': '2.0.0', + }, + 'options': { + 'test': { + 'description': '', + 'type': 'str', + 'version_added': '1.1.0', + 'deprecated': { + # should not be touched since this isn't a plugin + 'removed_in': '2.0.0', + }, + 'env': [ + # should not be touched since this isn't a plugin + { + 'version_added': '1.3.0', + 'deprecated': { + 'version': '2.0.0', + }, + }, + ], + 'ini': [ + # should not be touched since this isn't a plugin + { + 'version_added': '1.3.0', + 'deprecated': { + 'version': '2.0.0', + }, + }, + ], + 'vars': [ + # should not be touched since this isn't a plugin + { + 'version_added': '1.3.0', + 'deprecated': { + 'removed_at_date': '2020-01-01', + }, + }, + ], + }, + 'subtest': { + 'description': '', + 'type': 'dict', + 'deprecated': { + # should not be touched since this isn't a plugin + 'version': '2.0.0', + }, + 'suboptions': { + 'suboption': { + 'description': '', + 'type': 'int', + 'version_added': '1.2.0', + } + }, + } + }, + }, + { + 'author': 'x', + 'version_added': '1.0.0', + 'version_added_collection': 'foo.bar', + 'deprecated': { + 'removed_in': '2.0.0', + 'removed_from_collection': 'foo.bar', + }, + 'options': { + 'test': { + 'description': '', + 'type': 'str', + 'version_added': '1.1.0', + 'version_added_collection': 'foo.bar', + 'deprecated': { + # should not be touched since this isn't a plugin + 'removed_in': '2.0.0', + }, + 'env': [ + # should not be touched since this isn't a plugin + { + 'version_added': '1.3.0', + 'deprecated': { + 'version': '2.0.0', + }, + }, + ], + 'ini': [ + # should not be touched since this isn't a plugin + { + 'version_added': '1.3.0', + 'deprecated': { + 'version': '2.0.0', + }, + }, + ], + 'vars': [ + # should not be touched since this isn't a plugin + { + 'version_added': '1.3.0', + 'deprecated': { + 'removed_at_date': '2020-01-01', + }, + }, + ], + }, + 'subtest': { + 'description': '', + 'type': 'dict', + 'deprecated': { + # should not be touched since this isn't a plugin + 'version': '2.0.0', + }, + 'suboptions': { + 'suboption': { + 'description': '', + 'type': 'int', + 'version_added': '1.2.0', + 'version_added_collection': 'foo.bar', + } + }, + } + }, + }, + ), + ( + # Module options + True, + False, + { + 'author': 'x', + 'deprecated': { + 'removed_at_date': '2020-01-01', + }, + }, + { + 'author': 'x', + 'deprecated': { + 'removed_at_date': '2020-01-01', + 'removed_from_collection': 'foo.bar', + }, + }, + ), + ( + # Plugin options + False, + False, + { + 'author': 'x', + 'version_added': '1.0.0', + 'deprecated': { + 'removed_in': '2.0.0', + }, + 'options': { + 'test': { + 'description': '', + 'type': 'str', + 'version_added': '1.1.0', + 'deprecated': { + # should not be touched since this is the wrong name + 'removed_in': '2.0.0', + }, + 'env': [ + { + 'version_added': '1.3.0', + 'deprecated': { + 'version': '2.0.0', + }, + }, + ], + 'ini': [ + { + 'version_added': '1.3.0', + 'deprecated': { + 'version': '2.0.0', + }, + }, + ], + 'vars': [ + { + 'version_added': '1.3.0', + 'deprecated': { + 'removed_at_date': '2020-01-01', + }, + }, + ], + }, + 'subtest': { + 'description': '', + 'type': 'dict', + 'deprecated': { + 'version': '2.0.0', + }, + 'suboptions': { + 'suboption': { + 'description': '', + 'type': 'int', + 'version_added': '1.2.0', + } + }, + } + }, + }, + { + 'author': 'x', + 'version_added': '1.0.0', + 'version_added_collection': 'foo.bar', + 'deprecated': { + 'removed_in': '2.0.0', + 'removed_from_collection': 'foo.bar', + }, + 'options': { + 'test': { + 'description': '', + 'type': 'str', + 'version_added': '1.1.0', + 'version_added_collection': 'foo.bar', + 'deprecated': { + # should not be touched since this is the wrong name + 'removed_in': '2.0.0', + }, + 'env': [ + { + 'version_added': '1.3.0', + 'version_added_collection': 'foo.bar', + 'deprecated': { + 'version': '2.0.0', + 'collection_name': 'foo.bar', + }, + }, + ], + 'ini': [ + { + 'version_added': '1.3.0', + 'version_added_collection': 'foo.bar', + 'deprecated': { + 'version': '2.0.0', + 'collection_name': 'foo.bar', + }, + }, + ], + 'vars': [ + { + 'version_added': '1.3.0', + 'version_added_collection': 'foo.bar', + 'deprecated': { + 'removed_at_date': '2020-01-01', + 'collection_name': 'foo.bar', + }, + }, + ], + }, + 'subtest': { + 'description': '', + 'type': 'dict', + 'deprecated': { + 'version': '2.0.0', + 'collection_name': 'foo.bar', + }, + 'suboptions': { + 'suboption': { + 'description': '', + 'type': 'int', + 'version_added': '1.2.0', + 'version_added_collection': 'foo.bar', + } + }, + } + }, + }, + ), + ( + # Return values + True, # this value is is ignored + True, + { + 'rv1': { + 'version_added': '1.0.0', + 'type': 'dict', + 'contains': { + 'srv1': { + 'version_added': '1.1.0', + }, + 'srv2': { + }, + } + }, + }, + { + 'rv1': { + 'version_added': '1.0.0', + 'version_added_collection': 'foo.bar', + 'type': 'dict', + 'contains': { + 'srv1': { + 'version_added': '1.1.0', + 'version_added_collection': 'foo.bar', + }, + 'srv2': { + }, + } + }, + }, + ), +] + + +@pytest.mark.parametrize('is_module,return_docs,fragment,expected_fragment', ADD_TESTS) +def test_add(is_module, return_docs, fragment, expected_fragment): + fragment_copy = copy.deepcopy(fragment) + add_collection_to_versions_and_dates(fragment_copy, 'foo.bar', is_module, return_docs) + assert fragment_copy == expected_fragment diff --git a/test/units/utils/test_shlex.py b/test/units/utils/test_shlex.py new file mode 100644 index 00000000..e13d302d --- /dev/null +++ b/test/units/utils/test_shlex.py @@ -0,0 +1,41 @@ +# (c) 2015, Marius Gedminas <marius@gedmin.as> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest + +from ansible.utils.shlex import shlex_split + + +class TestSplit(unittest.TestCase): + + def test_trivial(self): + self.assertEqual(shlex_split("a b c"), ["a", "b", "c"]) + + def test_unicode(self): + self.assertEqual(shlex_split(u"a b \u010D"), [u"a", u"b", u"\u010D"]) + + def test_quoted(self): + self.assertEqual(shlex_split('"a b" c'), ["a b", "c"]) + + def test_comments(self): + self.assertEqual(shlex_split('"a b" c # d', comments=True), ["a b", "c"]) + + def test_error(self): + self.assertRaises(ValueError, shlex_split, 'a "b') diff --git a/test/units/utils/test_unsafe_proxy.py b/test/units/utils/test_unsafe_proxy.py new file mode 100644 index 00000000..205c0c65 --- /dev/null +++ b/test/units/utils/test_unsafe_proxy.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +# (c) 2018 Matt Martz <matt@sivel.net> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from ansible.module_utils.six import PY3 +from ansible.utils.unsafe_proxy import AnsibleUnsafe, AnsibleUnsafeBytes, AnsibleUnsafeText, wrap_var + + +def test_wrap_var_text(): + assert isinstance(wrap_var(u'foo'), AnsibleUnsafeText) + + +def test_wrap_var_bytes(): + assert isinstance(wrap_var(b'foo'), AnsibleUnsafeBytes) + + +def test_wrap_var_string(): + if PY3: + assert isinstance(wrap_var('foo'), AnsibleUnsafeText) + else: + assert isinstance(wrap_var('foo'), AnsibleUnsafeBytes) + + +def test_wrap_var_dict(): + assert isinstance(wrap_var(dict(foo='bar')), dict) + assert not isinstance(wrap_var(dict(foo='bar')), AnsibleUnsafe) + assert isinstance(wrap_var(dict(foo=u'bar'))['foo'], AnsibleUnsafeText) + + +def test_wrap_var_dict_None(): + assert wrap_var(dict(foo=None))['foo'] is None + assert not isinstance(wrap_var(dict(foo=None))['foo'], AnsibleUnsafe) + + +def test_wrap_var_list(): + assert isinstance(wrap_var(['foo']), list) + assert not isinstance(wrap_var(['foo']), AnsibleUnsafe) + assert isinstance(wrap_var([u'foo'])[0], AnsibleUnsafeText) + + +def test_wrap_var_list_None(): + assert wrap_var([None])[0] is None + assert not isinstance(wrap_var([None])[0], AnsibleUnsafe) + + +def test_wrap_var_set(): + assert isinstance(wrap_var(set(['foo'])), set) + assert not isinstance(wrap_var(set(['foo'])), AnsibleUnsafe) + for item in wrap_var(set([u'foo'])): + assert isinstance(item, AnsibleUnsafeText) + + +def test_wrap_var_set_None(): + for item in wrap_var(set([None])): + assert item is None + assert not isinstance(item, AnsibleUnsafe) + + +def test_wrap_var_tuple(): + assert isinstance(wrap_var(('foo',)), tuple) + assert not isinstance(wrap_var(('foo',)), AnsibleUnsafe) + assert isinstance(wrap_var(('foo',))[0], AnsibleUnsafe) + + +def test_wrap_var_tuple_None(): + assert wrap_var((None,))[0] is None + assert not isinstance(wrap_var((None,))[0], AnsibleUnsafe) + + +def test_wrap_var_None(): + assert wrap_var(None) is None + assert not isinstance(wrap_var(None), AnsibleUnsafe) + + +def test_wrap_var_unsafe_text(): + assert isinstance(wrap_var(AnsibleUnsafeText(u'foo')), AnsibleUnsafeText) + + +def test_wrap_var_unsafe_bytes(): + assert isinstance(wrap_var(AnsibleUnsafeBytes(b'foo')), AnsibleUnsafeBytes) + + +def test_wrap_var_no_ref(): + thing = { + 'foo': { + 'bar': 'baz' + }, + 'bar': ['baz', 'qux'], + 'baz': ('qux',), + 'none': None, + 'text': 'text', + } + wrapped_thing = wrap_var(thing) + thing is not wrapped_thing + thing['foo'] is not wrapped_thing['foo'] + thing['bar'][0] is not wrapped_thing['bar'][0] + thing['baz'][0] is not wrapped_thing['baz'][0] + thing['none'] is not wrapped_thing['none'] + thing['text'] is not wrapped_thing['text'] + + +def test_AnsibleUnsafeText(): + assert isinstance(AnsibleUnsafeText(u'foo'), AnsibleUnsafe) + + +def test_AnsibleUnsafeBytes(): + assert isinstance(AnsibleUnsafeBytes(b'foo'), AnsibleUnsafe) diff --git a/test/units/utils/test_vars.py b/test/units/utils/test_vars.py new file mode 100644 index 00000000..c92ce4b6 --- /dev/null +++ b/test/units/utils/test_vars.py @@ -0,0 +1,282 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2015, Toshio Kuraotmi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from collections import defaultdict + +from units.compat import mock, unittest +from ansible.errors import AnsibleError +from ansible.utils.vars import combine_vars, merge_hash + + +class TestVariableUtils(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + combine_vars_merge_data = ( + dict( + a=dict(a=1), + b=dict(b=2), + result=dict(a=1, b=2), + ), + dict( + a=dict(a=1, c=dict(foo='bar')), + b=dict(b=2, c=dict(baz='bam')), + result=dict(a=1, b=2, c=dict(foo='bar', baz='bam')) + ), + dict( + a=defaultdict(a=1, c=defaultdict(foo='bar')), + b=dict(b=2, c=dict(baz='bam')), + result=defaultdict(a=1, b=2, c=defaultdict(foo='bar', baz='bam')) + ), + ) + combine_vars_replace_data = ( + dict( + a=dict(a=1), + b=dict(b=2), + result=dict(a=1, b=2) + ), + dict( + a=dict(a=1, c=dict(foo='bar')), + b=dict(b=2, c=dict(baz='bam')), + result=dict(a=1, b=2, c=dict(baz='bam')) + ), + dict( + a=defaultdict(a=1, c=dict(foo='bar')), + b=dict(b=2, c=defaultdict(baz='bam')), + result=defaultdict(a=1, b=2, c=defaultdict(baz='bam')) + ), + ) + + def test_combine_vars_improper_args(self): + with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'replace'): + with self.assertRaises(AnsibleError): + combine_vars([1, 2, 3], dict(a=1)) + with self.assertRaises(AnsibleError): + combine_vars(dict(a=1), [1, 2, 3]) + + with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'merge'): + with self.assertRaises(AnsibleError): + combine_vars([1, 2, 3], dict(a=1)) + with self.assertRaises(AnsibleError): + combine_vars(dict(a=1), [1, 2, 3]) + + def test_combine_vars_replace(self): + with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'replace'): + for test in self.combine_vars_replace_data: + self.assertEqual(combine_vars(test['a'], test['b']), test['result']) + + def test_combine_vars_merge(self): + with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'merge'): + for test in self.combine_vars_merge_data: + self.assertEqual(combine_vars(test['a'], test['b']), test['result']) + + merge_hash_data = { + "low_prio": { + "a": { + "a'": { + "x": "low_value", + "y": "low_value", + "list": ["low_value"] + } + }, + "b": [1, 1, 2, 3] + }, + "high_prio": { + "a": { + "a'": { + "y": "high_value", + "z": "high_value", + "list": ["high_value"] + } + }, + "b": [3, 4, 4, {"5": "value"}] + } + } + + def test_merge_hash_simple(self): + for test in self.combine_vars_merge_data: + self.assertEqual(merge_hash(test['a'], test['b']), test['result']) + + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": { + "a'": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["high_value"] + } + }, + "b": high['b'] + } + self.assertEqual(merge_hash(low, high), expected) + + def test_merge_hash_non_recursive_and_list_replace(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = high + self.assertEqual(merge_hash(low, high, False, 'replace'), expected) + + def test_merge_hash_non_recursive_and_list_keep(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": high['a'], + "b": low['b'] + } + self.assertEqual(merge_hash(low, high, False, 'keep'), expected) + + def test_merge_hash_non_recursive_and_list_append(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": high['a'], + "b": low['b'] + high['b'] + } + self.assertEqual(merge_hash(low, high, False, 'append'), expected) + + def test_merge_hash_non_recursive_and_list_prepend(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": high['a'], + "b": high['b'] + low['b'] + } + self.assertEqual(merge_hash(low, high, False, 'prepend'), expected) + + def test_merge_hash_non_recursive_and_list_append_rp(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": high['a'], + "b": [1, 1, 2] + high['b'] + } + self.assertEqual(merge_hash(low, high, False, 'append_rp'), expected) + + def test_merge_hash_non_recursive_and_list_prepend_rp(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": high['a'], + "b": high['b'] + [1, 1, 2] + } + self.assertEqual(merge_hash(low, high, False, 'prepend_rp'), expected) + + def test_merge_hash_recursive_and_list_replace(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": { + "a'": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["high_value"] + } + }, + "b": high['b'] + } + self.assertEqual(merge_hash(low, high, True, 'replace'), expected) + + def test_merge_hash_recursive_and_list_keep(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": { + "a'": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["low_value"] + } + }, + "b": low['b'] + } + self.assertEqual(merge_hash(low, high, True, 'keep'), expected) + + def test_merge_hash_recursive_and_list_append(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": { + "a'": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["low_value", "high_value"] + } + }, + "b": low['b'] + high['b'] + } + self.assertEqual(merge_hash(low, high, True, 'append'), expected) + + def test_merge_hash_recursive_and_list_prepend(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": { + "a'": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["high_value", "low_value"] + } + }, + "b": high['b'] + low['b'] + } + self.assertEqual(merge_hash(low, high, True, 'prepend'), expected) + + def test_merge_hash_recursive_and_list_append_rp(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": { + "a'": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["low_value", "high_value"] + } + }, + "b": [1, 1, 2] + high['b'] + } + self.assertEqual(merge_hash(low, high, True, 'append_rp'), expected) + + def test_merge_hash_recursive_and_list_prepend_rp(self): + low = self.merge_hash_data['low_prio'] + high = self.merge_hash_data['high_prio'] + expected = { + "a": { + "a'": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["high_value", "low_value"] + } + }, + "b": high['b'] + [1, 1, 2] + } + self.assertEqual(merge_hash(low, high, True, 'prepend_rp'), expected) diff --git a/test/units/utils/test_version.py b/test/units/utils/test_version.py new file mode 100644 index 00000000..7d04c112 --- /dev/null +++ b/test/units/utils/test_version.py @@ -0,0 +1,335 @@ +# -*- coding: utf-8 -*- +# (c) 2020 Matt Martz <matt@sivel.net> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from distutils.version import LooseVersion, StrictVersion + +import pytest + +from ansible.utils.version import _Alpha, _Numeric, SemanticVersion + + +EQ = [ + ('1.0.0', '1.0.0', True), + ('1.0.0', '1.0.0-beta', False), + ('1.0.0-beta2+build1', '1.0.0-beta.2+build.1', False), + ('1.0.0-beta+build', '1.0.0-beta+build', True), + ('1.0.0-beta+build1', '1.0.0-beta+build2', True), + ('1.0.0-beta+a', '1.0.0-alpha+bar', False), +] + +NE = [ + ('1.0.0', '1.0.0', False), + ('1.0.0', '1.0.0-beta', True), + ('1.0.0-beta2+build1', '1.0.0-beta.2+build.1', True), + ('1.0.0-beta+build', '1.0.0-beta+build', False), + ('1.0.0-beta+a', '1.0.0-alpha+bar', True), +] + +LT = [ + ('1.0.0', '2.0.0', True), + ('1.0.0-beta', '2.0.0-alpha', True), + ('1.0.0-alpha', '2.0.0-beta', True), + ('1.0.0-alpha', '1.0.0', True), + ('1.0.0-beta', '1.0.0-alpha3', False), + ('1.0.0+foo', '1.0.0-alpha', False), + ('1.0.0-beta.1', '1.0.0-beta.a', True), + ('1.0.0-beta+a', '1.0.0-alpha+bar', False), +] + +GT = [ + ('1.0.0', '2.0.0', False), + ('1.0.0-beta', '2.0.0-alpha', False), + ('1.0.0-alpha', '2.0.0-beta', False), + ('1.0.0-alpha', '1.0.0', False), + ('1.0.0-beta', '1.0.0-alpha3', True), + ('1.0.0+foo', '1.0.0-alpha', True), + ('1.0.0-beta.1', '1.0.0-beta.a', False), + ('1.0.0-beta+a', '1.0.0-alpha+bar', True), +] + +LE = [ + ('1.0.0', '1.0.0', True), + ('1.0.0', '2.0.0', True), + ('1.0.0-alpha', '1.0.0-beta', True), + ('1.0.0-beta', '1.0.0-alpha', False), +] + +GE = [ + ('1.0.0', '1.0.0', True), + ('1.0.0', '2.0.0', False), + ('1.0.0-alpha', '1.0.0-beta', False), + ('1.0.0-beta', '1.0.0-alpha', True), +] + +VALID = [ + "0.0.4", + "1.2.3", + "10.20.30", + "1.1.2-prerelease+meta", + "1.1.2+meta", + "1.1.2+meta-valid", + "1.0.0-alpha", + "1.0.0-beta", + "1.0.0-alpha.beta", + "1.0.0-alpha.beta.1", + "1.0.0-alpha.1", + "1.0.0-alpha0.valid", + "1.0.0-alpha.0valid", + "1.0.0-alpha-a.b-c-somethinglong+build.1-aef.1-its-okay", + "1.0.0-rc.1+build.1", + "2.0.0-rc.1+build.123", + "1.2.3-beta", + "10.2.3-DEV-SNAPSHOT", + "1.2.3-SNAPSHOT-123", + "1.0.0", + "2.0.0", + "1.1.7", + "2.0.0+build.1848", + "2.0.1-alpha.1227", + "1.0.0-alpha+beta", + "1.2.3----RC-SNAPSHOT.12.9.1--.12+788", + "1.2.3----R-S.12.9.1--.12+meta", + "1.2.3----RC-SNAPSHOT.12.9.1--.12", + "1.0.0+0.build.1-rc.10000aaa-kk-0.1", + "99999999999999999999999.999999999999999999.99999999999999999", + "1.0.0-0A.is.legal", +] + +INVALID = [ + "1", + "1.2", + "1.2.3-0123", + "1.2.3-0123.0123", + "1.1.2+.123", + "+invalid", + "-invalid", + "-invalid+invalid", + "-invalid.01", + "alpha", + "alpha.beta", + "alpha.beta.1", + "alpha.1", + "alpha+beta", + "alpha_beta", + "alpha.", + "alpha..", + "beta", + "1.0.0-alpha_beta", + "-alpha.", + "1.0.0-alpha..", + "1.0.0-alpha..1", + "1.0.0-alpha...1", + "1.0.0-alpha....1", + "1.0.0-alpha.....1", + "1.0.0-alpha......1", + "1.0.0-alpha.......1", + "01.1.1", + "1.01.1", + "1.1.01", + "1.2", + "1.2.3.DEV", + "1.2-SNAPSHOT", + "1.2.31.2.3----RC-SNAPSHOT.12.09.1--..12+788", + "1.2-RC-SNAPSHOT", + "-1.0.3-gamma+b7718", + "+justmeta", + "9.8.7+meta+meta", + "9.8.7-whatever+meta+meta", +] + +PRERELEASE = [ + ('1.0.0-alpha', True), + ('1.0.0-alpha.1', True), + ('1.0.0-0.3.7', True), + ('1.0.0-x.7.z.92', True), + ('0.1.2', False), + ('0.1.2+bob', False), + ('1.0.0', False), +] + +STABLE = [ + ('1.0.0-alpha', False), + ('1.0.0-alpha.1', False), + ('1.0.0-0.3.7', False), + ('1.0.0-x.7.z.92', False), + ('0.1.2', False), + ('0.1.2+bob', False), + ('1.0.0', True), + ('1.0.0+bob', True), +] + +LOOSE_VERSION = [ + (LooseVersion('1'), SemanticVersion('1.0.0')), + (LooseVersion('1-alpha'), SemanticVersion('1.0.0-alpha')), + (LooseVersion('1.0.0-alpha+build'), SemanticVersion('1.0.0-alpha+build')), +] + +LOOSE_VERSION_INVALID = [ + LooseVersion('1.a.3'), + LooseVersion(), + 'bar', + StrictVersion('1.2.3'), +] + + +def test_semanticversion_none(): + assert SemanticVersion().major is None + + +@pytest.mark.parametrize('left,right,expected', EQ) +def test_eq(left, right, expected): + assert (SemanticVersion(left) == SemanticVersion(right)) is expected + + +@pytest.mark.parametrize('left,right,expected', NE) +def test_ne(left, right, expected): + assert (SemanticVersion(left) != SemanticVersion(right)) is expected + + +@pytest.mark.parametrize('left,right,expected', LT) +def test_lt(left, right, expected): + assert (SemanticVersion(left) < SemanticVersion(right)) is expected + + +@pytest.mark.parametrize('left,right,expected', LE) +def test_le(left, right, expected): + assert (SemanticVersion(left) <= SemanticVersion(right)) is expected + + +@pytest.mark.parametrize('left,right,expected', GT) +def test_gt(left, right, expected): + assert (SemanticVersion(left) > SemanticVersion(right)) is expected + + +@pytest.mark.parametrize('left,right,expected', GE) +def test_ge(left, right, expected): + assert (SemanticVersion(left) >= SemanticVersion(right)) is expected + + +@pytest.mark.parametrize('value', VALID) +def test_valid(value): + SemanticVersion(value) + + +@pytest.mark.parametrize('value', INVALID) +def test_invalid(value): + pytest.raises(ValueError, SemanticVersion, value) + + +def test_example_precedence(): + # https://semver.org/#spec-item-11 + sv = SemanticVersion + assert sv('1.0.0') < sv('2.0.0') < sv('2.1.0') < sv('2.1.1') + assert sv('1.0.0-alpha') < sv('1.0.0') + assert sv('1.0.0-alpha') < sv('1.0.0-alpha.1') < sv('1.0.0-alpha.beta') + assert sv('1.0.0-beta') < sv('1.0.0-beta.2') < sv('1.0.0-beta.11') < sv('1.0.0-rc.1') < sv('1.0.0') + + +@pytest.mark.parametrize('value,expected', PRERELEASE) +def test_prerelease(value, expected): + assert SemanticVersion(value).is_prerelease is expected + + +@pytest.mark.parametrize('value,expected', STABLE) +def test_stable(value, expected): + assert SemanticVersion(value).is_stable is expected + + +@pytest.mark.parametrize('value,expected', LOOSE_VERSION) +def test_from_loose_version(value, expected): + assert SemanticVersion.from_loose_version(value) == expected + + +@pytest.mark.parametrize('value', LOOSE_VERSION_INVALID) +def test_from_loose_version_invalid(value): + pytest.raises((AttributeError, ValueError), SemanticVersion.from_loose_version, value) + + +def test_comparison_with_string(): + assert SemanticVersion('1.0.0') > '0.1.0' + + +def test_alpha(): + assert _Alpha('a') == _Alpha('a') + assert _Alpha('a') == 'a' + assert _Alpha('a') != _Alpha('b') + assert _Alpha('a') != 1 + assert _Alpha('a') < _Alpha('b') + assert _Alpha('a') < 'c' + assert _Alpha('a') > _Numeric(1) + with pytest.raises(ValueError): + _Alpha('a') < None + assert _Alpha('a') <= _Alpha('a') + assert _Alpha('a') <= _Alpha('b') + assert _Alpha('b') >= _Alpha('a') + assert _Alpha('b') >= _Alpha('b') + + # The following 3*6 tests check that all comparison operators perform + # as expected. DO NOT remove any of them, or reformulate them (to remove + # the explicit `not`)! + + assert _Alpha('a') == _Alpha('a') + assert not _Alpha('a') != _Alpha('a') # pylint: disable=unneeded-not + assert not _Alpha('a') < _Alpha('a') # pylint: disable=unneeded-not + assert _Alpha('a') <= _Alpha('a') + assert not _Alpha('a') > _Alpha('a') # pylint: disable=unneeded-not + assert _Alpha('a') >= _Alpha('a') + + assert not _Alpha('a') == _Alpha('b') # pylint: disable=unneeded-not + assert _Alpha('a') != _Alpha('b') + assert _Alpha('a') < _Alpha('b') + assert _Alpha('a') <= _Alpha('b') + assert not _Alpha('a') > _Alpha('b') # pylint: disable=unneeded-not + assert not _Alpha('a') >= _Alpha('b') # pylint: disable=unneeded-not + + assert not _Alpha('b') == _Alpha('a') # pylint: disable=unneeded-not + assert _Alpha('b') != _Alpha('a') + assert not _Alpha('b') < _Alpha('a') # pylint: disable=unneeded-not + assert not _Alpha('b') <= _Alpha('a') # pylint: disable=unneeded-not + assert _Alpha('b') > _Alpha('a') + assert _Alpha('b') >= _Alpha('a') + + +def test_numeric(): + assert _Numeric(1) == _Numeric(1) + assert _Numeric(1) == 1 + assert _Numeric(1) != _Numeric(2) + assert _Numeric(1) != 'a' + assert _Numeric(1) < _Numeric(2) + assert _Numeric(1) < 3 + assert _Numeric(1) < _Alpha('b') + with pytest.raises(ValueError): + _Numeric(1) < None + assert _Numeric(1) <= _Numeric(1) + assert _Numeric(1) <= _Numeric(2) + assert _Numeric(2) >= _Numeric(1) + assert _Numeric(2) >= _Numeric(2) + + # The following 3*6 tests check that all comparison operators perform + # as expected. DO NOT remove any of them, or reformulate them (to remove + # the explicit `not`)! + + assert _Numeric(1) == _Numeric(1) + assert not _Numeric(1) != _Numeric(1) # pylint: disable=unneeded-not + assert not _Numeric(1) < _Numeric(1) # pylint: disable=unneeded-not + assert _Numeric(1) <= _Numeric(1) + assert not _Numeric(1) > _Numeric(1) # pylint: disable=unneeded-not + assert _Numeric(1) >= _Numeric(1) + + assert not _Numeric(1) == _Numeric(2) # pylint: disable=unneeded-not + assert _Numeric(1) != _Numeric(2) + assert _Numeric(1) < _Numeric(2) + assert _Numeric(1) <= _Numeric(2) + assert not _Numeric(1) > _Numeric(2) # pylint: disable=unneeded-not + assert not _Numeric(1) >= _Numeric(2) # pylint: disable=unneeded-not + + assert not _Numeric(2) == _Numeric(1) # pylint: disable=unneeded-not + assert _Numeric(2) != _Numeric(1) + assert not _Numeric(2) < _Numeric(1) # pylint: disable=unneeded-not + assert not _Numeric(2) <= _Numeric(1) # pylint: disable=unneeded-not + assert _Numeric(2) > _Numeric(1) + assert _Numeric(2) >= _Numeric(1) |