summaryrefslogtreecommitdiffstats
path: root/test/units/utils
diff options
context:
space:
mode:
Diffstat (limited to 'test/units/utils')
-rw-r--r--test/units/utils/__init__.py0
-rw-r--r--test/units/utils/collection_loader/__init__.py0
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/ansible/builtin/plugins/modules/shouldnotload.py4
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/meta/runtime.yml4
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/action/my_action.py8
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/__init__.py0
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_other_util.py4
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_util.py6
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/__init__.py5
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/amodule.py6
-rw-r--r--test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/roles/some_role/.gitkeep0
-rw-r--r--test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/__init__.py5
-rw-r--r--test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/ansible/__init__.py5
-rw-r--r--test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/__init__.py5
-rw-r--r--test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/testcoll/__init__.py5
-rw-r--r--test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/ansible/playbook_adj_other/.gitkeep0
-rw-r--r--test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/freshns/playbook_adj_other/.gitkeep0
-rw-r--r--test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/testns/playbook_adj_other/.gitkeep0
-rw-r--r--test/units/utils/collection_loader/test_collection_loader.py868
-rw-r--r--test/units/utils/display/test_broken_cowsay.py27
-rw-r--r--test/units/utils/display/test_display.py20
-rw-r--r--test/units/utils/display/test_logger.py31
-rw-r--r--test/units/utils/display/test_warning.py42
-rw-r--r--test/units/utils/test_cleanup_tmp_file.py48
-rw-r--r--test/units/utils/test_context_objects.py70
-rw-r--r--test/units/utils/test_display.py135
-rw-r--r--test/units/utils/test_encrypt.py220
-rw-r--r--test/units/utils/test_helpers.py34
-rw-r--r--test/units/utils/test_isidentifier.py49
-rw-r--r--test/units/utils/test_plugin_docs.py333
-rw-r--r--test/units/utils/test_shlex.py41
-rw-r--r--test/units/utils/test_unsafe_proxy.py121
-rw-r--r--test/units/utils/test_vars.py284
-rw-r--r--test/units/utils/test_version.py335
34 files changed, 2715 insertions, 0 deletions
diff --git a/test/units/utils/__init__.py b/test/units/utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/utils/__init__.py
diff --git a/test/units/utils/collection_loader/__init__.py b/test/units/utils/collection_loader/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/utils/collection_loader/__init__.py
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/ansible/builtin/plugins/modules/shouldnotload.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/ansible/builtin/plugins/modules/shouldnotload.py
new file mode 100644
index 0000000..4041a33
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/ansible/builtin/plugins/modules/shouldnotload.py
@@ -0,0 +1,4 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+raise Exception('this module should never be loaded')
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/meta/runtime.yml b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/meta/runtime.yml
new file mode 100644
index 0000000..f2e2fde
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/meta/runtime.yml
@@ -0,0 +1,4 @@
+plugin_routing:
+ modules:
+ rerouted_module:
+ redirect: ansible.builtin.ping
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/action/my_action.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/action/my_action.py
new file mode 100644
index 0000000..9d30580
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/action/my_action.py
@@ -0,0 +1,8 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ..module_utils.my_util import question
+
+
+def action_code():
+ return "hello from my_action.py"
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/__init__.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/__init__.py
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_other_util.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_other_util.py
new file mode 100644
index 0000000..35e1381
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_other_util.py
@@ -0,0 +1,4 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from .my_util import question
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_util.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_util.py
new file mode 100644
index 0000000..c431c34
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/module_utils/my_util.py
@@ -0,0 +1,6 @@
+# WARNING: Changing line numbers of code in this file will break collection tests that use tracing to check paths and line numbers.
+# Also, do not import division from __future__ as this will break detection of __future__ inheritance on Python 2.
+
+
+def question():
+ return 3 / 2
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/__init__.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/__init__.py
new file mode 100644
index 0000000..6d69703
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/__init__.py
@@ -0,0 +1,5 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+raise Exception('this should never run')
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/amodule.py b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/amodule.py
new file mode 100644
index 0000000..99320a0
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/plugins/modules/amodule.py
@@ -0,0 +1,6 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+def module_code():
+ return "hello from amodule.py"
diff --git a/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/roles/some_role/.gitkeep b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/roles/some_role/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections/ansible_collections/testns/testcoll/roles/some_role/.gitkeep
diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/__init__.py
new file mode 100644
index 0000000..6068ac1
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/__init__.py
@@ -0,0 +1,5 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+raise Exception('this code should never execute')
diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/ansible/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/ansible/__init__.py
new file mode 100644
index 0000000..6068ac1
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/ansible/__init__.py
@@ -0,0 +1,5 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+raise Exception('this code should never execute')
diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/__init__.py
new file mode 100644
index 0000000..6068ac1
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/__init__.py
@@ -0,0 +1,5 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+raise Exception('this code should never execute')
diff --git a/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/testcoll/__init__.py b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/testcoll/__init__.py
new file mode 100644
index 0000000..6068ac1
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/collections_masked/ansible_collections/testns/testcoll/__init__.py
@@ -0,0 +1,5 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+raise Exception('this code should never execute')
diff --git a/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/ansible/playbook_adj_other/.gitkeep b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/ansible/playbook_adj_other/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/ansible/playbook_adj_other/.gitkeep
diff --git a/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/freshns/playbook_adj_other/.gitkeep b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/freshns/playbook_adj_other/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/freshns/playbook_adj_other/.gitkeep
diff --git a/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/testns/playbook_adj_other/.gitkeep b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/testns/playbook_adj_other/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/utils/collection_loader/fixtures/playbook_path/collections/ansible_collections/testns/playbook_adj_other/.gitkeep
diff --git a/test/units/utils/collection_loader/test_collection_loader.py b/test/units/utils/collection_loader/test_collection_loader.py
new file mode 100644
index 0000000..f7050dc
--- /dev/null
+++ b/test/units/utils/collection_loader/test_collection_loader.py
@@ -0,0 +1,868 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import pkgutil
+import pytest
+import re
+import sys
+
+from ansible.module_utils.six import PY3, string_types
+from ansible.module_utils.compat.importlib import import_module
+from ansible.modules import ping as ping_module
+from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef
+from ansible.utils.collection_loader._collection_finder import (
+ _AnsibleCollectionFinder, _AnsibleCollectionLoader, _AnsibleCollectionNSPkgLoader, _AnsibleCollectionPkgLoader,
+ _AnsibleCollectionPkgLoaderBase, _AnsibleCollectionRootPkgLoader, _AnsiblePathHookFinder,
+ _get_collection_name_from_path, _get_collection_role_path, _get_collection_metadata, _iter_modules_impl
+)
+from ansible.utils.collection_loader._collection_config import _EventSource
+from unittest.mock import MagicMock, NonCallableMagicMock, patch
+
+
+# fixture to ensure we always clean up the import stuff when we're done
+@pytest.fixture(autouse=True, scope='function')
+def teardown(*args, **kwargs):
+ yield
+ reset_collections_loader_state()
+
+# BEGIN STANDALONE TESTS - these exercise behaviors of the individual components without the import machinery
+
+
+@pytest.mark.skipif(not PY3, reason='Testing Python 2 codepath (find_module) on Python 3')
+def test_find_module_py3():
+ dir_to_a_file = os.path.dirname(ping_module.__file__)
+ path_hook_finder = _AnsiblePathHookFinder(_AnsibleCollectionFinder(), dir_to_a_file)
+
+ # setuptools may fall back to find_module on Python 3 if find_spec returns None
+ # see https://github.com/pypa/setuptools/pull/2918
+ assert path_hook_finder.find_spec('missing') is None
+ assert path_hook_finder.find_module('missing') is None
+
+
+def test_finder_setup():
+ # ensure scalar path is listified
+ f = _AnsibleCollectionFinder(paths='/bogus/bogus')
+ assert isinstance(f._n_collection_paths, list)
+
+ # ensure sys.path paths that have an ansible_collections dir are added to the end of the collections paths
+ with patch.object(sys, 'path', ['/bogus', default_test_collection_paths[1], '/morebogus', default_test_collection_paths[0]]):
+ with patch('os.path.isdir', side_effect=lambda x: b'bogus' not in x):
+ f = _AnsibleCollectionFinder(paths=['/explicit', '/other'])
+ assert f._n_collection_paths == ['/explicit', '/other', default_test_collection_paths[1], default_test_collection_paths[0]]
+
+ configured_paths = ['/bogus']
+ playbook_paths = ['/playbookdir']
+ with patch.object(sys, 'path', ['/bogus', '/playbookdir']) and patch('os.path.isdir', side_effect=lambda x: b'bogus' in x):
+ f = _AnsibleCollectionFinder(paths=configured_paths)
+ assert f._n_collection_paths == configured_paths
+
+ f.set_playbook_paths(playbook_paths)
+ assert f._n_collection_paths == extend_paths(playbook_paths, 'collections') + configured_paths
+
+ # ensure scalar playbook_paths gets listified
+ f.set_playbook_paths(playbook_paths[0])
+ assert f._n_collection_paths == extend_paths(playbook_paths, 'collections') + configured_paths
+
+
+def test_finder_not_interested():
+ f = get_default_finder()
+ assert f.find_module('nothanks') is None
+ assert f.find_module('nothanks.sub', path=['/bogus/dir']) is None
+
+
+def test_finder_ns():
+ # ensure we can still load ansible_collections and ansible_collections.ansible when they don't exist on disk
+ f = _AnsibleCollectionFinder(paths=['/bogus/bogus'])
+ loader = f.find_module('ansible_collections')
+ assert isinstance(loader, _AnsibleCollectionRootPkgLoader)
+
+ loader = f.find_module('ansible_collections.ansible', path=['/bogus/bogus'])
+ assert isinstance(loader, _AnsibleCollectionNSPkgLoader)
+
+ f = get_default_finder()
+ loader = f.find_module('ansible_collections')
+ assert isinstance(loader, _AnsibleCollectionRootPkgLoader)
+
+ # path is not allowed for top-level
+ with pytest.raises(ValueError):
+ f.find_module('ansible_collections', path=['whatever'])
+
+ # path is required for subpackages
+ with pytest.raises(ValueError):
+ f.find_module('ansible_collections.whatever', path=None)
+
+ paths = [os.path.join(p, 'ansible_collections/nonexistns') for p in default_test_collection_paths]
+
+ # test missing
+ loader = f.find_module('ansible_collections.nonexistns', paths)
+ assert loader is None
+
+
+# keep these up top to make sure the loader install/remove are working, since we rely on them heavily in the tests
+def test_loader_remove():
+ fake_mp = [MagicMock(), _AnsibleCollectionFinder(), MagicMock(), _AnsibleCollectionFinder()]
+ fake_ph = [MagicMock().m1, MagicMock().m2, _AnsibleCollectionFinder()._ansible_collection_path_hook, NonCallableMagicMock]
+ # must nest until 2.6 compilation is totally donezo
+ with patch.object(sys, 'meta_path', fake_mp):
+ with patch.object(sys, 'path_hooks', fake_ph):
+ _AnsibleCollectionFinder()._remove()
+ assert len(sys.meta_path) == 2
+ # no AnsibleCollectionFinders on the meta path after remove is called
+ assert all((not isinstance(mpf, _AnsibleCollectionFinder) for mpf in sys.meta_path))
+ assert len(sys.path_hooks) == 3
+ # none of the remaining path hooks should point at an AnsibleCollectionFinder
+ assert all((not isinstance(ph.__self__, _AnsibleCollectionFinder) for ph in sys.path_hooks if hasattr(ph, '__self__')))
+ assert AnsibleCollectionConfig.collection_finder is None
+
+
+def test_loader_install():
+ fake_mp = [MagicMock(), _AnsibleCollectionFinder(), MagicMock(), _AnsibleCollectionFinder()]
+ fake_ph = [MagicMock().m1, MagicMock().m2, _AnsibleCollectionFinder()._ansible_collection_path_hook, NonCallableMagicMock]
+ # must nest until 2.6 compilation is totally donezo
+ with patch.object(sys, 'meta_path', fake_mp):
+ with patch.object(sys, 'path_hooks', fake_ph):
+ f = _AnsibleCollectionFinder()
+ f._install()
+ assert len(sys.meta_path) == 3 # should have removed the existing ACFs and installed a new one
+ assert sys.meta_path[0] is f # at the front
+ # the rest of the meta_path should not be AnsibleCollectionFinders
+ assert all((not isinstance(mpf, _AnsibleCollectionFinder) for mpf in sys.meta_path[1:]))
+ assert len(sys.path_hooks) == 4 # should have removed the existing ACF path hooks and installed a new one
+ # the first path hook should be ours, make sure it's pointing at the right instance
+ assert hasattr(sys.path_hooks[0], '__self__') and sys.path_hooks[0].__self__ is f
+ # the rest of the path_hooks should not point at an AnsibleCollectionFinder
+ assert all((not isinstance(ph.__self__, _AnsibleCollectionFinder) for ph in sys.path_hooks[1:] if hasattr(ph, '__self__')))
+ assert AnsibleCollectionConfig.collection_finder is f
+ with pytest.raises(ValueError):
+ AnsibleCollectionConfig.collection_finder = f
+
+
+def test_finder_coll():
+ f = get_default_finder()
+
+ tests = [
+ {'name': 'ansible_collections.testns.testcoll', 'test_paths': [default_test_collection_paths]},
+ {'name': 'ansible_collections.ansible.builtin', 'test_paths': [['/bogus'], default_test_collection_paths]},
+ ]
+ # ensure finder works for legit paths and bogus paths
+ for test_dict in tests:
+ # splat the dict values to our locals
+ globals().update(test_dict)
+ parent_pkg = name.rpartition('.')[0]
+ for paths in test_paths:
+ paths = [os.path.join(p, parent_pkg.replace('.', '/')) for p in paths]
+ loader = f.find_module(name, path=paths)
+ assert isinstance(loader, _AnsibleCollectionPkgLoader)
+
+
+def test_root_loader_not_interested():
+ with pytest.raises(ImportError):
+ _AnsibleCollectionRootPkgLoader('not_ansible_collections_toplevel', path_list=[])
+
+ with pytest.raises(ImportError):
+ _AnsibleCollectionRootPkgLoader('ansible_collections.somens', path_list=['/bogus'])
+
+
+def test_root_loader():
+ name = 'ansible_collections'
+ # ensure this works even when ansible_collections doesn't exist on disk
+ for paths in [], default_test_collection_paths:
+ if name in sys.modules:
+ del sys.modules[name]
+ loader = _AnsibleCollectionRootPkgLoader(name, paths)
+ assert repr(loader).startswith('_AnsibleCollectionRootPkgLoader(path=')
+ module = loader.load_module(name)
+ assert module.__name__ == name
+ assert module.__path__ == [p for p in extend_paths(paths, name) if os.path.isdir(p)]
+ # even if the dir exists somewhere, this loader doesn't support get_data, so make __file__ a non-file
+ assert module.__file__ == '<ansible_synthetic_collection_package>'
+ assert module.__package__ == name
+ assert sys.modules.get(name) == module
+
+
+def test_nspkg_loader_not_interested():
+ with pytest.raises(ImportError):
+ _AnsibleCollectionNSPkgLoader('not_ansible_collections_toplevel.something', path_list=[])
+
+ with pytest.raises(ImportError):
+ _AnsibleCollectionNSPkgLoader('ansible_collections.somens.somecoll', path_list=[])
+
+
+def test_nspkg_loader_load_module():
+ # ensure the loader behaves on the toplevel and ansible packages for both legit and missing/bogus paths
+ for name in ['ansible_collections.ansible', 'ansible_collections.testns']:
+ parent_pkg = name.partition('.')[0]
+ module_to_load = name.rpartition('.')[2]
+ paths = extend_paths(default_test_collection_paths, parent_pkg)
+ existing_child_paths = [p for p in extend_paths(paths, module_to_load) if os.path.exists(p)]
+ if name in sys.modules:
+ del sys.modules[name]
+ loader = _AnsibleCollectionNSPkgLoader(name, path_list=paths)
+ assert repr(loader).startswith('_AnsibleCollectionNSPkgLoader(path=')
+ module = loader.load_module(name)
+ assert module.__name__ == name
+ assert isinstance(module.__loader__, _AnsibleCollectionNSPkgLoader)
+ assert module.__path__ == existing_child_paths
+ assert module.__package__ == name
+ assert module.__file__ == '<ansible_synthetic_collection_package>'
+ assert sys.modules.get(name) == module
+
+
+def test_collpkg_loader_not_interested():
+ with pytest.raises(ImportError):
+ _AnsibleCollectionPkgLoader('not_ansible_collections', path_list=[])
+
+ with pytest.raises(ImportError):
+ _AnsibleCollectionPkgLoader('ansible_collections.ns', path_list=['/bogus/bogus'])
+
+
+def test_collpkg_loader_load_module():
+ reset_collections_loader_state()
+ with patch('ansible.utils.collection_loader.AnsibleCollectionConfig') as p:
+ for name in ['ansible_collections.ansible.builtin', 'ansible_collections.testns.testcoll']:
+ parent_pkg = name.rpartition('.')[0]
+ module_to_load = name.rpartition('.')[2]
+ paths = extend_paths(default_test_collection_paths, parent_pkg)
+ existing_child_paths = [p for p in extend_paths(paths, module_to_load) if os.path.exists(p)]
+ is_builtin = 'ansible.builtin' in name
+ if name in sys.modules:
+ del sys.modules[name]
+ loader = _AnsibleCollectionPkgLoader(name, path_list=paths)
+ assert repr(loader).startswith('_AnsibleCollectionPkgLoader(path=')
+ module = loader.load_module(name)
+ assert module.__name__ == name
+ assert isinstance(module.__loader__, _AnsibleCollectionPkgLoader)
+ if is_builtin:
+ assert module.__path__ == []
+ else:
+ assert module.__path__ == [existing_child_paths[0]]
+
+ assert module.__package__ == name
+ if is_builtin:
+ assert module.__file__ == '<ansible_synthetic_collection_package>'
+ else:
+ assert module.__file__.endswith('__synthetic__') and os.path.isdir(os.path.dirname(module.__file__))
+ assert sys.modules.get(name) == module
+
+ assert hasattr(module, '_collection_meta') and isinstance(module._collection_meta, dict)
+
+ # FIXME: validate _collection_meta contents match what's on disk (or not)
+
+ # if the module has metadata, try loading it with busted metadata
+ if module._collection_meta:
+ _collection_finder = import_module('ansible.utils.collection_loader._collection_finder')
+ with patch.object(_collection_finder, '_meta_yml_to_dict', side_effect=Exception('bang')):
+ with pytest.raises(Exception) as ex:
+ _AnsibleCollectionPkgLoader(name, path_list=paths).load_module(name)
+ assert 'error parsing collection metadata' in str(ex.value)
+
+
+def test_coll_loader():
+ with patch('ansible.utils.collection_loader.AnsibleCollectionConfig'):
+ with pytest.raises(ValueError):
+ # not a collection
+ _AnsibleCollectionLoader('ansible_collections')
+
+ with pytest.raises(ValueError):
+ # bogus paths
+ _AnsibleCollectionLoader('ansible_collections.testns.testcoll', path_list=[])
+
+ # FIXME: more
+
+
+def test_path_hook_setup():
+ with patch.object(sys, 'path_hooks', []):
+ found_hook = None
+ pathhook_exc = None
+ try:
+ found_hook = _AnsiblePathHookFinder._get_filefinder_path_hook()
+ except Exception as phe:
+ pathhook_exc = phe
+
+ if PY3:
+ assert str(pathhook_exc) == 'need exactly one FileFinder import hook (found 0)'
+ else:
+ assert found_hook is None
+
+ assert repr(_AnsiblePathHookFinder(object(), '/bogus/path')) == "_AnsiblePathHookFinder(path='/bogus/path')"
+
+
+def test_path_hook_importerror():
+ # ensure that AnsiblePathHookFinder.find_module swallows ImportError from path hook delegation on Py3, eg if the delegated
+ # path hook gets passed a file on sys.path (python36.zip)
+ reset_collections_loader_state()
+ path_to_a_file = os.path.join(default_test_collection_paths[0], 'ansible_collections/testns/testcoll/plugins/action/my_action.py')
+ # it's a bug if the following pops an ImportError...
+ assert _AnsiblePathHookFinder(_AnsibleCollectionFinder(), path_to_a_file).find_module('foo.bar.my_action') is None
+
+
+def test_new_or_existing_module():
+ module_name = 'blar.test.module'
+ pkg_name = module_name.rpartition('.')[0]
+
+ # create new module case
+ nuke_module_prefix(module_name)
+ with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name, __package__=pkg_name) as new_module:
+ # the module we just created should now exist in sys.modules
+ assert sys.modules.get(module_name) is new_module
+ assert new_module.__name__ == module_name
+
+ # the module should stick since we didn't raise an exception in the contextmgr
+ assert sys.modules.get(module_name) is new_module
+
+ # reuse existing module case
+ with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name, __attr1__=42, blar='yo') as existing_module:
+ assert sys.modules.get(module_name) is new_module # should be the same module we created earlier
+ assert hasattr(existing_module, '__package__') and existing_module.__package__ == pkg_name
+ assert hasattr(existing_module, '__attr1__') and existing_module.__attr1__ == 42
+ assert hasattr(existing_module, 'blar') and existing_module.blar == 'yo'
+
+ # exception during update existing shouldn't zap existing module from sys.modules
+ with pytest.raises(ValueError) as ve:
+ with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name) as existing_module:
+ err_to_raise = ValueError('bang')
+ raise err_to_raise
+ # make sure we got our error
+ assert ve.value is err_to_raise
+ # and that the module still exists
+ assert sys.modules.get(module_name) is existing_module
+
+ # test module removal after exception during creation
+ nuke_module_prefix(module_name)
+ with pytest.raises(ValueError) as ve:
+ with _AnsibleCollectionPkgLoaderBase._new_or_existing_module(module_name) as new_module:
+ err_to_raise = ValueError('bang')
+ raise err_to_raise
+ # make sure we got our error
+ assert ve.value is err_to_raise
+ # and that the module was removed
+ assert sys.modules.get(module_name) is None
+
+
+def test_iter_modules_impl():
+ modules_trailer = 'ansible_collections/testns/testcoll/plugins'
+ modules_pkg_prefix = modules_trailer.replace('/', '.') + '.'
+ modules_path = os.path.join(default_test_collection_paths[0], modules_trailer)
+ modules = list(_iter_modules_impl([modules_path], modules_pkg_prefix))
+
+ assert modules
+ assert set([('ansible_collections.testns.testcoll.plugins.action', True),
+ ('ansible_collections.testns.testcoll.plugins.module_utils', True),
+ ('ansible_collections.testns.testcoll.plugins.modules', True)]) == set(modules)
+
+ modules_trailer = 'ansible_collections/testns/testcoll/plugins/modules'
+ modules_pkg_prefix = modules_trailer.replace('/', '.') + '.'
+ modules_path = os.path.join(default_test_collection_paths[0], modules_trailer)
+ modules = list(_iter_modules_impl([modules_path], modules_pkg_prefix))
+
+ assert modules
+ assert len(modules) == 1
+ assert modules[0][0] == 'ansible_collections.testns.testcoll.plugins.modules.amodule' # name
+ assert modules[0][1] is False # is_pkg
+
+ # FIXME: more
+
+
+# BEGIN IN-CIRCUIT TESTS - these exercise behaviors of the loader when wired up to the import machinery
+
+
+def test_import_from_collection(monkeypatch):
+ collection_root = os.path.join(os.path.dirname(__file__), 'fixtures', 'collections')
+ collection_path = os.path.join(collection_root, 'ansible_collections/testns/testcoll/plugins/module_utils/my_util.py')
+
+ # THIS IS UNSTABLE UNDER A DEBUGGER
+ # the trace we're expecting to be generated when running the code below:
+ # answer = question()
+ expected_trace_log = [
+ (collection_path, 5, 'call'),
+ (collection_path, 6, 'line'),
+ (collection_path, 6, 'return'),
+ ]
+
+ # define the collection root before any ansible code has been loaded
+ # otherwise config will have already been loaded and changing the environment will have no effect
+ monkeypatch.setenv('ANSIBLE_COLLECTIONS_PATH', collection_root)
+
+ finder = _AnsibleCollectionFinder(paths=[collection_root])
+ reset_collections_loader_state(finder)
+
+ from ansible_collections.testns.testcoll.plugins.module_utils.my_util import question
+
+ original_trace_function = sys.gettrace()
+ trace_log = []
+
+ if original_trace_function:
+ # enable tracing while preserving the existing trace function (coverage)
+ def my_trace_function(frame, event, arg):
+ trace_log.append((frame.f_code.co_filename, frame.f_lineno, event))
+
+ # the original trace function expects to have itself set as the trace function
+ sys.settrace(original_trace_function)
+ # call the original trace function
+ original_trace_function(frame, event, arg)
+ # restore our trace function
+ sys.settrace(my_trace_function)
+
+ return my_trace_function
+ else:
+ # no existing trace function, so our trace function is much simpler
+ def my_trace_function(frame, event, arg):
+ trace_log.append((frame.f_code.co_filename, frame.f_lineno, event))
+
+ return my_trace_function
+
+ sys.settrace(my_trace_function)
+
+ try:
+ # run a minimal amount of code while the trace is running
+ # adding more code here, including use of a context manager, will add more to our trace
+ answer = question()
+ finally:
+ sys.settrace(original_trace_function)
+
+ # make sure 'import ... as ...' works on builtin synthetic collections
+ # the following import is not supported (it tries to find module_utils in ansible.plugins)
+ # import ansible_collections.ansible.builtin.plugins.module_utils as c1
+ import ansible_collections.ansible.builtin.plugins.action as c2
+ import ansible_collections.ansible.builtin.plugins as c3
+ import ansible_collections.ansible.builtin as c4
+ import ansible_collections.ansible as c5
+ import ansible_collections as c6
+
+ # make sure 'import ...' works on builtin synthetic collections
+ import ansible_collections.ansible.builtin.plugins.module_utils
+
+ import ansible_collections.ansible.builtin.plugins.action
+ assert ansible_collections.ansible.builtin.plugins.action == c3.action == c2
+
+ import ansible_collections.ansible.builtin.plugins
+ assert ansible_collections.ansible.builtin.plugins == c4.plugins == c3
+
+ import ansible_collections.ansible.builtin
+ assert ansible_collections.ansible.builtin == c5.builtin == c4
+
+ import ansible_collections.ansible
+ assert ansible_collections.ansible == c6.ansible == c5
+
+ import ansible_collections
+ assert ansible_collections == c6
+
+ # make sure 'from ... import ...' works on builtin synthetic collections
+ from ansible_collections.ansible import builtin
+ from ansible_collections.ansible.builtin import plugins
+ assert builtin.plugins == plugins
+
+ from ansible_collections.ansible.builtin.plugins import action
+ from ansible_collections.ansible.builtin.plugins.action import command
+ assert action.command == command
+
+ from ansible_collections.ansible.builtin.plugins.module_utils import basic
+ from ansible_collections.ansible.builtin.plugins.module_utils.basic import AnsibleModule
+ assert basic.AnsibleModule == AnsibleModule
+
+ # make sure relative imports work from collections code
+ # these require __package__ to be set correctly
+ import ansible_collections.testns.testcoll.plugins.module_utils.my_other_util
+ import ansible_collections.testns.testcoll.plugins.action.my_action
+
+ # verify that code loaded from a collection does not inherit __future__ statements from the collection loader
+ if sys.version_info[0] == 2:
+ # if the collection code inherits the division future feature from the collection loader this will fail
+ assert answer == 1
+ else:
+ assert answer == 1.5
+
+ # verify that the filename and line number reported by the trace is correct
+ # this makes sure that collection loading preserves file paths and line numbers
+ assert trace_log == expected_trace_log
+
+
+def test_eventsource():
+ es = _EventSource()
+ # fire when empty should succeed
+ es.fire(42)
+ handler1 = MagicMock()
+ handler2 = MagicMock()
+ es += handler1
+ es.fire(99, my_kwarg='blah')
+ handler1.assert_called_with(99, my_kwarg='blah')
+ es += handler2
+ es.fire(123, foo='bar')
+ handler1.assert_called_with(123, foo='bar')
+ handler2.assert_called_with(123, foo='bar')
+ es -= handler2
+ handler1.reset_mock()
+ handler2.reset_mock()
+ es.fire(123, foo='bar')
+ handler1.assert_called_with(123, foo='bar')
+ handler2.assert_not_called()
+ es -= handler1
+ handler1.reset_mock()
+ es.fire('blah', kwarg=None)
+ handler1.assert_not_called()
+ handler2.assert_not_called()
+ es -= handler1 # should succeed silently
+ handler_bang = MagicMock(side_effect=Exception('bang'))
+ es += handler_bang
+ with pytest.raises(Exception) as ex:
+ es.fire(123)
+ assert 'bang' in str(ex.value)
+ handler_bang.assert_called_with(123)
+ with pytest.raises(ValueError):
+ es += 42
+
+
+def test_on_collection_load():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ load_handler = MagicMock()
+ AnsibleCollectionConfig.on_collection_load += load_handler
+
+ m = import_module('ansible_collections.testns.testcoll')
+ load_handler.assert_called_once_with(collection_name='testns.testcoll', collection_path=os.path.dirname(m.__file__))
+
+ _meta = _get_collection_metadata('testns.testcoll')
+ assert _meta
+ # FIXME: compare to disk
+
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ AnsibleCollectionConfig.on_collection_load += MagicMock(side_effect=Exception('bang'))
+ with pytest.raises(Exception) as ex:
+ import_module('ansible_collections.testns.testcoll')
+ assert 'bang' in str(ex.value)
+
+
+def test_default_collection_config():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+ assert AnsibleCollectionConfig.default_collection is None
+ AnsibleCollectionConfig.default_collection = 'foo.bar'
+ assert AnsibleCollectionConfig.default_collection == 'foo.bar'
+
+
+def test_default_collection_detection():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ # we're clearly not under a collection path
+ assert _get_collection_name_from_path('/') is None
+
+ # something that looks like a collection path but isn't importable by our finder
+ assert _get_collection_name_from_path('/foo/ansible_collections/bogusns/boguscoll/bar') is None
+
+ # legit, at the top of the collection
+ live_collection_path = os.path.join(os.path.dirname(__file__), 'fixtures/collections/ansible_collections/testns/testcoll')
+ assert _get_collection_name_from_path(live_collection_path) == 'testns.testcoll'
+
+ # legit, deeper inside the collection
+ live_collection_deep_path = os.path.join(live_collection_path, 'plugins/modules')
+ assert _get_collection_name_from_path(live_collection_deep_path) == 'testns.testcoll'
+
+ # this one should be hidden by the real testns.testcoll, so should not resolve
+ masked_collection_path = os.path.join(os.path.dirname(__file__), 'fixtures/collections_masked/ansible_collections/testns/testcoll')
+ assert _get_collection_name_from_path(masked_collection_path) is None
+
+
+@pytest.mark.parametrize(
+ 'role_name,collection_list,expected_collection_name,expected_path_suffix',
+ [
+ ('some_role', ['testns.testcoll', 'ansible.bogus'], 'testns.testcoll', 'testns/testcoll/roles/some_role'),
+ ('testns.testcoll.some_role', ['ansible.bogus', 'testns.testcoll'], 'testns.testcoll', 'testns/testcoll/roles/some_role'),
+ ('testns.testcoll.some_role', [], 'testns.testcoll', 'testns/testcoll/roles/some_role'),
+ ('testns.testcoll.some_role', None, 'testns.testcoll', 'testns/testcoll/roles/some_role'),
+ ('some_role', [], None, None),
+ ('some_role', None, None, None),
+ ])
+def test_collection_role_name_location(role_name, collection_list, expected_collection_name, expected_path_suffix):
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ expected_path = None
+ if expected_path_suffix:
+ expected_path = os.path.join(os.path.dirname(__file__), 'fixtures/collections/ansible_collections', expected_path_suffix)
+
+ found = _get_collection_role_path(role_name, collection_list)
+
+ if found:
+ assert found[0] == role_name.rpartition('.')[2]
+ assert found[1] == expected_path
+ assert found[2] == expected_collection_name
+ else:
+ assert expected_collection_name is None and expected_path_suffix is None
+
+
+def test_bogus_imports():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ # ensure ImportError on known-bogus imports
+ bogus_imports = ['bogus_toplevel', 'ansible_collections.bogusns', 'ansible_collections.testns.boguscoll',
+ 'ansible_collections.testns.testcoll.bogussub', 'ansible_collections.ansible.builtin.bogussub']
+ for bogus_import in bogus_imports:
+ with pytest.raises(ImportError):
+ import_module(bogus_import)
+
+
+def test_empty_vs_no_code():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ from ansible_collections.testns import testcoll # synthetic package with no code on disk
+ from ansible_collections.testns.testcoll.plugins import module_utils # real package with empty code file
+
+ # ensure synthetic packages have no code object at all (prevent bogus coverage entries)
+ assert testcoll.__loader__.get_source(testcoll.__name__) is None
+ assert testcoll.__loader__.get_code(testcoll.__name__) is None
+
+ # ensure empty package inits do have a code object
+ assert module_utils.__loader__.get_source(module_utils.__name__) == b''
+ assert module_utils.__loader__.get_code(module_utils.__name__) is not None
+
+
+def test_finder_playbook_paths():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ import ansible_collections
+ import ansible_collections.ansible
+ import ansible_collections.testns
+
+ # ensure the package modules look like we expect
+ assert hasattr(ansible_collections, '__path__') and len(ansible_collections.__path__) > 0
+ assert hasattr(ansible_collections.ansible, '__path__') and len(ansible_collections.ansible.__path__) > 0
+ assert hasattr(ansible_collections.testns, '__path__') and len(ansible_collections.testns.__path__) > 0
+
+ # these shouldn't be visible yet, since we haven't added the playbook dir
+ with pytest.raises(ImportError):
+ import ansible_collections.ansible.playbook_adj_other
+
+ with pytest.raises(ImportError):
+ import ansible_collections.testns.playbook_adj_other
+
+ assert AnsibleCollectionConfig.playbook_paths == []
+ playbook_path_fixture_dir = os.path.join(os.path.dirname(__file__), 'fixtures/playbook_path')
+
+ # configure the playbook paths
+ AnsibleCollectionConfig.playbook_paths = [playbook_path_fixture_dir]
+
+ # playbook paths go to the front of the line
+ assert AnsibleCollectionConfig.collection_paths[0] == os.path.join(playbook_path_fixture_dir, 'collections')
+
+ # playbook paths should be updated on the existing root ansible_collections path, as well as on the 'ansible' namespace (but no others!)
+ assert ansible_collections.__path__[0] == os.path.join(playbook_path_fixture_dir, 'collections/ansible_collections')
+ assert ansible_collections.ansible.__path__[0] == os.path.join(playbook_path_fixture_dir, 'collections/ansible_collections/ansible')
+ assert all('playbook_path' not in p for p in ansible_collections.testns.__path__)
+
+ # should succeed since we fixed up the package path
+ import ansible_collections.ansible.playbook_adj_other
+ # should succeed since we didn't import freshns before hacking in the path
+ import ansible_collections.freshns.playbook_adj_other
+ # should fail since we've already imported something from this path and didn't fix up its package path
+ with pytest.raises(ImportError):
+ import ansible_collections.testns.playbook_adj_other
+
+
+def test_toplevel_iter_modules():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ modules = list(pkgutil.iter_modules(default_test_collection_paths, ''))
+ assert len(modules) == 1
+ assert modules[0][1] == 'ansible_collections'
+
+
+def test_iter_modules_namespaces():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ paths = extend_paths(default_test_collection_paths, 'ansible_collections')
+ modules = list(pkgutil.iter_modules(paths, 'ansible_collections.'))
+ assert len(modules) == 2
+ assert all(m[2] is True for m in modules)
+ assert all(isinstance(m[0], _AnsiblePathHookFinder) for m in modules)
+ assert set(['ansible_collections.testns', 'ansible_collections.ansible']) == set(m[1] for m in modules)
+
+
+def test_collection_get_data():
+ finder = get_default_finder()
+ reset_collections_loader_state(finder)
+
+ # something that's there
+ d = pkgutil.get_data('ansible_collections.testns.testcoll', 'plugins/action/my_action.py')
+ assert b'hello from my_action.py' in d
+
+ # something that's not there
+ d = pkgutil.get_data('ansible_collections.testns.testcoll', 'bogus/bogus')
+ assert d is None
+
+ with pytest.raises(ValueError):
+ plugins_pkg = import_module('ansible_collections.ansible.builtin')
+ assert not os.path.exists(os.path.dirname(plugins_pkg.__file__))
+ d = pkgutil.get_data('ansible_collections.ansible.builtin', 'plugins/connection/local.py')
+
+
+@pytest.mark.parametrize(
+ 'ref,ref_type,expected_collection,expected_subdirs,expected_resource,expected_python_pkg_name',
+ [
+ ('ns.coll.myaction', 'action', 'ns.coll', '', 'myaction', 'ansible_collections.ns.coll.plugins.action'),
+ ('ns.coll.subdir1.subdir2.myaction', 'action', 'ns.coll', 'subdir1.subdir2', 'myaction', 'ansible_collections.ns.coll.plugins.action.subdir1.subdir2'),
+ ('ns.coll.myrole', 'role', 'ns.coll', '', 'myrole', 'ansible_collections.ns.coll.roles.myrole'),
+ ('ns.coll.subdir1.subdir2.myrole', 'role', 'ns.coll', 'subdir1.subdir2', 'myrole', 'ansible_collections.ns.coll.roles.subdir1.subdir2.myrole'),
+ ])
+def test_fqcr_parsing_valid(ref, ref_type, expected_collection,
+ expected_subdirs, expected_resource, expected_python_pkg_name):
+ assert AnsibleCollectionRef.is_valid_fqcr(ref, ref_type)
+
+ r = AnsibleCollectionRef.from_fqcr(ref, ref_type)
+ assert r.collection == expected_collection
+ assert r.subdirs == expected_subdirs
+ assert r.resource == expected_resource
+ assert r.n_python_package_name == expected_python_pkg_name
+
+ r = AnsibleCollectionRef.try_parse_fqcr(ref, ref_type)
+ assert r.collection == expected_collection
+ assert r.subdirs == expected_subdirs
+ assert r.resource == expected_resource
+ assert r.n_python_package_name == expected_python_pkg_name
+
+
+@pytest.mark.parametrize(
+ ('fqcn', 'expected'),
+ (
+ ('ns1.coll2', True),
+ ('ns1#coll2', False),
+ ('def.coll3', False),
+ ('ns4.return', False),
+ ('assert.this', False),
+ ('import.that', False),
+ ('.that', False),
+ ('this.', False),
+ ('.', False),
+ ('', False),
+ ),
+)
+def test_fqcn_validation(fqcn, expected):
+ """Vefiry that is_valid_collection_name validates FQCN correctly."""
+ assert AnsibleCollectionRef.is_valid_collection_name(fqcn) is expected
+
+
+@pytest.mark.parametrize(
+ 'ref,ref_type,expected_error_type,expected_error_expression',
+ [
+ ('no_dots_at_all_action', 'action', ValueError, 'is not a valid collection reference'),
+ ('no_nscoll.myaction', 'action', ValueError, 'is not a valid collection reference'),
+ ('no_nscoll%myaction', 'action', ValueError, 'is not a valid collection reference'),
+ ('ns.coll.myaction', 'bogus', ValueError, 'invalid collection ref_type'),
+ ])
+def test_fqcr_parsing_invalid(ref, ref_type, expected_error_type, expected_error_expression):
+ assert not AnsibleCollectionRef.is_valid_fqcr(ref, ref_type)
+
+ with pytest.raises(expected_error_type) as curerr:
+ AnsibleCollectionRef.from_fqcr(ref, ref_type)
+
+ assert re.search(expected_error_expression, str(curerr.value))
+
+ r = AnsibleCollectionRef.try_parse_fqcr(ref, ref_type)
+ assert r is None
+
+
+@pytest.mark.parametrize(
+ 'name,subdirs,resource,ref_type,python_pkg_name',
+ [
+ ('ns.coll', None, 'res', 'doc_fragments', 'ansible_collections.ns.coll.plugins.doc_fragments'),
+ ('ns.coll', 'subdir1', 'res', 'doc_fragments', 'ansible_collections.ns.coll.plugins.doc_fragments.subdir1'),
+ ('ns.coll', 'subdir1.subdir2', 'res', 'action', 'ansible_collections.ns.coll.plugins.action.subdir1.subdir2'),
+ ])
+def test_collectionref_components_valid(name, subdirs, resource, ref_type, python_pkg_name):
+ x = AnsibleCollectionRef(name, subdirs, resource, ref_type)
+
+ assert x.collection == name
+ if subdirs:
+ assert x.subdirs == subdirs
+ else:
+ assert x.subdirs == ''
+
+ assert x.resource == resource
+ assert x.ref_type == ref_type
+ assert x.n_python_package_name == python_pkg_name
+
+
+@pytest.mark.parametrize(
+ 'dirname,expected_result',
+ [
+ ('become_plugins', 'become'),
+ ('cache_plugins', 'cache'),
+ ('connection_plugins', 'connection'),
+ ('library', 'modules'),
+ ('filter_plugins', 'filter'),
+ ('bogus_plugins', ValueError),
+ (None, ValueError)
+ ]
+)
+def test_legacy_plugin_dir_to_plugin_type(dirname, expected_result):
+ if isinstance(expected_result, string_types):
+ assert AnsibleCollectionRef.legacy_plugin_dir_to_plugin_type(dirname) == expected_result
+ else:
+ with pytest.raises(expected_result):
+ AnsibleCollectionRef.legacy_plugin_dir_to_plugin_type(dirname)
+
+
+@pytest.mark.parametrize(
+ 'name,subdirs,resource,ref_type,expected_error_type,expected_error_expression',
+ [
+ ('bad_ns', '', 'resource', 'action', ValueError, 'invalid collection name'),
+ ('ns.coll.', '', 'resource', 'action', ValueError, 'invalid collection name'),
+ ('ns.coll', 'badsubdir#', 'resource', 'action', ValueError, 'invalid subdirs entry'),
+ ('ns.coll', 'badsubdir.', 'resource', 'action', ValueError, 'invalid subdirs entry'),
+ ('ns.coll', '.badsubdir', 'resource', 'action', ValueError, 'invalid subdirs entry'),
+ ('ns.coll', '', 'resource', 'bogus', ValueError, 'invalid collection ref_type'),
+ ])
+def test_collectionref_components_invalid(name, subdirs, resource, ref_type, expected_error_type, expected_error_expression):
+ with pytest.raises(expected_error_type) as curerr:
+ AnsibleCollectionRef(name, subdirs, resource, ref_type)
+
+ assert re.search(expected_error_expression, str(curerr.value))
+
+
+# BEGIN TEST SUPPORT
+
+default_test_collection_paths = [
+ os.path.join(os.path.dirname(__file__), 'fixtures', 'collections'),
+ os.path.join(os.path.dirname(__file__), 'fixtures', 'collections_masked'),
+ '/bogus/bogussub'
+]
+
+
+def get_default_finder():
+ return _AnsibleCollectionFinder(paths=default_test_collection_paths)
+
+
+def extend_paths(path_list, suffix):
+ suffix = suffix.replace('.', '/')
+ return [os.path.join(p, suffix) for p in path_list]
+
+
+def nuke_module_prefix(prefix):
+ for module_to_nuke in [m for m in sys.modules if m.startswith(prefix)]:
+ sys.modules.pop(module_to_nuke)
+
+
+def reset_collections_loader_state(metapath_finder=None):
+ _AnsibleCollectionFinder._remove()
+
+ nuke_module_prefix('ansible_collections')
+ nuke_module_prefix('ansible.modules')
+ nuke_module_prefix('ansible.plugins')
+
+ # FIXME: better to move this someplace else that gets cleaned up automatically?
+ _AnsibleCollectionLoader._redirected_package_map = {}
+
+ AnsibleCollectionConfig._default_collection = None
+ AnsibleCollectionConfig._on_collection_load = _EventSource()
+
+ if metapath_finder:
+ metapath_finder._install()
diff --git a/test/units/utils/display/test_broken_cowsay.py b/test/units/utils/display/test_broken_cowsay.py
new file mode 100644
index 0000000..d888010
--- /dev/null
+++ b/test/units/utils/display/test_broken_cowsay.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2021 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+
+from ansible.utils.display import Display
+from unittest.mock import MagicMock
+
+
+def test_display_with_fake_cowsay_binary(capsys, mocker):
+ mocker.patch("ansible.constants.ANSIBLE_COW_PATH", "./cowsay.sh")
+
+ def mock_communicate(input=None, timeout=None):
+ return b"", b""
+
+ mock_popen = MagicMock()
+ mock_popen.return_value.communicate = mock_communicate
+ mock_popen.return_value.returncode = 1
+ mocker.patch("subprocess.Popen", mock_popen)
+
+ display = Display()
+ assert not hasattr(display, "cows_available")
+ assert display.b_cowsay is None
diff --git a/test/units/utils/display/test_display.py b/test/units/utils/display/test_display.py
new file mode 100644
index 0000000..cdeb496
--- /dev/null
+++ b/test/units/utils/display/test_display.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2020 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+from ansible.utils.display import Display
+
+
+def test_display_basic_message(capsys, mocker):
+ # Disable logging
+ mocker.patch('ansible.utils.display.logger', return_value=None)
+
+ d = Display()
+ d.display(u'Some displayed message')
+ out, err = capsys.readouterr()
+ assert out == 'Some displayed message\n'
+ assert err == ''
diff --git a/test/units/utils/display/test_logger.py b/test/units/utils/display/test_logger.py
new file mode 100644
index 0000000..ed69393
--- /dev/null
+++ b/test/units/utils/display/test_logger.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2020 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+import logging
+import sys
+
+
+def test_logger():
+ '''
+ Avoid CVE-2019-14846 as 3rd party libs will disclose secrets when
+ logging is set to DEBUG
+ '''
+
+ # clear loaded modules to have unadultered test.
+ for loaded in list(sys.modules.keys()):
+ if 'ansible' in loaded:
+ del sys.modules[loaded]
+
+ # force logger to exist via config
+ from ansible import constants as C
+ C.DEFAULT_LOG_PATH = '/dev/null'
+
+ # initialize logger
+ from ansible.utils.display import logger
+
+ assert logger.root.level != logging.DEBUG
diff --git a/test/units/utils/display/test_warning.py b/test/units/utils/display/test_warning.py
new file mode 100644
index 0000000..be63c34
--- /dev/null
+++ b/test/units/utils/display/test_warning.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2020 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import pytest
+
+from ansible.utils.display import Display
+
+
+@pytest.fixture
+def warning_message():
+ warning_message = 'bad things will happen'
+ expected_warning_message = '[WARNING]: {0}\n'.format(warning_message)
+ return warning_message, expected_warning_message
+
+
+def test_warning(capsys, mocker, warning_message):
+ warning_message, expected_warning_message = warning_message
+
+ mocker.patch('ansible.utils.color.ANSIBLE_COLOR', True)
+ mocker.patch('ansible.utils.color.parsecolor', return_value=u'1;35') # value for 'bright purple'
+
+ d = Display()
+ d.warning(warning_message)
+ out, err = capsys.readouterr()
+ assert d._warns == {expected_warning_message: 1}
+ assert err == '\x1b[1;35m{0}\x1b[0m\n'.format(expected_warning_message.rstrip('\n'))
+
+
+def test_warning_no_color(capsys, mocker, warning_message):
+ warning_message, expected_warning_message = warning_message
+
+ mocker.patch('ansible.utils.color.ANSIBLE_COLOR', False)
+
+ d = Display()
+ d.warning(warning_message)
+ out, err = capsys.readouterr()
+ assert d._warns == {expected_warning_message: 1}
+ assert err == expected_warning_message
diff --git a/test/units/utils/test_cleanup_tmp_file.py b/test/units/utils/test_cleanup_tmp_file.py
new file mode 100644
index 0000000..2a44a55
--- /dev/null
+++ b/test/units/utils/test_cleanup_tmp_file.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2019, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import pytest
+import tempfile
+
+from ansible.utils.path import cleanup_tmp_file
+
+
+def raise_error():
+ raise OSError
+
+
+def test_cleanup_tmp_file_file():
+ tmp_fd, tmp = tempfile.mkstemp()
+ cleanup_tmp_file(tmp)
+
+ assert not os.path.exists(tmp)
+
+
+def test_cleanup_tmp_file_dir():
+ tmp = tempfile.mkdtemp()
+ cleanup_tmp_file(tmp)
+
+ assert not os.path.exists(tmp)
+
+
+def test_cleanup_tmp_file_nonexistant():
+ assert None is cleanup_tmp_file('nope')
+
+
+def test_cleanup_tmp_file_failure(mocker):
+ tmp = tempfile.mkdtemp()
+ with pytest.raises(Exception):
+ mocker.patch('shutil.rmtree', side_effect=raise_error())
+ cleanup_tmp_file(tmp)
+
+
+def test_cleanup_tmp_file_failure_warning(mocker, capsys):
+ tmp = tempfile.mkdtemp()
+ with pytest.raises(Exception):
+ mocker.patch('shutil.rmtree', side_effect=raise_error())
+ cleanup_tmp_file(tmp, warn=True)
diff --git a/test/units/utils/test_context_objects.py b/test/units/utils/test_context_objects.py
new file mode 100644
index 0000000..c56a41d
--- /dev/null
+++ b/test/units/utils/test_context_objects.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2018, Toshio Kuratomi <tkuratomi@ansible.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import argparse
+
+import pytest
+
+from ansible.module_utils.common.collections import ImmutableDict
+from ansible.utils import context_objects as co
+
+
+MAKE_IMMUTABLE_DATA = ((u'くらとみ', u'くらとみ'),
+ (42, 42),
+ ({u'café': u'くらとみ'}, ImmutableDict({u'café': u'くらとみ'})),
+ ([1, u'café', u'くらとみ'], (1, u'café', u'くらとみ')),
+ (set((1, u'café', u'くらとみ')), frozenset((1, u'café', u'くらとみ'))),
+ ({u'café': [1, set(u'ñ')]},
+ ImmutableDict({u'café': (1, frozenset(u'ñ'))})),
+ ([set((1, 2)), {u'くらとみ': 3}],
+ (frozenset((1, 2)), ImmutableDict({u'くらとみ': 3}))),
+ )
+
+
+@pytest.mark.parametrize('data, expected', MAKE_IMMUTABLE_DATA)
+def test_make_immutable(data, expected):
+ assert co._make_immutable(data) == expected
+
+
+def test_cliargs_from_dict():
+ old_dict = {'tags': [u'production', u'webservers'],
+ 'check_mode': True,
+ 'start_at_task': u'Start with くらとみ'}
+ expected = frozenset((('tags', (u'production', u'webservers')),
+ ('check_mode', True),
+ ('start_at_task', u'Start with くらとみ')))
+
+ assert frozenset(co.CLIArgs(old_dict).items()) == expected
+
+
+def test_cliargs():
+ class FakeOptions:
+ pass
+ options = FakeOptions()
+ options.tags = [u'production', u'webservers']
+ options.check_mode = True
+ options.start_at_task = u'Start with くらとみ'
+
+ expected = frozenset((('tags', (u'production', u'webservers')),
+ ('check_mode', True),
+ ('start_at_task', u'Start with くらとみ')))
+
+ assert frozenset(co.CLIArgs.from_options(options).items()) == expected
+
+
+def test_cliargs_argparse():
+ parser = argparse.ArgumentParser(description='Process some integers.')
+ parser.add_argument('integers', metavar='N', type=int, nargs='+',
+ help='an integer for the accumulator')
+ parser.add_argument('--sum', dest='accumulate', action='store_const',
+ const=sum, default=max,
+ help='sum the integers (default: find the max)')
+ args = parser.parse_args([u'--sum', u'1', u'2'])
+
+ expected = frozenset((('accumulate', sum), ('integers', (1, 2))))
+
+ assert frozenset(co.CLIArgs.from_options(args).items()) == expected
diff --git a/test/units/utils/test_display.py b/test/units/utils/test_display.py
new file mode 100644
index 0000000..6b1914b
--- /dev/null
+++ b/test/units/utils/test_display.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+# (c) 2020 Matt Martz <matt@sivel.net>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import locale
+import sys
+import unicodedata
+from unittest.mock import MagicMock
+
+import pytest
+
+from ansible.utils.display import _LIBC, _MAX_INT, Display, get_text_width
+from ansible.utils.multiprocessing import context as multiprocessing_context
+
+
+@pytest.fixture
+def problematic_wcswidth_chars():
+ problematic = []
+ try:
+ locale.setlocale(locale.LC_ALL, 'C.UTF-8')
+ except Exception:
+ return problematic
+
+ candidates = set(chr(c) for c in range(sys.maxunicode) if unicodedata.category(chr(c)) == 'Cf')
+ for c in candidates:
+ if _LIBC.wcswidth(c, _MAX_INT) == -1:
+ problematic.append(c)
+
+ return problematic
+
+
+def test_get_text_width():
+ locale.setlocale(locale.LC_ALL, '')
+ assert get_text_width(u'コンニチハ') == 10
+ assert get_text_width(u'abコcd') == 6
+ assert get_text_width(u'café') == 4
+ assert get_text_width(u'four') == 4
+ assert get_text_width(u'\u001B') == 0
+ assert get_text_width(u'ab\u0000') == 2
+ assert get_text_width(u'abコ\u0000') == 4
+ assert get_text_width(u'🚀🐮') == 4
+ assert get_text_width(u'\x08') == 0
+ assert get_text_width(u'\x08\x08') == 0
+ assert get_text_width(u'ab\x08cd') == 3
+ assert get_text_width(u'ab\x1bcd') == 3
+ assert get_text_width(u'ab\x7fcd') == 3
+ assert get_text_width(u'ab\x94cd') == 3
+
+ pytest.raises(TypeError, get_text_width, 1)
+ pytest.raises(TypeError, get_text_width, b'four')
+
+
+def test_get_text_width_no_locale(problematic_wcswidth_chars):
+ if not problematic_wcswidth_chars:
+ pytest.skip("No problmatic wcswidth chars")
+ locale.setlocale(locale.LC_ALL, 'C.UTF-8')
+ pytest.raises(EnvironmentError, get_text_width, problematic_wcswidth_chars[0])
+
+
+def test_Display_banner_get_text_width(monkeypatch):
+ locale.setlocale(locale.LC_ALL, '')
+ display = Display()
+ display_mock = MagicMock()
+ monkeypatch.setattr(display, 'display', display_mock)
+
+ display.banner(u'🚀🐮', color=False, cows=False)
+ args, kwargs = display_mock.call_args
+ msg = args[0]
+ stars = u' %s' % (75 * u'*')
+ assert msg.endswith(stars)
+
+
+def test_Display_banner_get_text_width_fallback(monkeypatch):
+ locale.setlocale(locale.LC_ALL, 'C.UTF-8')
+ display = Display()
+ display_mock = MagicMock()
+ monkeypatch.setattr(display, 'display', display_mock)
+
+ display.banner(u'\U000110cd', color=False, cows=False)
+ args, kwargs = display_mock.call_args
+ msg = args[0]
+ stars = u' %s' % (78 * u'*')
+ assert msg.endswith(stars)
+
+
+def test_Display_set_queue_parent():
+ display = Display()
+ pytest.raises(RuntimeError, display.set_queue, 'foo')
+
+
+def test_Display_set_queue_fork():
+ def test():
+ display = Display()
+ display.set_queue('foo')
+ assert display._final_q == 'foo'
+ p = multiprocessing_context.Process(target=test)
+ p.start()
+ p.join()
+ assert p.exitcode == 0
+
+
+def test_Display_display_fork():
+ def test():
+ queue = MagicMock()
+ display = Display()
+ display.set_queue(queue)
+ display.display('foo')
+ queue.send_display.assert_called_once_with(
+ 'foo', color=None, stderr=False, screen_only=False, log_only=False, newline=True
+ )
+
+ p = multiprocessing_context.Process(target=test)
+ p.start()
+ p.join()
+ assert p.exitcode == 0
+
+
+def test_Display_display_lock(monkeypatch):
+ lock = MagicMock()
+ display = Display()
+ monkeypatch.setattr(display, '_lock', lock)
+ display.display('foo')
+ lock.__enter__.assert_called_once_with()
+
+
+def test_Display_display_lock_fork(monkeypatch):
+ lock = MagicMock()
+ display = Display()
+ monkeypatch.setattr(display, '_lock', lock)
+ monkeypatch.setattr(display, '_final_q', MagicMock())
+ display.display('foo')
+ lock.__enter__.assert_not_called()
diff --git a/test/units/utils/test_encrypt.py b/test/units/utils/test_encrypt.py
new file mode 100644
index 0000000..72fe3b0
--- /dev/null
+++ b/test/units/utils/test_encrypt.py
@@ -0,0 +1,220 @@
+# (c) 2018, Matthias Fuchs <matthias.s.fuchs@gmail.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import sys
+
+import pytest
+
+from ansible.errors import AnsibleError, AnsibleFilterError
+from ansible.plugins.filter.core import get_encrypted_password
+from ansible.utils import encrypt
+
+
+class passlib_off(object):
+ def __init__(self):
+ self.orig = encrypt.PASSLIB_AVAILABLE
+
+ def __enter__(self):
+ encrypt.PASSLIB_AVAILABLE = False
+ return self
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ encrypt.PASSLIB_AVAILABLE = self.orig
+
+
+def assert_hash(expected, secret, algorithm, **settings):
+
+ if encrypt.PASSLIB_AVAILABLE:
+ assert encrypt.passlib_or_crypt(secret, algorithm, **settings) == expected
+ assert encrypt.PasslibHash(algorithm).hash(secret, **settings) == expected
+ else:
+ assert encrypt.passlib_or_crypt(secret, algorithm, **settings) == expected
+ with pytest.raises(AnsibleError) as excinfo:
+ encrypt.PasslibHash(algorithm).hash(secret, **settings)
+ assert excinfo.value.args[0] == "passlib must be installed and usable to hash with '%s'" % algorithm
+
+
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib')
+def test_encrypt_with_rounds_no_passlib():
+ with passlib_off():
+ assert_hash("$5$rounds=5000$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7",
+ secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000)
+ assert_hash("$5$rounds=10000$12345678$JBinliYMFEcBeAXKZnLjenhgEhTmJBvZn3aR8l70Oy/",
+ secret="123", algorithm="sha256_crypt", salt="12345678", rounds=10000)
+ assert_hash("$6$rounds=5000$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.",
+ secret="123", algorithm="sha512_crypt", salt="12345678", rounds=5000)
+
+
+@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test')
+def test_encrypt_with_ident():
+ assert_hash("$2$12$123456789012345678901ufd3hZRrev.WXCbemqGIV/gmWaTGLImm",
+ secret="123", algorithm="bcrypt", salt='1234567890123456789012', ident='2')
+ assert_hash("$2y$12$123456789012345678901ugbM1PeTfRQ0t6dCJu5lQA8hwrZOYgDu",
+ secret="123", algorithm="bcrypt", salt='1234567890123456789012', ident='2y')
+ assert_hash("$2a$12$123456789012345678901ugbM1PeTfRQ0t6dCJu5lQA8hwrZOYgDu",
+ secret="123", algorithm="bcrypt", salt='1234567890123456789012', ident='2a')
+ assert_hash("$2b$12$123456789012345678901ugbM1PeTfRQ0t6dCJu5lQA8hwrZOYgDu",
+ secret="123", algorithm="bcrypt", salt='1234567890123456789012', ident='2b')
+ assert_hash("$2b$12$123456789012345678901ugbM1PeTfRQ0t6dCJu5lQA8hwrZOYgDu",
+ secret="123", algorithm="bcrypt", salt='1234567890123456789012')
+ # negative test: sha256_crypt does not take ident as parameter so ignore it
+ assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7",
+ secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000, ident='invalid_ident')
+
+
+# If passlib is not installed. this is identical to the test_encrypt_with_rounds_no_passlib() test
+@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test')
+def test_encrypt_with_rounds():
+ assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7",
+ secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000)
+ assert_hash("$5$rounds=10000$12345678$JBinliYMFEcBeAXKZnLjenhgEhTmJBvZn3aR8l70Oy/",
+ secret="123", algorithm="sha256_crypt", salt="12345678", rounds=10000)
+ assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.",
+ secret="123", algorithm="sha512_crypt", salt="12345678", rounds=5000)
+
+
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib')
+def test_encrypt_default_rounds_no_passlib():
+ with passlib_off():
+ assert_hash("$1$12345678$tRy4cXc3kmcfRZVj4iFXr/",
+ secret="123", algorithm="md5_crypt", salt="12345678")
+ assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7",
+ secret="123", algorithm="sha256_crypt", salt="12345678")
+ assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.",
+ secret="123", algorithm="sha512_crypt", salt="12345678")
+
+ assert encrypt.CryptHash("md5_crypt").hash("123")
+
+
+# If passlib is not installed. this is identical to the test_encrypt_default_rounds_no_passlib() test
+@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test')
+def test_encrypt_default_rounds():
+ assert_hash("$1$12345678$tRy4cXc3kmcfRZVj4iFXr/",
+ secret="123", algorithm="md5_crypt", salt="12345678")
+ assert_hash("$5$rounds=535000$12345678$uy3TurUPaY71aioJi58HvUY8jkbhSQU8HepbyaNngv.",
+ secret="123", algorithm="sha256_crypt", salt="12345678")
+ assert_hash("$6$rounds=656000$12345678$InMy49UwxyCh2pGJU1NpOhVSElDDzKeyuC6n6E9O34BCUGVNYADnI.rcA3m.Vro9BiZpYmjEoNhpREqQcbvQ80",
+ secret="123", algorithm="sha512_crypt", salt="12345678")
+
+ assert encrypt.PasslibHash("md5_crypt").hash("123")
+
+
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib')
+def test_password_hash_filter_no_passlib():
+ with passlib_off():
+ assert not encrypt.PASSLIB_AVAILABLE
+ assert get_encrypted_password("123", "md5", salt="12345678") == "$1$12345678$tRy4cXc3kmcfRZVj4iFXr/"
+
+ with pytest.raises(AnsibleFilterError):
+ get_encrypted_password("123", "crypt16", salt="12")
+
+
+@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test')
+def test_password_hash_filter_passlib():
+
+ with pytest.raises(AnsibleFilterError):
+ get_encrypted_password("123", "sha257", salt="12345678")
+
+ # Uses passlib default rounds value for sha256 matching crypt behaviour
+ assert get_encrypted_password("123", "sha256", salt="12345678") == "$5$rounds=535000$12345678$uy3TurUPaY71aioJi58HvUY8jkbhSQU8HepbyaNngv."
+ assert get_encrypted_password("123", "sha256", salt="12345678", rounds=5000) == "$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7"
+
+ assert (get_encrypted_password("123", "sha256", salt="12345678", rounds=10000) ==
+ "$5$rounds=10000$12345678$JBinliYMFEcBeAXKZnLjenhgEhTmJBvZn3aR8l70Oy/")
+
+ assert (get_encrypted_password("123", "sha512", salt="12345678", rounds=6000) ==
+ "$6$rounds=6000$12345678$l/fC67BdJwZrJ7qneKGP1b6PcatfBr0dI7W6JLBrsv8P1wnv/0pu4WJsWq5p6WiXgZ2gt9Aoir3MeORJxg4.Z/")
+
+ assert (get_encrypted_password("123", "sha512", salt="12345678", rounds=5000) ==
+ "$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.")
+
+ assert get_encrypted_password("123", "crypt16", salt="12") == "12pELHK2ME3McUFlHxel6uMM"
+
+ # Try algorithm that uses a raw salt
+ assert get_encrypted_password("123", "pbkdf2_sha256")
+ # Try algorithm with ident
+ assert get_encrypted_password("123", "pbkdf2_sha256", ident='invalid_ident')
+
+
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib')
+def test_do_encrypt_no_passlib():
+ with passlib_off():
+ assert not encrypt.PASSLIB_AVAILABLE
+ assert encrypt.do_encrypt("123", "md5_crypt", salt="12345678") == "$1$12345678$tRy4cXc3kmcfRZVj4iFXr/"
+
+ with pytest.raises(AnsibleError):
+ encrypt.do_encrypt("123", "crypt16", salt="12")
+
+
+@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test')
+def test_do_encrypt_passlib():
+ with pytest.raises(AnsibleError):
+ encrypt.do_encrypt("123", "sha257_crypt", salt="12345678")
+
+ # Uses passlib default rounds value for sha256 matching crypt behaviour.
+ assert encrypt.do_encrypt("123", "sha256_crypt", salt="12345678") == "$5$rounds=535000$12345678$uy3TurUPaY71aioJi58HvUY8jkbhSQU8HepbyaNngv."
+
+ assert encrypt.do_encrypt("123", "md5_crypt", salt="12345678") == "$1$12345678$tRy4cXc3kmcfRZVj4iFXr/"
+
+ assert encrypt.do_encrypt("123", "crypt16", salt="12") == "12pELHK2ME3McUFlHxel6uMM"
+
+ assert encrypt.do_encrypt("123", "bcrypt",
+ salt='1234567890123456789012',
+ ident='2a') == "$2a$12$123456789012345678901ugbM1PeTfRQ0t6dCJu5lQA8hwrZOYgDu"
+
+
+def test_random_salt():
+ res = encrypt.random_salt()
+ expected_salt_candidate_chars = u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789./'
+ assert len(res) == 8
+ for res_char in res:
+ assert res_char in expected_salt_candidate_chars
+
+
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='macOS requires passlib')
+def test_invalid_crypt_salt():
+ pytest.raises(
+ AnsibleError,
+ encrypt.CryptHash('bcrypt')._salt,
+ '_',
+ None
+ )
+ encrypt.CryptHash('bcrypt')._salt('1234567890123456789012', None)
+ pytest.raises(
+ AnsibleError,
+ encrypt.CryptHash('bcrypt')._salt,
+ 'kljsdf',
+ None
+ )
+ encrypt.CryptHash('sha256_crypt')._salt('123456', None)
+ pytest.raises(
+ AnsibleError,
+ encrypt.CryptHash('sha256_crypt')._salt,
+ '1234567890123456789012',
+ None
+ )
+
+
+def test_passlib_bcrypt_salt(recwarn):
+ passlib_exc = pytest.importorskip("passlib.exc")
+
+ secret = 'foo'
+ salt = '1234567890123456789012'
+ repaired_salt = '123456789012345678901u'
+ expected = '$2b$12$123456789012345678901uMv44x.2qmQeefEGb3bcIRc1mLuO7bqa'
+ ident = '2b'
+
+ p = encrypt.PasslibHash('bcrypt')
+
+ result = p.hash(secret, salt=salt, ident=ident)
+ passlib_warnings = [w.message for w in recwarn if isinstance(w.message, passlib_exc.PasslibHashWarning)]
+ assert len(passlib_warnings) == 0
+ assert result == expected
+
+ recwarn.clear()
+
+ result = p.hash(secret, salt=repaired_salt, ident=ident)
+ assert result == expected
diff --git a/test/units/utils/test_helpers.py b/test/units/utils/test_helpers.py
new file mode 100644
index 0000000..ec37b39
--- /dev/null
+++ b/test/units/utils/test_helpers.py
@@ -0,0 +1,34 @@
+# (c) 2015, Marius Gedminas <marius@gedmin.as>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+
+from ansible.utils.helpers import pct_to_int
+
+
+class TestHelpers(unittest.TestCase):
+
+ def test_pct_to_int(self):
+ self.assertEqual(pct_to_int(1, 100), 1)
+ self.assertEqual(pct_to_int(-1, 100), -1)
+ self.assertEqual(pct_to_int("1%", 10), 1)
+ self.assertEqual(pct_to_int("1%", 10, 0), 0)
+ self.assertEqual(pct_to_int("1", 100), 1)
+ self.assertEqual(pct_to_int("10%", 100), 10)
diff --git a/test/units/utils/test_isidentifier.py b/test/units/utils/test_isidentifier.py
new file mode 100644
index 0000000..de6de64
--- /dev/null
+++ b/test/units/utils/test_isidentifier.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible.utils.vars import isidentifier
+
+
+# Originally posted at: http://stackoverflow.com/a/29586366
+
+
+@pytest.mark.parametrize(
+ "identifier", [
+ "foo", "foo1_23",
+ ]
+)
+def test_valid_identifier(identifier):
+ assert isidentifier(identifier)
+
+
+@pytest.mark.parametrize(
+ "identifier", [
+ "pass", "foo ", " foo", "1234", "1234abc", "", " ", "foo bar", "no-dashed-names-for-you",
+ ]
+)
+def test_invalid_identifier(identifier):
+ assert not isidentifier(identifier)
+
+
+def test_keywords_not_in_PY2():
+ """In Python 2 ("True", "False", "None") are not keywords. The isidentifier
+ method ensures that those are treated as keywords on both Python 2 and 3.
+ """
+ assert not isidentifier("True")
+ assert not isidentifier("False")
+ assert not isidentifier("None")
+
+
+def test_non_ascii():
+ """In Python 3 non-ascii characters are allowed as opposed to Python 2. The
+ isidentifier method ensures that those are treated as keywords on both
+ Python 2 and 3.
+ """
+ assert not isidentifier("křížek")
diff --git a/test/units/utils/test_plugin_docs.py b/test/units/utils/test_plugin_docs.py
new file mode 100644
index 0000000..ff973b1
--- /dev/null
+++ b/test/units/utils/test_plugin_docs.py
@@ -0,0 +1,333 @@
+# -*- coding: utf-8 -*-
+# (c) 2020 Felix Fontein <felix@fontein.de>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import copy
+
+import pytest
+
+from ansible.utils.plugin_docs import (
+ add_collection_to_versions_and_dates,
+)
+
+
+ADD_TESTS = [
+ (
+ # Module options
+ True,
+ False,
+ {
+ 'author': 'x',
+ 'version_added': '1.0.0',
+ 'deprecated': {
+ 'removed_in': '2.0.0',
+ },
+ 'options': {
+ 'test': {
+ 'description': '',
+ 'type': 'str',
+ 'version_added': '1.1.0',
+ 'deprecated': {
+ # should not be touched since this isn't a plugin
+ 'removed_in': '2.0.0',
+ },
+ 'env': [
+ # should not be touched since this isn't a plugin
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'version': '2.0.0',
+ },
+ },
+ ],
+ 'ini': [
+ # should not be touched since this isn't a plugin
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'version': '2.0.0',
+ },
+ },
+ ],
+ 'vars': [
+ # should not be touched since this isn't a plugin
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'removed_at_date': '2020-01-01',
+ },
+ },
+ ],
+ },
+ 'subtest': {
+ 'description': '',
+ 'type': 'dict',
+ 'deprecated': {
+ # should not be touched since this isn't a plugin
+ 'version': '2.0.0',
+ },
+ 'suboptions': {
+ 'suboption': {
+ 'description': '',
+ 'type': 'int',
+ 'version_added': '1.2.0',
+ }
+ },
+ }
+ },
+ },
+ {
+ 'author': 'x',
+ 'version_added': '1.0.0',
+ 'version_added_collection': 'foo.bar',
+ 'deprecated': {
+ 'removed_in': '2.0.0',
+ 'removed_from_collection': 'foo.bar',
+ },
+ 'options': {
+ 'test': {
+ 'description': '',
+ 'type': 'str',
+ 'version_added': '1.1.0',
+ 'version_added_collection': 'foo.bar',
+ 'deprecated': {
+ # should not be touched since this isn't a plugin
+ 'removed_in': '2.0.0',
+ },
+ 'env': [
+ # should not be touched since this isn't a plugin
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'version': '2.0.0',
+ },
+ },
+ ],
+ 'ini': [
+ # should not be touched since this isn't a plugin
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'version': '2.0.0',
+ },
+ },
+ ],
+ 'vars': [
+ # should not be touched since this isn't a plugin
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'removed_at_date': '2020-01-01',
+ },
+ },
+ ],
+ },
+ 'subtest': {
+ 'description': '',
+ 'type': 'dict',
+ 'deprecated': {
+ # should not be touched since this isn't a plugin
+ 'version': '2.0.0',
+ },
+ 'suboptions': {
+ 'suboption': {
+ 'description': '',
+ 'type': 'int',
+ 'version_added': '1.2.0',
+ 'version_added_collection': 'foo.bar',
+ }
+ },
+ }
+ },
+ },
+ ),
+ (
+ # Module options
+ True,
+ False,
+ {
+ 'author': 'x',
+ 'deprecated': {
+ 'removed_at_date': '2020-01-01',
+ },
+ },
+ {
+ 'author': 'x',
+ 'deprecated': {
+ 'removed_at_date': '2020-01-01',
+ 'removed_from_collection': 'foo.bar',
+ },
+ },
+ ),
+ (
+ # Plugin options
+ False,
+ False,
+ {
+ 'author': 'x',
+ 'version_added': '1.0.0',
+ 'deprecated': {
+ 'removed_in': '2.0.0',
+ },
+ 'options': {
+ 'test': {
+ 'description': '',
+ 'type': 'str',
+ 'version_added': '1.1.0',
+ 'deprecated': {
+ # should not be touched since this is the wrong name
+ 'removed_in': '2.0.0',
+ },
+ 'env': [
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'version': '2.0.0',
+ },
+ },
+ ],
+ 'ini': [
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'version': '2.0.0',
+ },
+ },
+ ],
+ 'vars': [
+ {
+ 'version_added': '1.3.0',
+ 'deprecated': {
+ 'removed_at_date': '2020-01-01',
+ },
+ },
+ ],
+ },
+ 'subtest': {
+ 'description': '',
+ 'type': 'dict',
+ 'deprecated': {
+ 'version': '2.0.0',
+ },
+ 'suboptions': {
+ 'suboption': {
+ 'description': '',
+ 'type': 'int',
+ 'version_added': '1.2.0',
+ }
+ },
+ }
+ },
+ },
+ {
+ 'author': 'x',
+ 'version_added': '1.0.0',
+ 'version_added_collection': 'foo.bar',
+ 'deprecated': {
+ 'removed_in': '2.0.0',
+ 'removed_from_collection': 'foo.bar',
+ },
+ 'options': {
+ 'test': {
+ 'description': '',
+ 'type': 'str',
+ 'version_added': '1.1.0',
+ 'version_added_collection': 'foo.bar',
+ 'deprecated': {
+ # should not be touched since this is the wrong name
+ 'removed_in': '2.0.0',
+ },
+ 'env': [
+ {
+ 'version_added': '1.3.0',
+ 'version_added_collection': 'foo.bar',
+ 'deprecated': {
+ 'version': '2.0.0',
+ 'collection_name': 'foo.bar',
+ },
+ },
+ ],
+ 'ini': [
+ {
+ 'version_added': '1.3.0',
+ 'version_added_collection': 'foo.bar',
+ 'deprecated': {
+ 'version': '2.0.0',
+ 'collection_name': 'foo.bar',
+ },
+ },
+ ],
+ 'vars': [
+ {
+ 'version_added': '1.3.0',
+ 'version_added_collection': 'foo.bar',
+ 'deprecated': {
+ 'removed_at_date': '2020-01-01',
+ 'collection_name': 'foo.bar',
+ },
+ },
+ ],
+ },
+ 'subtest': {
+ 'description': '',
+ 'type': 'dict',
+ 'deprecated': {
+ 'version': '2.0.0',
+ 'collection_name': 'foo.bar',
+ },
+ 'suboptions': {
+ 'suboption': {
+ 'description': '',
+ 'type': 'int',
+ 'version_added': '1.2.0',
+ 'version_added_collection': 'foo.bar',
+ }
+ },
+ }
+ },
+ },
+ ),
+ (
+ # Return values
+ True, # this value is is ignored
+ True,
+ {
+ 'rv1': {
+ 'version_added': '1.0.0',
+ 'type': 'dict',
+ 'contains': {
+ 'srv1': {
+ 'version_added': '1.1.0',
+ },
+ 'srv2': {
+ },
+ }
+ },
+ },
+ {
+ 'rv1': {
+ 'version_added': '1.0.0',
+ 'version_added_collection': 'foo.bar',
+ 'type': 'dict',
+ 'contains': {
+ 'srv1': {
+ 'version_added': '1.1.0',
+ 'version_added_collection': 'foo.bar',
+ },
+ 'srv2': {
+ },
+ }
+ },
+ },
+ ),
+]
+
+
+@pytest.mark.parametrize('is_module,return_docs,fragment,expected_fragment', ADD_TESTS)
+def test_add(is_module, return_docs, fragment, expected_fragment):
+ fragment_copy = copy.deepcopy(fragment)
+ add_collection_to_versions_and_dates(fragment_copy, 'foo.bar', is_module, return_docs)
+ assert fragment_copy == expected_fragment
diff --git a/test/units/utils/test_shlex.py b/test/units/utils/test_shlex.py
new file mode 100644
index 0000000..e13d302
--- /dev/null
+++ b/test/units/utils/test_shlex.py
@@ -0,0 +1,41 @@
+# (c) 2015, Marius Gedminas <marius@gedmin.as>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+
+from ansible.utils.shlex import shlex_split
+
+
+class TestSplit(unittest.TestCase):
+
+ def test_trivial(self):
+ self.assertEqual(shlex_split("a b c"), ["a", "b", "c"])
+
+ def test_unicode(self):
+ self.assertEqual(shlex_split(u"a b \u010D"), [u"a", u"b", u"\u010D"])
+
+ def test_quoted(self):
+ self.assertEqual(shlex_split('"a b" c'), ["a b", "c"])
+
+ def test_comments(self):
+ self.assertEqual(shlex_split('"a b" c # d', comments=True), ["a b", "c"])
+
+ def test_error(self):
+ self.assertRaises(ValueError, shlex_split, 'a "b')
diff --git a/test/units/utils/test_unsafe_proxy.py b/test/units/utils/test_unsafe_proxy.py
new file mode 100644
index 0000000..ea653cf
--- /dev/null
+++ b/test/units/utils/test_unsafe_proxy.py
@@ -0,0 +1,121 @@
+# -*- coding: utf-8 -*-
+# (c) 2018 Matt Martz <matt@sivel.net>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from ansible.module_utils.six import PY3
+from ansible.utils.unsafe_proxy import AnsibleUnsafe, AnsibleUnsafeBytes, AnsibleUnsafeText, wrap_var
+from ansible.module_utils.common.text.converters import to_text, to_bytes
+
+
+def test_wrap_var_text():
+ assert isinstance(wrap_var(u'foo'), AnsibleUnsafeText)
+
+
+def test_wrap_var_bytes():
+ assert isinstance(wrap_var(b'foo'), AnsibleUnsafeBytes)
+
+
+def test_wrap_var_string():
+ if PY3:
+ assert isinstance(wrap_var('foo'), AnsibleUnsafeText)
+ else:
+ assert isinstance(wrap_var('foo'), AnsibleUnsafeBytes)
+
+
+def test_wrap_var_dict():
+ assert isinstance(wrap_var(dict(foo='bar')), dict)
+ assert not isinstance(wrap_var(dict(foo='bar')), AnsibleUnsafe)
+ assert isinstance(wrap_var(dict(foo=u'bar'))['foo'], AnsibleUnsafeText)
+
+
+def test_wrap_var_dict_None():
+ assert wrap_var(dict(foo=None))['foo'] is None
+ assert not isinstance(wrap_var(dict(foo=None))['foo'], AnsibleUnsafe)
+
+
+def test_wrap_var_list():
+ assert isinstance(wrap_var(['foo']), list)
+ assert not isinstance(wrap_var(['foo']), AnsibleUnsafe)
+ assert isinstance(wrap_var([u'foo'])[0], AnsibleUnsafeText)
+
+
+def test_wrap_var_list_None():
+ assert wrap_var([None])[0] is None
+ assert not isinstance(wrap_var([None])[0], AnsibleUnsafe)
+
+
+def test_wrap_var_set():
+ assert isinstance(wrap_var(set(['foo'])), set)
+ assert not isinstance(wrap_var(set(['foo'])), AnsibleUnsafe)
+ for item in wrap_var(set([u'foo'])):
+ assert isinstance(item, AnsibleUnsafeText)
+
+
+def test_wrap_var_set_None():
+ for item in wrap_var(set([None])):
+ assert item is None
+ assert not isinstance(item, AnsibleUnsafe)
+
+
+def test_wrap_var_tuple():
+ assert isinstance(wrap_var(('foo',)), tuple)
+ assert not isinstance(wrap_var(('foo',)), AnsibleUnsafe)
+ assert isinstance(wrap_var(('foo',))[0], AnsibleUnsafe)
+
+
+def test_wrap_var_tuple_None():
+ assert wrap_var((None,))[0] is None
+ assert not isinstance(wrap_var((None,))[0], AnsibleUnsafe)
+
+
+def test_wrap_var_None():
+ assert wrap_var(None) is None
+ assert not isinstance(wrap_var(None), AnsibleUnsafe)
+
+
+def test_wrap_var_unsafe_text():
+ assert isinstance(wrap_var(AnsibleUnsafeText(u'foo')), AnsibleUnsafeText)
+
+
+def test_wrap_var_unsafe_bytes():
+ assert isinstance(wrap_var(AnsibleUnsafeBytes(b'foo')), AnsibleUnsafeBytes)
+
+
+def test_wrap_var_no_ref():
+ thing = {
+ 'foo': {
+ 'bar': 'baz'
+ },
+ 'bar': ['baz', 'qux'],
+ 'baz': ('qux',),
+ 'none': None,
+ 'text': 'text',
+ }
+ wrapped_thing = wrap_var(thing)
+ thing is not wrapped_thing
+ thing['foo'] is not wrapped_thing['foo']
+ thing['bar'][0] is not wrapped_thing['bar'][0]
+ thing['baz'][0] is not wrapped_thing['baz'][0]
+ thing['none'] is not wrapped_thing['none']
+ thing['text'] is not wrapped_thing['text']
+
+
+def test_AnsibleUnsafeText():
+ assert isinstance(AnsibleUnsafeText(u'foo'), AnsibleUnsafe)
+
+
+def test_AnsibleUnsafeBytes():
+ assert isinstance(AnsibleUnsafeBytes(b'foo'), AnsibleUnsafe)
+
+
+def test_to_text_unsafe():
+ assert isinstance(to_text(AnsibleUnsafeBytes(b'foo')), AnsibleUnsafeText)
+ assert to_text(AnsibleUnsafeBytes(b'foo')) == AnsibleUnsafeText(u'foo')
+
+
+def test_to_bytes_unsafe():
+ assert isinstance(to_bytes(AnsibleUnsafeText(u'foo')), AnsibleUnsafeBytes)
+ assert to_bytes(AnsibleUnsafeText(u'foo')) == AnsibleUnsafeBytes(b'foo')
diff --git a/test/units/utils/test_vars.py b/test/units/utils/test_vars.py
new file mode 100644
index 0000000..9be33de
--- /dev/null
+++ b/test/units/utils/test_vars.py
@@ -0,0 +1,284 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2015, Toshio Kuraotmi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from collections import defaultdict
+
+from unittest import mock
+
+from units.compat import unittest
+from ansible.errors import AnsibleError
+from ansible.utils.vars import combine_vars, merge_hash
+
+
+class TestVariableUtils(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ combine_vars_merge_data = (
+ dict(
+ a=dict(a=1),
+ b=dict(b=2),
+ result=dict(a=1, b=2),
+ ),
+ dict(
+ a=dict(a=1, c=dict(foo='bar')),
+ b=dict(b=2, c=dict(baz='bam')),
+ result=dict(a=1, b=2, c=dict(foo='bar', baz='bam'))
+ ),
+ dict(
+ a=defaultdict(a=1, c=defaultdict(foo='bar')),
+ b=dict(b=2, c=dict(baz='bam')),
+ result=defaultdict(a=1, b=2, c=defaultdict(foo='bar', baz='bam'))
+ ),
+ )
+ combine_vars_replace_data = (
+ dict(
+ a=dict(a=1),
+ b=dict(b=2),
+ result=dict(a=1, b=2)
+ ),
+ dict(
+ a=dict(a=1, c=dict(foo='bar')),
+ b=dict(b=2, c=dict(baz='bam')),
+ result=dict(a=1, b=2, c=dict(baz='bam'))
+ ),
+ dict(
+ a=defaultdict(a=1, c=dict(foo='bar')),
+ b=dict(b=2, c=defaultdict(baz='bam')),
+ result=defaultdict(a=1, b=2, c=defaultdict(baz='bam'))
+ ),
+ )
+
+ def test_combine_vars_improper_args(self):
+ with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'replace'):
+ with self.assertRaises(AnsibleError):
+ combine_vars([1, 2, 3], dict(a=1))
+ with self.assertRaises(AnsibleError):
+ combine_vars(dict(a=1), [1, 2, 3])
+
+ with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'merge'):
+ with self.assertRaises(AnsibleError):
+ combine_vars([1, 2, 3], dict(a=1))
+ with self.assertRaises(AnsibleError):
+ combine_vars(dict(a=1), [1, 2, 3])
+
+ def test_combine_vars_replace(self):
+ with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'replace'):
+ for test in self.combine_vars_replace_data:
+ self.assertEqual(combine_vars(test['a'], test['b']), test['result'])
+
+ def test_combine_vars_merge(self):
+ with mock.patch('ansible.constants.DEFAULT_HASH_BEHAVIOUR', 'merge'):
+ for test in self.combine_vars_merge_data:
+ self.assertEqual(combine_vars(test['a'], test['b']), test['result'])
+
+ merge_hash_data = {
+ "low_prio": {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "low_value",
+ "list": ["low_value"]
+ }
+ },
+ "b": [1, 1, 2, 3]
+ },
+ "high_prio": {
+ "a": {
+ "a'": {
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["high_value"]
+ }
+ },
+ "b": [3, 4, 4, {"5": "value"}]
+ }
+ }
+
+ def test_merge_hash_simple(self):
+ for test in self.combine_vars_merge_data:
+ self.assertEqual(merge_hash(test['a'], test['b']), test['result'])
+
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["high_value"]
+ }
+ },
+ "b": high['b']
+ }
+ self.assertEqual(merge_hash(low, high), expected)
+
+ def test_merge_hash_non_recursive_and_list_replace(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = high
+ self.assertEqual(merge_hash(low, high, False, 'replace'), expected)
+
+ def test_merge_hash_non_recursive_and_list_keep(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": high['a'],
+ "b": low['b']
+ }
+ self.assertEqual(merge_hash(low, high, False, 'keep'), expected)
+
+ def test_merge_hash_non_recursive_and_list_append(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": high['a'],
+ "b": low['b'] + high['b']
+ }
+ self.assertEqual(merge_hash(low, high, False, 'append'), expected)
+
+ def test_merge_hash_non_recursive_and_list_prepend(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": high['a'],
+ "b": high['b'] + low['b']
+ }
+ self.assertEqual(merge_hash(low, high, False, 'prepend'), expected)
+
+ def test_merge_hash_non_recursive_and_list_append_rp(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": high['a'],
+ "b": [1, 1, 2] + high['b']
+ }
+ self.assertEqual(merge_hash(low, high, False, 'append_rp'), expected)
+
+ def test_merge_hash_non_recursive_and_list_prepend_rp(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": high['a'],
+ "b": high['b'] + [1, 1, 2]
+ }
+ self.assertEqual(merge_hash(low, high, False, 'prepend_rp'), expected)
+
+ def test_merge_hash_recursive_and_list_replace(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["high_value"]
+ }
+ },
+ "b": high['b']
+ }
+ self.assertEqual(merge_hash(low, high, True, 'replace'), expected)
+
+ def test_merge_hash_recursive_and_list_keep(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["low_value"]
+ }
+ },
+ "b": low['b']
+ }
+ self.assertEqual(merge_hash(low, high, True, 'keep'), expected)
+
+ def test_merge_hash_recursive_and_list_append(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["low_value", "high_value"]
+ }
+ },
+ "b": low['b'] + high['b']
+ }
+ self.assertEqual(merge_hash(low, high, True, 'append'), expected)
+
+ def test_merge_hash_recursive_and_list_prepend(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["high_value", "low_value"]
+ }
+ },
+ "b": high['b'] + low['b']
+ }
+ self.assertEqual(merge_hash(low, high, True, 'prepend'), expected)
+
+ def test_merge_hash_recursive_and_list_append_rp(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["low_value", "high_value"]
+ }
+ },
+ "b": [1, 1, 2] + high['b']
+ }
+ self.assertEqual(merge_hash(low, high, True, 'append_rp'), expected)
+
+ def test_merge_hash_recursive_and_list_prepend_rp(self):
+ low = self.merge_hash_data['low_prio']
+ high = self.merge_hash_data['high_prio']
+ expected = {
+ "a": {
+ "a'": {
+ "x": "low_value",
+ "y": "high_value",
+ "z": "high_value",
+ "list": ["high_value", "low_value"]
+ }
+ },
+ "b": high['b'] + [1, 1, 2]
+ }
+ self.assertEqual(merge_hash(low, high, True, 'prepend_rp'), expected)
diff --git a/test/units/utils/test_version.py b/test/units/utils/test_version.py
new file mode 100644
index 0000000..3c2cbaf
--- /dev/null
+++ b/test/units/utils/test_version.py
@@ -0,0 +1,335 @@
+# -*- coding: utf-8 -*-
+# (c) 2020 Matt Martz <matt@sivel.net>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from ansible.module_utils.compat.version import LooseVersion, StrictVersion
+
+import pytest
+
+from ansible.utils.version import _Alpha, _Numeric, SemanticVersion
+
+
+EQ = [
+ ('1.0.0', '1.0.0', True),
+ ('1.0.0', '1.0.0-beta', False),
+ ('1.0.0-beta2+build1', '1.0.0-beta.2+build.1', False),
+ ('1.0.0-beta+build', '1.0.0-beta+build', True),
+ ('1.0.0-beta+build1', '1.0.0-beta+build2', True),
+ ('1.0.0-beta+a', '1.0.0-alpha+bar', False),
+]
+
+NE = [
+ ('1.0.0', '1.0.0', False),
+ ('1.0.0', '1.0.0-beta', True),
+ ('1.0.0-beta2+build1', '1.0.0-beta.2+build.1', True),
+ ('1.0.0-beta+build', '1.0.0-beta+build', False),
+ ('1.0.0-beta+a', '1.0.0-alpha+bar', True),
+]
+
+LT = [
+ ('1.0.0', '2.0.0', True),
+ ('1.0.0-beta', '2.0.0-alpha', True),
+ ('1.0.0-alpha', '2.0.0-beta', True),
+ ('1.0.0-alpha', '1.0.0', True),
+ ('1.0.0-beta', '1.0.0-alpha3', False),
+ ('1.0.0+foo', '1.0.0-alpha', False),
+ ('1.0.0-beta.1', '1.0.0-beta.a', True),
+ ('1.0.0-beta+a', '1.0.0-alpha+bar', False),
+]
+
+GT = [
+ ('1.0.0', '2.0.0', False),
+ ('1.0.0-beta', '2.0.0-alpha', False),
+ ('1.0.0-alpha', '2.0.0-beta', False),
+ ('1.0.0-alpha', '1.0.0', False),
+ ('1.0.0-beta', '1.0.0-alpha3', True),
+ ('1.0.0+foo', '1.0.0-alpha', True),
+ ('1.0.0-beta.1', '1.0.0-beta.a', False),
+ ('1.0.0-beta+a', '1.0.0-alpha+bar', True),
+]
+
+LE = [
+ ('1.0.0', '1.0.0', True),
+ ('1.0.0', '2.0.0', True),
+ ('1.0.0-alpha', '1.0.0-beta', True),
+ ('1.0.0-beta', '1.0.0-alpha', False),
+]
+
+GE = [
+ ('1.0.0', '1.0.0', True),
+ ('1.0.0', '2.0.0', False),
+ ('1.0.0-alpha', '1.0.0-beta', False),
+ ('1.0.0-beta', '1.0.0-alpha', True),
+]
+
+VALID = [
+ "0.0.4",
+ "1.2.3",
+ "10.20.30",
+ "1.1.2-prerelease+meta",
+ "1.1.2+meta",
+ "1.1.2+meta-valid",
+ "1.0.0-alpha",
+ "1.0.0-beta",
+ "1.0.0-alpha.beta",
+ "1.0.0-alpha.beta.1",
+ "1.0.0-alpha.1",
+ "1.0.0-alpha0.valid",
+ "1.0.0-alpha.0valid",
+ "1.0.0-alpha-a.b-c-somethinglong+build.1-aef.1-its-okay",
+ "1.0.0-rc.1+build.1",
+ "2.0.0-rc.1+build.123",
+ "1.2.3-beta",
+ "10.2.3-DEV-SNAPSHOT",
+ "1.2.3-SNAPSHOT-123",
+ "1.0.0",
+ "2.0.0",
+ "1.1.7",
+ "2.0.0+build.1848",
+ "2.0.1-alpha.1227",
+ "1.0.0-alpha+beta",
+ "1.2.3----RC-SNAPSHOT.12.9.1--.12+788",
+ "1.2.3----R-S.12.9.1--.12+meta",
+ "1.2.3----RC-SNAPSHOT.12.9.1--.12",
+ "1.0.0+0.build.1-rc.10000aaa-kk-0.1",
+ "99999999999999999999999.999999999999999999.99999999999999999",
+ "1.0.0-0A.is.legal",
+]
+
+INVALID = [
+ "1",
+ "1.2",
+ "1.2.3-0123",
+ "1.2.3-0123.0123",
+ "1.1.2+.123",
+ "+invalid",
+ "-invalid",
+ "-invalid+invalid",
+ "-invalid.01",
+ "alpha",
+ "alpha.beta",
+ "alpha.beta.1",
+ "alpha.1",
+ "alpha+beta",
+ "alpha_beta",
+ "alpha.",
+ "alpha..",
+ "beta",
+ "1.0.0-alpha_beta",
+ "-alpha.",
+ "1.0.0-alpha..",
+ "1.0.0-alpha..1",
+ "1.0.0-alpha...1",
+ "1.0.0-alpha....1",
+ "1.0.0-alpha.....1",
+ "1.0.0-alpha......1",
+ "1.0.0-alpha.......1",
+ "01.1.1",
+ "1.01.1",
+ "1.1.01",
+ "1.2",
+ "1.2.3.DEV",
+ "1.2-SNAPSHOT",
+ "1.2.31.2.3----RC-SNAPSHOT.12.09.1--..12+788",
+ "1.2-RC-SNAPSHOT",
+ "-1.0.3-gamma+b7718",
+ "+justmeta",
+ "9.8.7+meta+meta",
+ "9.8.7-whatever+meta+meta",
+]
+
+PRERELEASE = [
+ ('1.0.0-alpha', True),
+ ('1.0.0-alpha.1', True),
+ ('1.0.0-0.3.7', True),
+ ('1.0.0-x.7.z.92', True),
+ ('0.1.2', False),
+ ('0.1.2+bob', False),
+ ('1.0.0', False),
+]
+
+STABLE = [
+ ('1.0.0-alpha', False),
+ ('1.0.0-alpha.1', False),
+ ('1.0.0-0.3.7', False),
+ ('1.0.0-x.7.z.92', False),
+ ('0.1.2', False),
+ ('0.1.2+bob', False),
+ ('1.0.0', True),
+ ('1.0.0+bob', True),
+]
+
+LOOSE_VERSION = [
+ (LooseVersion('1'), SemanticVersion('1.0.0')),
+ (LooseVersion('1-alpha'), SemanticVersion('1.0.0-alpha')),
+ (LooseVersion('1.0.0-alpha+build'), SemanticVersion('1.0.0-alpha+build')),
+]
+
+LOOSE_VERSION_INVALID = [
+ LooseVersion('1.a.3'),
+ LooseVersion(),
+ 'bar',
+ StrictVersion('1.2.3'),
+]
+
+
+def test_semanticversion_none():
+ assert SemanticVersion().major is None
+
+
+@pytest.mark.parametrize('left,right,expected', EQ)
+def test_eq(left, right, expected):
+ assert (SemanticVersion(left) == SemanticVersion(right)) is expected
+
+
+@pytest.mark.parametrize('left,right,expected', NE)
+def test_ne(left, right, expected):
+ assert (SemanticVersion(left) != SemanticVersion(right)) is expected
+
+
+@pytest.mark.parametrize('left,right,expected', LT)
+def test_lt(left, right, expected):
+ assert (SemanticVersion(left) < SemanticVersion(right)) is expected
+
+
+@pytest.mark.parametrize('left,right,expected', LE)
+def test_le(left, right, expected):
+ assert (SemanticVersion(left) <= SemanticVersion(right)) is expected
+
+
+@pytest.mark.parametrize('left,right,expected', GT)
+def test_gt(left, right, expected):
+ assert (SemanticVersion(left) > SemanticVersion(right)) is expected
+
+
+@pytest.mark.parametrize('left,right,expected', GE)
+def test_ge(left, right, expected):
+ assert (SemanticVersion(left) >= SemanticVersion(right)) is expected
+
+
+@pytest.mark.parametrize('value', VALID)
+def test_valid(value):
+ SemanticVersion(value)
+
+
+@pytest.mark.parametrize('value', INVALID)
+def test_invalid(value):
+ pytest.raises(ValueError, SemanticVersion, value)
+
+
+def test_example_precedence():
+ # https://semver.org/#spec-item-11
+ sv = SemanticVersion
+ assert sv('1.0.0') < sv('2.0.0') < sv('2.1.0') < sv('2.1.1')
+ assert sv('1.0.0-alpha') < sv('1.0.0')
+ assert sv('1.0.0-alpha') < sv('1.0.0-alpha.1') < sv('1.0.0-alpha.beta')
+ assert sv('1.0.0-beta') < sv('1.0.0-beta.2') < sv('1.0.0-beta.11') < sv('1.0.0-rc.1') < sv('1.0.0')
+
+
+@pytest.mark.parametrize('value,expected', PRERELEASE)
+def test_prerelease(value, expected):
+ assert SemanticVersion(value).is_prerelease is expected
+
+
+@pytest.mark.parametrize('value,expected', STABLE)
+def test_stable(value, expected):
+ assert SemanticVersion(value).is_stable is expected
+
+
+@pytest.mark.parametrize('value,expected', LOOSE_VERSION)
+def test_from_loose_version(value, expected):
+ assert SemanticVersion.from_loose_version(value) == expected
+
+
+@pytest.mark.parametrize('value', LOOSE_VERSION_INVALID)
+def test_from_loose_version_invalid(value):
+ pytest.raises((AttributeError, ValueError), SemanticVersion.from_loose_version, value)
+
+
+def test_comparison_with_string():
+ assert SemanticVersion('1.0.0') > '0.1.0'
+
+
+def test_alpha():
+ assert _Alpha('a') == _Alpha('a')
+ assert _Alpha('a') == 'a'
+ assert _Alpha('a') != _Alpha('b')
+ assert _Alpha('a') != 1
+ assert _Alpha('a') < _Alpha('b')
+ assert _Alpha('a') < 'c'
+ assert _Alpha('a') > _Numeric(1)
+ with pytest.raises(ValueError):
+ _Alpha('a') < None
+ assert _Alpha('a') <= _Alpha('a')
+ assert _Alpha('a') <= _Alpha('b')
+ assert _Alpha('b') >= _Alpha('a')
+ assert _Alpha('b') >= _Alpha('b')
+
+ # The following 3*6 tests check that all comparison operators perform
+ # as expected. DO NOT remove any of them, or reformulate them (to remove
+ # the explicit `not`)!
+
+ assert _Alpha('a') == _Alpha('a')
+ assert not _Alpha('a') != _Alpha('a') # pylint: disable=unneeded-not
+ assert not _Alpha('a') < _Alpha('a') # pylint: disable=unneeded-not
+ assert _Alpha('a') <= _Alpha('a')
+ assert not _Alpha('a') > _Alpha('a') # pylint: disable=unneeded-not
+ assert _Alpha('a') >= _Alpha('a')
+
+ assert not _Alpha('a') == _Alpha('b') # pylint: disable=unneeded-not
+ assert _Alpha('a') != _Alpha('b')
+ assert _Alpha('a') < _Alpha('b')
+ assert _Alpha('a') <= _Alpha('b')
+ assert not _Alpha('a') > _Alpha('b') # pylint: disable=unneeded-not
+ assert not _Alpha('a') >= _Alpha('b') # pylint: disable=unneeded-not
+
+ assert not _Alpha('b') == _Alpha('a') # pylint: disable=unneeded-not
+ assert _Alpha('b') != _Alpha('a')
+ assert not _Alpha('b') < _Alpha('a') # pylint: disable=unneeded-not
+ assert not _Alpha('b') <= _Alpha('a') # pylint: disable=unneeded-not
+ assert _Alpha('b') > _Alpha('a')
+ assert _Alpha('b') >= _Alpha('a')
+
+
+def test_numeric():
+ assert _Numeric(1) == _Numeric(1)
+ assert _Numeric(1) == 1
+ assert _Numeric(1) != _Numeric(2)
+ assert _Numeric(1) != 'a'
+ assert _Numeric(1) < _Numeric(2)
+ assert _Numeric(1) < 3
+ assert _Numeric(1) < _Alpha('b')
+ with pytest.raises(ValueError):
+ _Numeric(1) < None
+ assert _Numeric(1) <= _Numeric(1)
+ assert _Numeric(1) <= _Numeric(2)
+ assert _Numeric(2) >= _Numeric(1)
+ assert _Numeric(2) >= _Numeric(2)
+
+ # The following 3*6 tests check that all comparison operators perform
+ # as expected. DO NOT remove any of them, or reformulate them (to remove
+ # the explicit `not`)!
+
+ assert _Numeric(1) == _Numeric(1)
+ assert not _Numeric(1) != _Numeric(1) # pylint: disable=unneeded-not
+ assert not _Numeric(1) < _Numeric(1) # pylint: disable=unneeded-not
+ assert _Numeric(1) <= _Numeric(1)
+ assert not _Numeric(1) > _Numeric(1) # pylint: disable=unneeded-not
+ assert _Numeric(1) >= _Numeric(1)
+
+ assert not _Numeric(1) == _Numeric(2) # pylint: disable=unneeded-not
+ assert _Numeric(1) != _Numeric(2)
+ assert _Numeric(1) < _Numeric(2)
+ assert _Numeric(1) <= _Numeric(2)
+ assert not _Numeric(1) > _Numeric(2) # pylint: disable=unneeded-not
+ assert not _Numeric(1) >= _Numeric(2) # pylint: disable=unneeded-not
+
+ assert not _Numeric(2) == _Numeric(1) # pylint: disable=unneeded-not
+ assert _Numeric(2) != _Numeric(1)
+ assert not _Numeric(2) < _Numeric(1) # pylint: disable=unneeded-not
+ assert not _Numeric(2) <= _Numeric(1) # pylint: disable=unneeded-not
+ assert _Numeric(2) > _Numeric(1)
+ assert _Numeric(2) >= _Numeric(1)