summaryrefslogtreecommitdiffstats
path: root/test/units/modules
diff options
context:
space:
mode:
Diffstat (limited to 'test/units/modules')
-rw-r--r--test/units/modules/__init__.py0
-rw-r--r--test/units/modules/conftest.py31
-rw-r--r--test/units/modules/test_apt.py53
-rw-r--r--test/units/modules/test_apt_key.py32
-rw-r--r--test/units/modules/test_async_wrapper.py58
-rw-r--r--test/units/modules/test_copy.py215
-rw-r--r--test/units/modules/test_hostname.py147
-rw-r--r--test/units/modules/test_iptables.py1192
-rw-r--r--test/units/modules/test_known_hosts.py110
-rw-r--r--test/units/modules/test_pip.py40
-rw-r--r--test/units/modules/test_service.py70
-rw-r--r--test/units/modules/test_service_facts.py126
-rw-r--r--test/units/modules/test_systemd.py52
-rw-r--r--test/units/modules/test_unarchive.py93
-rw-r--r--test/units/modules/test_yum.py222
-rw-r--r--test/units/modules/utils.py50
16 files changed, 2491 insertions, 0 deletions
diff --git a/test/units/modules/__init__.py b/test/units/modules/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/modules/__init__.py
diff --git a/test/units/modules/conftest.py b/test/units/modules/conftest.py
new file mode 100644
index 0000000..a7d1e04
--- /dev/null
+++ b/test/units/modules/conftest.py
@@ -0,0 +1,31 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+import pytest
+
+from ansible.module_utils.six import string_types
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils.common._collections_compat import MutableMapping
+
+
+@pytest.fixture
+def patch_ansible_module(request, mocker):
+ if isinstance(request.param, string_types):
+ args = request.param
+ elif isinstance(request.param, MutableMapping):
+ if 'ANSIBLE_MODULE_ARGS' not in request.param:
+ request.param = {'ANSIBLE_MODULE_ARGS': request.param}
+ if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']:
+ request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp'
+ if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']:
+ request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False
+ args = json.dumps(request.param)
+ else:
+ raise Exception('Malformed data to the patch_ansible_module pytest fixture')
+
+ mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args))
diff --git a/test/units/modules/test_apt.py b/test/units/modules/test_apt.py
new file mode 100644
index 0000000..20e056f
--- /dev/null
+++ b/test/units/modules/test_apt.py
@@ -0,0 +1,53 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import collections
+import sys
+
+from units.compat.mock import Mock
+from units.compat import unittest
+
+try:
+ from ansible.modules.apt import (
+ expand_pkgspec_from_fnmatches,
+ )
+except Exception:
+ # Need some more module_utils work (porting urls.py) before we can test
+ # modules. So don't error out in this case.
+ if sys.version_info[0] >= 3:
+ pass
+
+
+class AptExpandPkgspecTestCase(unittest.TestCase):
+
+ def setUp(self):
+ FakePackage = collections.namedtuple("Package", ("name",))
+ self.fake_cache = [
+ FakePackage("apt"),
+ FakePackage("apt-utils"),
+ FakePackage("not-selected"),
+ ]
+
+ def test_trivial(self):
+ foo = ["apt"]
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo)
+
+ def test_version_wildcard(self):
+ foo = ["apt=1.0*"]
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo)
+
+ def test_pkgname_wildcard_version_wildcard(self):
+ foo = ["apt*=1.0*"]
+ m_mock = Mock()
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache),
+ ['apt', 'apt-utils'])
+
+ def test_pkgname_expands(self):
+ foo = ["apt*"]
+ m_mock = Mock()
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache),
+ ["apt", "apt-utils"])
diff --git a/test/units/modules/test_apt_key.py b/test/units/modules/test_apt_key.py
new file mode 100644
index 0000000..37cd53b
--- /dev/null
+++ b/test/units/modules/test_apt_key.py
@@ -0,0 +1,32 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+
+from units.compat.mock import patch, Mock
+from units.compat import unittest
+
+from ansible.modules import apt_key
+
+
+def returnc(x):
+ return 'C'
+
+
+class AptKeyTestCase(unittest.TestCase):
+
+ @patch.object(apt_key, 'apt_key_bin', '/usr/bin/apt-key')
+ @patch.object(apt_key, 'lang_env', returnc)
+ @patch.dict(os.environ, {'HTTP_PROXY': 'proxy.example.com'})
+ def test_import_key_with_http_proxy(self):
+ m_mock = Mock()
+ m_mock.run_command.return_value = (0, '', '')
+ apt_key.import_key(
+ m_mock, keyring=None, keyserver='keyserver.example.com',
+ key_id='0xDEADBEEF')
+ self.assertEqual(
+ m_mock.run_command.call_args_list[0][0][0],
+ '/usr/bin/apt-key adv --no-tty --keyserver keyserver.example.com'
+ ' --keyserver-options http-proxy=proxy.example.com'
+ ' --recv 0xDEADBEEF'
+ )
diff --git a/test/units/modules/test_async_wrapper.py b/test/units/modules/test_async_wrapper.py
new file mode 100644
index 0000000..37b1fda
--- /dev/null
+++ b/test/units/modules/test_async_wrapper.py
@@ -0,0 +1,58 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+import os
+import json
+import shutil
+import tempfile
+
+import pytest
+
+from units.compat.mock import patch, MagicMock
+from ansible.modules import async_wrapper
+
+from pprint import pprint
+
+
+class TestAsyncWrapper:
+
+ def test_run_module(self, monkeypatch):
+
+ def mock_get_interpreter(module_path):
+ return ['/usr/bin/python']
+
+ module_result = {'rc': 0}
+ module_lines = [
+ '#!/usr/bin/python',
+ 'import sys',
+ 'sys.stderr.write("stderr stuff")',
+ "print('%s')" % json.dumps(module_result)
+ ]
+ module_data = '\n'.join(module_lines) + '\n'
+ module_data = module_data.encode('utf-8')
+
+ workdir = tempfile.mkdtemp()
+ fh, fn = tempfile.mkstemp(dir=workdir)
+
+ with open(fn, 'wb') as f:
+ f.write(module_data)
+
+ command = fn
+ jobid = 0
+ job_path = os.path.join(os.path.dirname(command), 'job')
+
+ monkeypatch.setattr(async_wrapper, '_get_interpreter', mock_get_interpreter)
+ monkeypatch.setattr(async_wrapper, 'job_path', job_path)
+
+ res = async_wrapper._run_module(command, jobid)
+
+ with open(os.path.join(workdir, 'job'), 'r') as f:
+ jres = json.loads(f.read())
+
+ shutil.rmtree(workdir)
+
+ assert jres.get('rc') == 0
+ assert jres.get('stderr') == 'stderr stuff'
diff --git a/test/units/modules/test_copy.py b/test/units/modules/test_copy.py
new file mode 100644
index 0000000..20c309b
--- /dev/null
+++ b/test/units/modules/test_copy.py
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+# Copyright:
+# (c) 2018 Ansible Project
+# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible.modules.copy import AnsibleModuleError, split_pre_existing_dir
+
+from ansible.module_utils.basic import AnsibleModule
+
+
+THREE_DIRS_DATA = (('/dir1/dir2',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1', 'dir2']),
+ # 2 existing dirs:
+ ('/dir1', ['dir2']),
+ # 3 existing dirs:
+ ('/dir1/dir2', [])
+ ),
+ ('/dir1/dir2/',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1', 'dir2']),
+ # 2 existing dirs:
+ ('/dir1', ['dir2']),
+ # 3 existing dirs:
+ ('/dir1/dir2', [])
+ ),
+ )
+
+
+TWO_DIRS_DATA = (('dir1/dir2',
+ # 0 existing dirs:
+ ('.', ['dir1', 'dir2']),
+ # 1 existing dir:
+ ('dir1', ['dir2']),
+ # 2 existing dirs:
+ ('dir1/dir2', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ('dir1/dir2/',
+ # 0 existing dirs:
+ ('.', ['dir1', 'dir2']),
+ # 1 existing dir:
+ ('dir1', ['dir2']),
+ # 2 existing dirs:
+ ('dir1/dir2', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ('/dir1',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1']),
+ # 2 existing dirs:
+ ('/dir1', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ('/dir1/',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1']),
+ # 2 existing dirs:
+ ('/dir1', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ) + THREE_DIRS_DATA
+
+
+ONE_DIR_DATA = (('dir1',
+ # 0 existing dirs:
+ ('.', ['dir1']),
+ # 1 existing dir:
+ ('dir1', []),
+ # 2 existing dirs: Same as 1 because we never get to the third
+ ),
+ ('dir1/',
+ # 0 existing dirs:
+ ('.', ['dir1']),
+ # 1 existing dir:
+ ('dir1', []),
+ # 2 existing dirs: Same as 1 because we never get to the third
+ ),
+ ) + TWO_DIRS_DATA
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[4]) for d in THREE_DIRS_DATA))
+def test_split_pre_existing_dir_three_levels_exist(directory, expected, mocker):
+ mocker.patch('os.path.exists', side_effect=[True, True, True])
+ split_pre_existing_dir(directory) == expected
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[3]) for d in TWO_DIRS_DATA))
+def test_split_pre_existing_dir_two_levels_exist(directory, expected, mocker):
+ mocker.patch('os.path.exists', side_effect=[True, True, False])
+ split_pre_existing_dir(directory) == expected
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[2]) for d in ONE_DIR_DATA))
+def test_split_pre_existing_dir_one_level_exists(directory, expected, mocker):
+ mocker.patch('os.path.exists', side_effect=[True, False, False])
+ split_pre_existing_dir(directory) == expected
+
+
+@pytest.mark.parametrize('directory', (d[0] for d in ONE_DIR_DATA if d[1] is None))
+def test_split_pre_existing_dir_root_does_not_exist(directory, mocker):
+ mocker.patch('os.path.exists', return_value=False)
+ with pytest.raises(AnsibleModuleError) as excinfo:
+ split_pre_existing_dir(directory)
+ assert excinfo.value.results['msg'].startswith("The '/' directory doesn't exist on this machine.")
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[1]) for d in ONE_DIR_DATA if not d[0].startswith('/')))
+def test_split_pre_existing_dir_working_dir_exists(directory, expected, mocker):
+ mocker.patch('os.path.exists', return_value=False)
+ split_pre_existing_dir(directory) == expected
+
+
+#
+# Info helpful for making new test cases:
+#
+# base_mode = {'dir no perms': 0o040000,
+# 'file no perms': 0o100000,
+# 'dir all perms': 0o400000 | 0o777,
+# 'file all perms': 0o100000, | 0o777}
+#
+# perm_bits = {'x': 0b001,
+# 'w': 0b010,
+# 'r': 0b100}
+#
+# role_shift = {'u': 6,
+# 'g': 3,
+# 'o': 0}
+
+DATA = ( # Going from no permissions to setting all for user, group, and/or other
+ (0o040000, u'a+rwx', 0o0777),
+ (0o040000, u'u+rwx,g+rwx,o+rwx', 0o0777),
+ (0o040000, u'o+rwx', 0o0007),
+ (0o040000, u'g+rwx', 0o0070),
+ (0o040000, u'u+rwx', 0o0700),
+
+ # Going from all permissions to none for user, group, and/or other
+ (0o040777, u'a-rwx', 0o0000),
+ (0o040777, u'u-rwx,g-rwx,o-rwx', 0o0000),
+ (0o040777, u'o-rwx', 0o0770),
+ (0o040777, u'g-rwx', 0o0707),
+ (0o040777, u'u-rwx', 0o0077),
+
+ # now using absolute assignment from None to a set of perms
+ (0o040000, u'a=rwx', 0o0777),
+ (0o040000, u'u=rwx,g=rwx,o=rwx', 0o0777),
+ (0o040000, u'o=rwx', 0o0007),
+ (0o040000, u'g=rwx', 0o0070),
+ (0o040000, u'u=rwx', 0o0700),
+
+ # X effect on files and dirs
+ (0o040000, u'a+X', 0o0111),
+ (0o100000, u'a+X', 0),
+ (0o040000, u'a=X', 0o0111),
+ (0o100000, u'a=X', 0),
+ (0o040777, u'a-X', 0o0666),
+ # Same as chmod but is it a bug?
+ # chmod a-X statfile <== removes execute from statfile
+ (0o100777, u'a-X', 0o0666),
+
+ # Multiple permissions
+ (0o040000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0755),
+ (0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644),
+)
+
+UMASK_DATA = (
+ (0o100000, '+rwx', 0o770),
+ (0o100777, '-rwx', 0o007),
+)
+
+INVALID_DATA = (
+ (0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"),
+ (0o040000, u'f=rwx', "bad symbolic permission for mode: f=rwx"),
+)
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', DATA)
+def test_good_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA)
+def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_umask = mocker.patch('os.umask')
+ mock_umask.return_value = 0o7
+
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA)
+def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+ with pytest.raises(ValueError) as exc:
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah'
+ assert exc.match(expected)
diff --git a/test/units/modules/test_hostname.py b/test/units/modules/test_hostname.py
new file mode 100644
index 0000000..9050fd0
--- /dev/null
+++ b/test/units/modules/test_hostname.py
@@ -0,0 +1,147 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import shutil
+import tempfile
+
+from units.compat.mock import patch, MagicMock, mock_open
+from ansible.module_utils import basic
+from ansible.module_utils.common._utils import get_all_subclasses
+from ansible.modules import hostname
+from units.modules.utils import ModuleTestCase, set_module_args
+from ansible.module_utils.six import PY2
+
+
+class TestHostname(ModuleTestCase):
+ @patch('os.path.isfile')
+ def test_stategy_get_never_writes_in_check_mode(self, isfile):
+ isfile.return_value = True
+
+ set_module_args({'name': 'fooname', '_ansible_check_mode': True})
+ subclasses = get_all_subclasses(hostname.BaseStrategy)
+ module = MagicMock()
+ for cls in subclasses:
+ instance = cls(module)
+
+ instance.module.run_command = MagicMock()
+ instance.module.run_command.return_value = (0, '', '')
+
+ m = mock_open()
+ builtins = 'builtins'
+ if PY2:
+ builtins = '__builtin__'
+ with patch('%s.open' % builtins, m):
+ instance.get_permanent_hostname()
+ instance.get_current_hostname()
+ self.assertFalse(
+ m.return_value.write.called,
+ msg='%s called write, should not have' % str(cls))
+
+ def test_all_named_strategies_exist(self):
+ """Loop through the STRATS and see if anything is missing."""
+ for _name, prefix in hostname.STRATS.items():
+ classname = "%sStrategy" % prefix
+ cls = getattr(hostname, classname, None)
+
+ if cls is None:
+ self.assertFalse(
+ cls is None, "%s is None, should be a subclass" % classname
+ )
+ else:
+ self.assertTrue(issubclass(cls, hostname.BaseStrategy))
+
+
+class TestRedhatStrategy(ModuleTestCase):
+ def setUp(self):
+ super(TestRedhatStrategy, self).setUp()
+ self.testdir = tempfile.mkdtemp(prefix='ansible-test-hostname-')
+ self.network_file = os.path.join(self.testdir, "network")
+
+ def tearDown(self):
+ super(TestRedhatStrategy, self).tearDown()
+ shutil.rmtree(self.testdir, ignore_errors=True)
+
+ @property
+ def instance(self):
+ self.module = MagicMock()
+ instance = hostname.RedHatStrategy(self.module)
+ instance.NETWORK_FILE = self.network_file
+ return instance
+
+ def test_get_permanent_hostname_missing(self):
+ self.assertIsNone(self.instance.get_permanent_hostname())
+ self.assertTrue(self.module.fail_json.called)
+ self.module.fail_json.assert_called_with(
+ "Unable to locate HOSTNAME entry in %s" % self.network_file
+ )
+
+ def test_get_permanent_hostname_line_missing(self):
+ with open(self.network_file, "w") as f:
+ f.write("# some other content\n")
+ self.assertIsNone(self.instance.get_permanent_hostname())
+ self.module.fail_json.assert_called_with(
+ "Unable to locate HOSTNAME entry in %s" % self.network_file
+ )
+
+ def test_get_permanent_hostname_existing(self):
+ with open(self.network_file, "w") as f:
+ f.write(
+ "some other content\n"
+ "HOSTNAME=foobar\n"
+ "more content\n"
+ )
+ self.assertEqual(self.instance.get_permanent_hostname(), "foobar")
+
+ def test_get_permanent_hostname_existing_whitespace(self):
+ with open(self.network_file, "w") as f:
+ f.write(
+ "some other content\n"
+ " HOSTNAME=foobar \n"
+ "more content\n"
+ )
+ self.assertEqual(self.instance.get_permanent_hostname(), "foobar")
+
+ def test_set_permanent_hostname_missing(self):
+ self.instance.set_permanent_hostname("foobar")
+ with open(self.network_file) as f:
+ self.assertEqual(f.read(), "HOSTNAME=foobar\n")
+
+ def test_set_permanent_hostname_line_missing(self):
+ with open(self.network_file, "w") as f:
+ f.write("# some other content\n")
+ self.instance.set_permanent_hostname("foobar")
+ with open(self.network_file) as f:
+ self.assertEqual(f.read(), "# some other content\nHOSTNAME=foobar\n")
+
+ def test_set_permanent_hostname_existing(self):
+ with open(self.network_file, "w") as f:
+ f.write(
+ "some other content\n"
+ "HOSTNAME=spam\n"
+ "more content\n"
+ )
+ self.instance.set_permanent_hostname("foobar")
+ with open(self.network_file) as f:
+ self.assertEqual(
+ f.read(),
+ "some other content\n"
+ "HOSTNAME=foobar\n"
+ "more content\n"
+ )
+
+ def test_set_permanent_hostname_existing_whitespace(self):
+ with open(self.network_file, "w") as f:
+ f.write(
+ "some other content\n"
+ " HOSTNAME=spam \n"
+ "more content\n"
+ )
+ self.instance.set_permanent_hostname("foobar")
+ with open(self.network_file) as f:
+ self.assertEqual(
+ f.read(),
+ "some other content\n"
+ "HOSTNAME=foobar\n"
+ "more content\n"
+ )
diff --git a/test/units/modules/test_iptables.py b/test/units/modules/test_iptables.py
new file mode 100644
index 0000000..265e770
--- /dev/null
+++ b/test/units/modules/test_iptables.py
@@ -0,0 +1,1192 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from units.compat.mock import patch
+from ansible.module_utils import basic
+from ansible.modules import iptables
+from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
+
+
+def get_bin_path(*args, **kwargs):
+ return "/sbin/iptables"
+
+
+def get_iptables_version(iptables_path, module):
+ return "1.8.2"
+
+
+class TestIptables(ModuleTestCase):
+
+ def setUp(self):
+ super(TestIptables, self).setUp()
+ self.mock_get_bin_path = patch.object(basic.AnsibleModule, 'get_bin_path', get_bin_path)
+ self.mock_get_bin_path.start()
+ self.addCleanup(self.mock_get_bin_path.stop) # ensure that the patching is 'undone'
+ self.mock_get_iptables_version = patch.object(iptables, 'get_iptables_version', get_iptables_version)
+ self.mock_get_iptables_version.start()
+ self.addCleanup(self.mock_get_iptables_version.stop) # ensure that the patching is 'undone'
+
+ def test_without_required_parameters(self):
+ """Failure must occurs when all parameters are missing"""
+ with self.assertRaises(AnsibleFailJson):
+ set_module_args({})
+ iptables.main()
+
+ def test_flush_table_without_chain(self):
+ """Test flush without chain, flush the table"""
+ set_module_args({
+ 'flush': True,
+ })
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.return_value = 0, '', '' # successful execution, no output
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args[0][0][0], '/sbin/iptables')
+ self.assertEqual(run_command.call_args[0][0][1], '-t')
+ self.assertEqual(run_command.call_args[0][0][2], 'filter')
+ self.assertEqual(run_command.call_args[0][0][3], '-F')
+
+ def test_flush_table_check_true(self):
+ """Test flush without parameters and check == true"""
+ set_module_args({
+ 'flush': True,
+ '_ansible_check_mode': True,
+ })
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.return_value = 0, '', '' # successful execution, no output
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 0)
+
+# TODO ADD test flush table nat
+# TODO ADD test flush with chain
+# TODO ADD test flush with chain and table nat
+
+ def test_policy_table(self):
+ """Test change policy of a chain"""
+ set_module_args({
+ 'policy': 'ACCEPT',
+ 'chain': 'INPUT',
+ })
+ commands_results = [
+ (0, 'Chain INPUT (policy DROP)\n', ''),
+ (0, '', '')
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-L',
+ 'INPUT',
+ ])
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-P',
+ 'INPUT',
+ 'ACCEPT',
+ ])
+
+ def test_policy_table_no_change(self):
+ """Test don't change policy of a chain if the policy is right"""
+ set_module_args({
+ 'policy': 'ACCEPT',
+ 'chain': 'INPUT',
+ })
+ commands_results = [
+ (0, 'Chain INPUT (policy ACCEPT)\n', ''),
+ (0, '', '')
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertFalse(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-L',
+ 'INPUT',
+ ])
+
+ def test_policy_table_changed_false(self):
+ """Test flush without parameters and change == false"""
+ set_module_args({
+ 'policy': 'ACCEPT',
+ 'chain': 'INPUT',
+ '_ansible_check_mode': True,
+ })
+ commands_results = [
+ (0, 'Chain INPUT (policy DROP)\n', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-L',
+ 'INPUT',
+ ])
+
+# TODO ADD test policy without chain fail
+# TODO ADD test policy with chain don't exists
+# TODO ADD test policy with wrong choice fail
+
+ def test_insert_rule_change_false(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'OUTPUT',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'ACCEPT',
+ 'action': 'insert',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ (0, '', ''), # check_chain_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+
+ def test_insert_rule(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'OUTPUT',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'ACCEPT',
+ 'action': 'insert'
+ })
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ (0, '', ''), # check_chain_present
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 3)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+ self.assertEqual(run_command.call_args_list[2][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-I',
+ 'OUTPUT',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+
+ def test_append_rule_check_mode(self):
+ """Test append a redirection rule in check mode"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'REDIRECT',
+ 'table': 'nat',
+ 'to_destination': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'destination_port': '22',
+ 'to_ports': '8600',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ (0, '', ''), # check_chain_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'REDIRECT',
+ '--to-destination',
+ '5.5.5.5/32',
+ '--destination-port',
+ '22',
+ '--to-ports',
+ '8600'
+ ])
+
+ def test_append_rule(self):
+ """Test append a redirection rule"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'REDIRECT',
+ 'table': 'nat',
+ 'to_destination': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'destination_port': '22',
+ 'to_ports': '8600'
+ })
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ (0, '', ''), # check_chain_present
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 3)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'REDIRECT',
+ '--to-destination',
+ '5.5.5.5/32',
+ '--destination-port',
+ '22',
+ '--to-ports',
+ '8600'
+ ])
+ self.assertEqual(run_command.call_args_list[2][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-A',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'REDIRECT',
+ '--to-destination',
+ '5.5.5.5/32',
+ '--destination-port',
+ '22',
+ '--to-ports',
+ '8600'
+ ])
+
+ def test_remove_rule(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'SNAT',
+ 'table': 'nat',
+ 'to_source': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'source_port': '22',
+ 'to_ports': '8600',
+ 'state': 'absent',
+ 'in_interface': 'eth0',
+ 'out_interface': 'eth1',
+ 'comment': 'this is a comment'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'SNAT',
+ '--to-source',
+ '5.5.5.5/32',
+ '-i',
+ 'eth0',
+ '-o',
+ 'eth1',
+ '--source-port',
+ '22',
+ '--to-ports',
+ '8600',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-D',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'SNAT',
+ '--to-source',
+ '5.5.5.5/32',
+ '-i',
+ 'eth0',
+ '-o',
+ 'eth1',
+ '--source-port',
+ '22',
+ '--to-ports',
+ '8600',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+
+ def test_remove_rule_check_mode(self):
+ """Test flush without parameters check mode"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'SNAT',
+ 'table': 'nat',
+ 'to_source': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'source_port': '22',
+ 'to_ports': '8600',
+ 'state': 'absent',
+ 'in_interface': 'eth0',
+ 'out_interface': 'eth1',
+ 'comment': 'this is a comment',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'SNAT',
+ '--to-source',
+ '5.5.5.5/32',
+ '-i',
+ 'eth0',
+ '-o',
+ 'eth1',
+ '--source-port',
+ '22',
+ '--to-ports',
+ '8600',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+
+ def test_insert_with_reject(self):
+ """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'protocol': 'tcp',
+ 'reject_with': 'tcp-reset',
+ 'ip_version': 'ipv4',
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-p',
+ 'tcp',
+ '-j',
+ 'REJECT',
+ '--reject-with',
+ 'tcp-reset',
+ ])
+
+ def test_insert_jump_reject_with_reject(self):
+ """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'protocol': 'tcp',
+ 'jump': 'REJECT',
+ 'reject_with': 'tcp-reset',
+ 'ip_version': 'ipv4',
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-p',
+ 'tcp',
+ '-j',
+ 'REJECT',
+ '--reject-with',
+ 'tcp-reset',
+ ])
+
+ def test_jump_tee_gateway_negative(self):
+ """ Missing gateway when JUMP is set to TEE """
+ set_module_args({
+ 'table': 'mangle',
+ 'chain': 'PREROUTING',
+ 'in_interface': 'eth0',
+ 'protocol': 'udp',
+ 'match': 'state',
+ 'jump': 'TEE',
+ 'ctstate': ['NEW'],
+ 'destination_port': '9521',
+ 'destination': '127.0.0.1'
+ })
+
+ with self.assertRaises(AnsibleFailJson) as e:
+ iptables.main()
+ self.assertTrue(e.exception.args[0]['failed'])
+ self.assertEqual(e.exception.args[0]['msg'], 'jump is TEE but all of the following are missing: gateway')
+
+ def test_jump_tee_gateway(self):
+ """ Using gateway when JUMP is set to TEE """
+ set_module_args({
+ 'table': 'mangle',
+ 'chain': 'PREROUTING',
+ 'in_interface': 'eth0',
+ 'protocol': 'udp',
+ 'match': 'state',
+ 'jump': 'TEE',
+ 'ctstate': ['NEW'],
+ 'destination_port': '9521',
+ 'gateway': '192.168.10.1',
+ 'destination': '127.0.0.1'
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'mangle',
+ '-C', 'PREROUTING',
+ '-p', 'udp',
+ '-d', '127.0.0.1',
+ '-m', 'state',
+ '-j', 'TEE',
+ '--gateway', '192.168.10.1',
+ '-i', 'eth0',
+ '--destination-port', '9521',
+ '--state', 'NEW'
+ ])
+
+ def test_tcp_flags(self):
+ """ Test various ways of inputting tcp_flags """
+ args = [
+ {
+ 'chain': 'OUTPUT',
+ 'protocol': 'tcp',
+ 'jump': 'DROP',
+ 'tcp_flags': 'flags=ALL flags_set="ACK,RST,SYN,FIN"'
+ },
+ {
+ 'chain': 'OUTPUT',
+ 'protocol': 'tcp',
+ 'jump': 'DROP',
+ 'tcp_flags': {
+ 'flags': 'ALL',
+ 'flags_set': 'ACK,RST,SYN,FIN'
+ }
+ },
+ {
+ 'chain': 'OUTPUT',
+ 'protocol': 'tcp',
+ 'jump': 'DROP',
+ 'tcp_flags': {
+ 'flags': ['ALL'],
+ 'flags_set': ['ACK', 'RST', 'SYN', 'FIN']
+ }
+ },
+
+ ]
+
+ for item in args:
+ set_module_args(item)
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-p',
+ 'tcp',
+ '--tcp-flags',
+ 'ALL',
+ 'ACK,RST,SYN,FIN',
+ '-j',
+ 'DROP'
+ ])
+
+ def test_log_level(self):
+ """ Test various ways of log level flag """
+
+ log_levels = ['0', '1', '2', '3', '4', '5', '6', '7',
+ 'emerg', 'alert', 'crit', 'error', 'warning', 'notice', 'info', 'debug']
+
+ for log_lvl in log_levels:
+ set_module_args({
+ 'chain': 'INPUT',
+ 'jump': 'LOG',
+ 'log_level': log_lvl,
+ 'source': '1.2.3.4/32',
+ 'log_prefix': '** DROP-this_ip **'
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-C', 'INPUT',
+ '-s', '1.2.3.4/32',
+ '-j', 'LOG',
+ '--log-prefix', '** DROP-this_ip **',
+ '--log-level', log_lvl
+ ])
+
+ def test_iprange(self):
+ """ Test iprange module with its flags src_range and dst_range """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'match': ['iprange'],
+ 'src_range': '192.168.1.100-192.168.1.199',
+ 'jump': 'ACCEPT'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-m',
+ 'iprange',
+ '-j',
+ 'ACCEPT',
+ '--src-range',
+ '192.168.1.100-192.168.1.199',
+ ])
+
+ set_module_args({
+ 'chain': 'INPUT',
+ 'src_range': '192.168.1.100-192.168.1.199',
+ 'dst_range': '10.0.0.50-10.0.0.100',
+ 'jump': 'ACCEPT'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-j',
+ 'ACCEPT',
+ '-m',
+ 'iprange',
+ '--src-range',
+ '192.168.1.100-192.168.1.199',
+ '--dst-range',
+ '10.0.0.50-10.0.0.100'
+ ])
+
+ set_module_args({
+ 'chain': 'INPUT',
+ 'dst_range': '10.0.0.50-10.0.0.100',
+ 'jump': 'ACCEPT'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-j',
+ 'ACCEPT',
+ '-m',
+ 'iprange',
+ '--dst-range',
+ '10.0.0.50-10.0.0.100'
+ ])
+
+ def test_insert_rule_with_wait(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'OUTPUT',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'ACCEPT',
+ 'action': 'insert',
+ 'wait': '10'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-w',
+ '10',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+
+ def test_comment_position_at_end(self):
+ """Test comment position to make sure it is at the end of command"""
+ set_module_args({
+ 'chain': 'INPUT',
+ 'jump': 'ACCEPT',
+ 'action': 'insert',
+ 'ctstate': ['NEW'],
+ 'comment': 'this is a comment',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-j',
+ 'ACCEPT',
+ '-m',
+ 'conntrack',
+ '--ctstate',
+ 'NEW',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+ self.assertEqual(run_command.call_args[0][0][14], 'this is a comment')
+
+ def test_destination_ports(self):
+ """ Test multiport module usage with multiple ports """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'protocol': 'tcp',
+ 'in_interface': 'eth0',
+ 'source': '192.168.0.1/32',
+ 'destination_ports': ['80', '443', '8081:8085'],
+ 'jump': 'ACCEPT',
+ 'comment': 'this is a comment',
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-C', 'INPUT',
+ '-p', 'tcp',
+ '-s', '192.168.0.1/32',
+ '-j', 'ACCEPT',
+ '-m', 'multiport',
+ '--dports', '80,443,8081:8085',
+ '-i', 'eth0',
+ '-m', 'comment',
+ '--comment', 'this is a comment'
+ ])
+
+ def test_match_set(self):
+ """ Test match_set together with match_set_flags """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'protocol': 'tcp',
+ 'match_set': 'admin_hosts',
+ 'match_set_flags': 'src',
+ 'destination_port': '22',
+ 'jump': 'ACCEPT',
+ 'comment': 'this is a comment',
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-C', 'INPUT',
+ '-p', 'tcp',
+ '-j', 'ACCEPT',
+ '--destination-port', '22',
+ '-m', 'set',
+ '--match-set', 'admin_hosts', 'src',
+ '-m', 'comment',
+ '--comment', 'this is a comment'
+ ])
+
+ set_module_args({
+ 'chain': 'INPUT',
+ 'protocol': 'udp',
+ 'match_set': 'banned_hosts',
+ 'match_set_flags': 'src,dst',
+ 'jump': 'REJECT',
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-C', 'INPUT',
+ '-p', 'udp',
+ '-j', 'REJECT',
+ '-m', 'set',
+ '--match-set', 'banned_hosts', 'src,dst'
+ ])
+
+ def test_chain_creation(self):
+ """Test chain creation when absent"""
+ set_module_args({
+ 'chain': 'FOOBAR',
+ 'state': 'present',
+ 'chain_management': True,
+ })
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ (1, '', ''), # check_chain_present
+ (0, '', ''), # create_chain
+ (0, '', ''), # append_rule
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 4)
+
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-C', 'FOOBAR',
+ ])
+
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-L', 'FOOBAR',
+ ])
+
+ self.assertEqual(run_command.call_args_list[2][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-N', 'FOOBAR',
+ ])
+
+ self.assertEqual(run_command.call_args_list[3][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-A', 'FOOBAR',
+ ])
+
+ commands_results = [
+ (0, '', ''), # check_rule_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertFalse(result.exception.args[0]['changed'])
+
+ def test_chain_creation_check_mode(self):
+ """Test chain creation when absent"""
+ set_module_args({
+ 'chain': 'FOOBAR',
+ 'state': 'present',
+ 'chain_management': True,
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ (1, '', ''), # check_chain_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-C', 'FOOBAR',
+ ])
+
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-L', 'FOOBAR',
+ ])
+
+ commands_results = [
+ (0, '', ''), # check_rule_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertFalse(result.exception.args[0]['changed'])
+
+ def test_chain_deletion(self):
+ """Test chain deletion when present"""
+ set_module_args({
+ 'chain': 'FOOBAR',
+ 'state': 'absent',
+ 'chain_management': True,
+ })
+
+ commands_results = [
+ (0, '', ''), # check_chain_present
+ (0, '', ''), # delete_chain
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-L', 'FOOBAR',
+ ])
+
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-X', 'FOOBAR',
+ ])
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertFalse(result.exception.args[0]['changed'])
+
+ def test_chain_deletion_check_mode(self):
+ """Test chain deletion when present"""
+ set_module_args({
+ 'chain': 'FOOBAR',
+ 'state': 'absent',
+ 'chain_management': True,
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (0, '', ''), # check_chain_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-L', 'FOOBAR',
+ ])
+
+ commands_results = [
+ (1, '', ''), # check_rule_present
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertFalse(result.exception.args[0]['changed'])
diff --git a/test/units/modules/test_known_hosts.py b/test/units/modules/test_known_hosts.py
new file mode 100644
index 0000000..123dd75
--- /dev/null
+++ b/test/units/modules/test_known_hosts.py
@@ -0,0 +1,110 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import tempfile
+from ansible.module_utils import basic
+
+from units.compat import unittest
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils.basic import AnsibleModule
+
+from ansible.modules.known_hosts import compute_diff, sanity_check
+
+
+class KnownHostsDiffTestCase(unittest.TestCase):
+
+ def _create_file(self, content):
+ tmp_file = tempfile.NamedTemporaryFile(prefix='ansible-test-', suffix='-known_hosts', delete=False)
+ tmp_file.write(to_bytes(content))
+ tmp_file.close()
+ self.addCleanup(os.unlink, tmp_file.name)
+ return tmp_file.name
+
+ def test_no_existing_file(self):
+ path = "/tmp/this_file_does_not_exists_known_hosts"
+ key = 'example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': '/dev/null',
+ 'after_header': path,
+ 'before': '',
+ 'after': 'example.com ssh-rsa AAAAetc\n',
+ })
+
+ def test_key_addition(self):
+ path = self._create_file(
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'two.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n',
+ })
+
+ def test_no_change(self):
+ path = self._create_file(
+ 'one.example.com ssh-rsa AAAAetc\n'
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=1, replace_or_add=False, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ 'after': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ })
+
+ def test_key_change(self):
+ path = self._create_file(
+ 'one.example.com ssh-rsa AAAaetc\n'
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=1, replace_or_add=True, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'one.example.com ssh-rsa AAAaetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n',
+ })
+
+ def test_key_removal(self):
+ path = self._create_file(
+ 'one.example.com ssh-rsa AAAAetc\n'
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=1, replace_or_add=False, state='absent', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\n',
+ })
+
+ def test_key_removal_no_change(self):
+ path = self._create_file(
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=None, replace_or_add=False, state='absent', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'two.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\n',
+ })
+
+ def test_sanity_check(self):
+ basic._load_params = lambda: {}
+ # Module used internally to execute ssh-keygen system executable
+ module = AnsibleModule(argument_spec={})
+ host = '10.0.0.1'
+ key = '%s ssh-rsa ASDF foo@bar' % (host,)
+ keygen = module.get_bin_path('ssh-keygen')
+ sanity_check(module, host, key, keygen)
diff --git a/test/units/modules/test_pip.py b/test/units/modules/test_pip.py
new file mode 100644
index 0000000..5640b80
--- /dev/null
+++ b/test/units/modules/test_pip.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+import pytest
+
+from ansible.modules import pip
+
+
+pytestmark = pytest.mark.usefixtures('patch_ansible_module')
+
+
+@pytest.mark.parametrize('patch_ansible_module', [{'name': 'six'}], indirect=['patch_ansible_module'])
+def test_failure_when_pip_absent(mocker, capfd):
+ mocker.patch('ansible.modules.pip._have_pip_module').return_value = False
+
+ get_bin_path = mocker.patch('ansible.module_utils.basic.AnsibleModule.get_bin_path')
+ get_bin_path.return_value = None
+
+ with pytest.raises(SystemExit):
+ pip.main()
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+ assert results['failed']
+ assert 'pip needs to be installed' in results['msg']
+
+
+@pytest.mark.parametrize('patch_ansible_module, test_input, expected', [
+ [None, ['django>1.11.1', '<1.11.2', 'ipaddress', 'simpleproject<2.0.0', '>1.1.0'],
+ ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']],
+ [None, ['django>1.11.1,<1.11.2,ipaddress', 'simpleproject<2.0.0,>1.1.0'],
+ ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']],
+ [None, ['django>1.11.1', '<1.11.2', 'ipaddress,simpleproject<2.0.0,>1.1.0'],
+ ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']]])
+def test_recover_package_name(test_input, expected):
+ assert pip._recover_package_name(test_input) == expected
diff --git a/test/units/modules/test_service.py b/test/units/modules/test_service.py
new file mode 100644
index 0000000..caabd74
--- /dev/null
+++ b/test/units/modules/test_service.py
@@ -0,0 +1,70 @@
+# Copyright: (c) 2021, Ansible Project
+# Copyright: (c) 2021, Abhijeet Kasurde <akasurde@redhat.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+import json
+import platform
+
+import pytest
+from ansible.modules import service
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.six import PY2
+from units.modules.utils import set_module_args
+
+
+def mocker_sunos_service(mocker):
+ """
+ Configure common mocker object for SunOSService
+ """
+ platform_system = mocker.patch.object(platform, "system")
+ platform_system.return_value = "SunOS"
+
+ get_bin_path = mocker.patch.object(AnsibleModule, "get_bin_path")
+ get_bin_path.return_value = "/usr/bin/svcs"
+
+ # Read a mocked /etc/release file
+ mocked_etc_release_data = mocker.mock_open(
+ read_data=" Oracle Solaris 12.0")
+ builtin_open = "__builtin__.open" if PY2 else "builtins.open"
+ mocker.patch(builtin_open, mocked_etc_release_data)
+
+ service_status = mocker.patch.object(
+ service.Service, "modify_service_state")
+ service_status.return_value = (0, "", "")
+
+ get_sunos_svcs_status = mocker.patch.object(
+ service.SunOSService, "get_sunos_svcs_status")
+ get_sunos_svcs_status.return_value = "offline"
+ get_service_status = mocker.patch.object(
+ service.Service, "get_service_status")
+ get_service_status.return_value = ""
+
+ mocker.patch('ansible.module_utils.common.sys_info.distro.id', return_value='')
+
+
+@pytest.fixture
+def mocked_sunos_service(mocker):
+ mocker_sunos_service(mocker)
+
+
+def test_sunos_service_start(mocked_sunos_service, capfd):
+ """
+ test SunOS Service Start
+ """
+ set_module_args(
+ {
+ "name": "environment",
+ "state": "started",
+ }
+ )
+ with pytest.raises(SystemExit):
+ service.main()
+
+ out, dummy = capfd.readouterr()
+ results = json.loads(out)
+ assert not results.get("failed")
+ assert results["changed"]
diff --git a/test/units/modules/test_service_facts.py b/test/units/modules/test_service_facts.py
new file mode 100644
index 0000000..07f6827
--- /dev/null
+++ b/test/units/modules/test_service_facts.py
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2020 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from units.compat import unittest
+from units.compat.mock import patch
+
+from ansible.module_utils import basic
+from ansible.modules.service_facts import AIXScanService
+
+
+# AIX # lssrc -a
+LSSRC_OUTPUT = """
+Subsystem Group PID Status
+ sendmail mail 5243302 active
+ syslogd ras 5636528 active
+ portmap portmap 5177768 active
+ snmpd tcpip 5308844 active
+ hostmibd tcpip 5374380 active
+ snmpmibd tcpip 5439918 active
+ aixmibd tcpip 5505456 active
+ nimsh nimclient 5571004 active
+ aso 6029758 active
+ biod nfs 6357464 active
+ nfsd nfs 5701906 active
+ rpc.mountd nfs 6488534 active
+ rpc.statd nfs 7209216 active
+ rpc.lockd nfs 7274988 active
+ qdaemon spooler 6816222 active
+ writesrv spooler 6685150 active
+ clcomd caa 7471600 active
+ sshd ssh 7602674 active
+ pfcdaemon 7012860 active
+ ctrmc rsct 6947312 active
+ IBM.HostRM rsct_rm 14418376 active
+ IBM.ConfigRM rsct_rm 6160674 active
+ IBM.DRM rsct_rm 14680550 active
+ IBM.MgmtDomainRM rsct_rm 14090676 active
+ IBM.ServiceRM rsct_rm 13828542 active
+ cthats cthats 13959668 active
+ cthags cthags 14025054 active
+ IBM.StorageRM rsct_rm 12255706 active
+ inetd tcpip 12517828 active
+ lpd spooler inoperative
+ keyserv keyserv inoperative
+ ypbind yp inoperative
+ gsclvmd inoperative
+ cdromd inoperative
+ ndpd-host tcpip inoperative
+ ndpd-router tcpip inoperative
+ netcd netcd inoperative
+ tftpd tcpip inoperative
+ routed tcpip inoperative
+ mrouted tcpip inoperative
+ rsvpd qos inoperative
+ policyd qos inoperative
+ timed tcpip inoperative
+ iptrace tcpip inoperative
+ dpid2 tcpip inoperative
+ rwhod tcpip inoperative
+ pxed tcpip inoperative
+ binld tcpip inoperative
+ xntpd tcpip inoperative
+ gated tcpip inoperative
+ dhcpcd tcpip inoperative
+ dhcpcd6 tcpip inoperative
+ dhcpsd tcpip inoperative
+ dhcpsdv6 tcpip inoperative
+ dhcprd tcpip inoperative
+ dfpd tcpip inoperative
+ named tcpip inoperative
+ automountd autofs inoperative
+ nfsrgyd nfs inoperative
+ gssd nfs inoperative
+ cpsd ike inoperative
+ tmd ike inoperative
+ isakmpd inoperative
+ ikev2d inoperative
+ iked ike inoperative
+ clconfd caa inoperative
+ ksys_vmmon inoperative
+ nimhttp inoperative
+ IBM.SRVPROXY ibmsrv inoperative
+ ctcas rsct inoperative
+ IBM.ERRM rsct_rm inoperative
+ IBM.AuditRM rsct_rm inoperative
+ isnstgtd isnstgtd inoperative
+ IBM.LPRM rsct_rm inoperative
+ cthagsglsm cthags inoperative
+"""
+
+
+class TestAIXScanService(unittest.TestCase):
+
+ def setUp(self):
+ self.mock1 = patch.object(basic.AnsibleModule, 'get_bin_path', return_value='/usr/sbin/lssrc')
+ self.mock1.start()
+ self.addCleanup(self.mock1.stop)
+ self.mock2 = patch.object(basic.AnsibleModule, 'run_command', return_value=(0, LSSRC_OUTPUT, ''))
+ self.mock2.start()
+ self.addCleanup(self.mock2.stop)
+ self.mock3 = patch('platform.system', return_value='AIX')
+ self.mock3.start()
+ self.addCleanup(self.mock3.stop)
+
+ def test_gather_services(self):
+ svcmod = AIXScanService(basic.AnsibleModule)
+ result = svcmod.gather_services()
+
+ self.assertIsInstance(result, dict)
+
+ self.assertIn('IBM.HostRM', result)
+ self.assertEqual(result['IBM.HostRM'], {
+ 'name': 'IBM.HostRM',
+ 'source': 'src',
+ 'state': 'running',
+ })
+ self.assertIn('IBM.AuditRM', result)
+ self.assertEqual(result['IBM.AuditRM'], {
+ 'name': 'IBM.AuditRM',
+ 'source': 'src',
+ 'state': 'stopped',
+ })
diff --git a/test/units/modules/test_systemd.py b/test/units/modules/test_systemd.py
new file mode 100644
index 0000000..52c212a
--- /dev/null
+++ b/test/units/modules/test_systemd.py
@@ -0,0 +1,52 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from units.compat import unittest
+from ansible.modules.systemd import parse_systemctl_show
+
+
+class ParseSystemctlShowTestCase(unittest.TestCase):
+
+ def test_simple(self):
+ lines = [
+ 'Type=simple',
+ 'Restart=no',
+ 'Requires=system.slice sysinit.target',
+ 'Description=Blah blah blah',
+ ]
+ parsed = parse_systemctl_show(lines)
+ self.assertEqual(parsed, {
+ 'Type': 'simple',
+ 'Restart': 'no',
+ 'Requires': 'system.slice sysinit.target',
+ 'Description': 'Blah blah blah',
+ })
+
+ def test_multiline_exec(self):
+ # This was taken from a real service that specified "ExecStart=/bin/echo foo\nbar"
+ lines = [
+ 'Type=simple',
+ 'ExecStart={ path=/bin/echo ; argv[]=/bin/echo foo',
+ 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }',
+ 'Description=blah',
+ ]
+ parsed = parse_systemctl_show(lines)
+ self.assertEqual(parsed, {
+ 'Type': 'simple',
+ 'ExecStart': '{ path=/bin/echo ; argv[]=/bin/echo foo\n'
+ 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }',
+ 'Description': 'blah',
+ })
+
+ def test_single_line_with_brace(self):
+ lines = [
+ 'Type=simple',
+ 'Description={ this is confusing',
+ 'Restart=no',
+ ]
+ parsed = parse_systemctl_show(lines)
+ self.assertEqual(parsed, {
+ 'Type': 'simple',
+ 'Description': '{ this is confusing',
+ 'Restart': 'no',
+ })
diff --git a/test/units/modules/test_unarchive.py b/test/units/modules/test_unarchive.py
new file mode 100644
index 0000000..3e7a58c
--- /dev/null
+++ b/test/units/modules/test_unarchive.py
@@ -0,0 +1,93 @@
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+
+import pytest
+
+from ansible.modules.unarchive import ZipArchive, TgzArchive
+
+
+class AnsibleModuleExit(Exception):
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+
+
+class ExitJson(AnsibleModuleExit):
+ pass
+
+
+class FailJson(AnsibleModuleExit):
+ pass
+
+
+@pytest.fixture
+def fake_ansible_module():
+ return FakeAnsibleModule()
+
+
+class FakeAnsibleModule:
+ def __init__(self):
+ self.params = {}
+ self.tmpdir = None
+
+ def exit_json(self, *args, **kwargs):
+ raise ExitJson(*args, **kwargs)
+
+ def fail_json(self, *args, **kwargs):
+ raise FailJson(*args, **kwargs)
+
+
+class TestCaseZipArchive:
+ @pytest.mark.parametrize(
+ 'side_effect, expected_reason', (
+ ([ValueError, '/bin/zipinfo'], "Unable to find required 'unzip'"),
+ (ValueError, "Unable to find required 'unzip' or 'zipinfo'"),
+ )
+ )
+ def test_no_zip_zipinfo_binary(self, mocker, fake_ansible_module, side_effect, expected_reason):
+ mocker.patch("ansible.modules.unarchive.get_bin_path", side_effect=side_effect)
+ fake_ansible_module.params = {
+ "extra_opts": "",
+ "exclude": "",
+ "include": "",
+ "io_buffer_size": 65536,
+ }
+
+ z = ZipArchive(
+ src="",
+ b_dest="",
+ file_args="",
+ module=fake_ansible_module,
+ )
+ can_handle, reason = z.can_handle_archive()
+
+ assert can_handle is False
+ assert expected_reason in reason
+ assert z.cmd_path is None
+
+
+class TestCaseTgzArchive:
+ def test_no_tar_binary(self, mocker, fake_ansible_module):
+ mocker.patch("ansible.modules.unarchive.get_bin_path", side_effect=ValueError)
+ fake_ansible_module.params = {
+ "extra_opts": "",
+ "exclude": "",
+ "include": "",
+ "io_buffer_size": 65536,
+ }
+ fake_ansible_module.check_mode = False
+
+ t = TgzArchive(
+ src="",
+ b_dest="",
+ file_args="",
+ module=fake_ansible_module,
+ )
+ can_handle, reason = t.can_handle_archive()
+
+ assert can_handle is False
+ assert 'Unable to find required' in reason
+ assert t.cmd_path is None
+ assert t.tar_type is None
diff --git a/test/units/modules/test_yum.py b/test/units/modules/test_yum.py
new file mode 100644
index 0000000..8052eff
--- /dev/null
+++ b/test/units/modules/test_yum.py
@@ -0,0 +1,222 @@
+# -*- coding: utf-8 -*-
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from units.compat import unittest
+
+from ansible.modules.yum import YumModule
+
+
+yum_plugin_load_error = """
+Plugin "product-id" can't be imported
+Plugin "search-disabled-repos" can't be imported
+Plugin "subscription-manager" can't be imported
+Plugin "product-id" can't be imported
+Plugin "search-disabled-repos" can't be imported
+Plugin "subscription-manager" can't be imported
+"""
+
+# from https://github.com/ansible/ansible/issues/20608#issuecomment-276106505
+wrapped_output_1 = """
+Загружены модули: fastestmirror
+Loading mirror speeds from cached hostfile
+ * base: mirror.h1host.ru
+ * extras: mirror.h1host.ru
+ * updates: mirror.h1host.ru
+
+vms-agent.x86_64 0.0-9 dev
+"""
+
+# from https://github.com/ansible/ansible/issues/20608#issuecomment-276971275
+wrapped_output_2 = """
+Загружены модули: fastestmirror
+Loading mirror speeds from cached hostfile
+ * base: mirror.corbina.net
+ * extras: mirror.corbina.net
+ * updates: mirror.corbina.net
+
+empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty.x86_64
+ 0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1-0
+ addons
+libtiff.x86_64 4.0.3-27.el7_3 updates
+"""
+
+# From https://github.com/ansible/ansible/issues/20608#issuecomment-276698431
+wrapped_output_3 = """
+Loaded plugins: fastestmirror, langpacks
+Loading mirror speeds from cached hostfile
+
+ceph.x86_64 1:11.2.0-0.el7 ceph
+ceph-base.x86_64 1:11.2.0-0.el7 ceph
+ceph-common.x86_64 1:11.2.0-0.el7 ceph
+ceph-mds.x86_64 1:11.2.0-0.el7 ceph
+ceph-mon.x86_64 1:11.2.0-0.el7 ceph
+ceph-osd.x86_64 1:11.2.0-0.el7 ceph
+ceph-selinux.x86_64 1:11.2.0-0.el7 ceph
+libcephfs1.x86_64 1:11.0.2-0.el7 ceph
+librados2.x86_64 1:11.2.0-0.el7 ceph
+libradosstriper1.x86_64 1:11.2.0-0.el7 ceph
+librbd1.x86_64 1:11.2.0-0.el7 ceph
+librgw2.x86_64 1:11.2.0-0.el7 ceph
+python-cephfs.x86_64 1:11.2.0-0.el7 ceph
+python-rados.x86_64 1:11.2.0-0.el7 ceph
+python-rbd.x86_64 1:11.2.0-0.el7 ceph
+"""
+
+# from https://github.com/ansible/ansible-modules-core/issues/4318#issuecomment-251416661
+wrapped_output_4 = """
+ipxe-roms-qemu.noarch 20160127-1.git6366fa7a.el7
+ rhelosp-9.0-director-puddle
+quota.x86_64 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z
+quota-nls.noarch 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z
+rdma.noarch 7.2_4.1_rc6-2.el7 rhelosp-rhel-7.2-z
+screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2
+ rhelosp-rhel-7.2-z
+sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle
+sssd-client.x86_64 1.13.0-40.el7_2.12 rhelosp-rhel-7.2-z
+"""
+
+
+# A 'normal-ish' yum check-update output, without any wrapped lines
+unwrapped_output_rhel7 = """
+
+Loaded plugins: etckeeper, product-id, search-disabled-repos, subscription-
+ : manager
+This system is not registered to Red Hat Subscription Management. You can use subscription-manager to register.
+
+NetworkManager-openvpn.x86_64 1:1.2.6-1.el7 epel
+NetworkManager-openvpn-gnome.x86_64 1:1.2.6-1.el7 epel
+cabal-install.x86_64 1.16.1.0-2.el7 epel
+cgit.x86_64 1.1-1.el7 epel
+python34-libs.x86_64 3.4.5-3.el7 epel
+python34-test.x86_64 3.4.5-3.el7 epel
+python34-tkinter.x86_64 3.4.5-3.el7 epel
+python34-tools.x86_64 3.4.5-3.el7 epel
+qgit.x86_64 2.6-4.el7 epel
+rdiff-backup.x86_64 1.2.8-12.el7 epel
+stoken-libs.x86_64 0.91-1.el7 epel
+xlockmore.x86_64 5.49-2.el7 epel
+"""
+
+# Some wrapped obsoletes for prepending to output for testing both
+wrapped_output_rhel7_obsoletes_postfix = """
+Obsoleting Packages
+ddashboard.x86_64 0.2.0.1-1.el7_3 mhlavink-developerdashboard
+ developerdashboard.x86_64 0.1.12.2-1.el7_2 @mhlavink-developerdashboard
+python-bugzilla.noarch 1.2.2-3.el7_2.1 mhlavink-developerdashboard
+ python-bugzilla-develdashboardfixes.noarch
+ 1.2.2-3.el7 @mhlavink-developerdashboard
+python2-futures.noarch 3.0.5-1.el7 epel
+ python-futures.noarch 3.0.3-1.el7 @epel
+python2-pip.noarch 8.1.2-5.el7 epel
+ python-pip.noarch 7.1.0-1.el7 @epel
+python2-pyxdg.noarch 0.25-6.el7 epel
+ pyxdg.noarch 0.25-5.el7 @epel
+python2-simplejson.x86_64 3.10.0-1.el7 epel
+ python-simplejson.x86_64 3.3.3-1.el7 @epel
+Security: kernel-3.10.0-327.28.2.el7.x86_64 is an installed security update
+Security: kernel-3.10.0-327.22.2.el7.x86_64 is the currently running version
+"""
+
+wrapped_output_multiple_empty_lines = """
+Loaded plugins: langpacks, product-id, search-disabled-repos, subscription-manager
+
+This system is not registered with an entitlement server. You can use subscription-manager to register.
+
+
+screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2
+ rhelosp-rhel-7.2-z
+sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle
+"""
+
+longname = """
+Loaded plugins: fastestmirror, priorities, rhnplugin
+This system is receiving updates from RHN Classic or Red Hat Satellite.
+Loading mirror speeds from cached hostfile
+
+xxxxxxxxxxxxxxxxxxxxxxxxxx.noarch
+ 1.16-1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+glibc.x86_64 2.17-157.el7_3.1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"""
+
+
+unwrapped_output_rhel7_obsoletes = unwrapped_output_rhel7 + wrapped_output_rhel7_obsoletes_postfix
+unwrapped_output_rhel7_expected_new_obsoletes_pkgs = [
+ "ddashboard", "python-bugzilla", "python2-futures", "python2-pip",
+ "python2-pyxdg", "python2-simplejson"
+]
+unwrapped_output_rhel7_expected_old_obsoletes_pkgs = [
+ "developerdashboard", "python-bugzilla-develdashboardfixes",
+ "python-futures", "python-pip", "pyxdg", "python-simplejson"
+]
+unwrapped_output_rhel7_expected_updated_pkgs = [
+ "NetworkManager-openvpn", "NetworkManager-openvpn-gnome", "cabal-install",
+ "cgit", "python34-libs", "python34-test", "python34-tkinter",
+ "python34-tools", "qgit", "rdiff-backup", "stoken-libs", "xlockmore"
+]
+
+
+class TestYumUpdateCheckParse(unittest.TestCase):
+ def _assert_expected(self, expected_pkgs, result):
+
+ for expected_pkg in expected_pkgs:
+ self.assertIn(expected_pkg, result)
+ self.assertEqual(len(result), len(expected_pkgs))
+ self.assertIsInstance(result, dict)
+
+ def test_empty_output(self):
+ res, obs = YumModule.parse_check_update("")
+ expected_pkgs = []
+ self._assert_expected(expected_pkgs, res)
+
+ def test_longname(self):
+ res, obs = YumModule.parse_check_update(longname)
+ expected_pkgs = ['xxxxxxxxxxxxxxxxxxxxxxxxxx', 'glibc']
+ self._assert_expected(expected_pkgs, res)
+
+ def test_plugin_load_error(self):
+ res, obs = YumModule.parse_check_update(yum_plugin_load_error)
+ expected_pkgs = []
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_1(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_1)
+ expected_pkgs = ["vms-agent"]
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_2(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_2)
+ expected_pkgs = ["empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty",
+ "libtiff"]
+
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_3(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_3)
+ expected_pkgs = ["ceph", "ceph-base", "ceph-common", "ceph-mds",
+ "ceph-mon", "ceph-osd", "ceph-selinux", "libcephfs1",
+ "librados2", "libradosstriper1", "librbd1", "librgw2",
+ "python-cephfs", "python-rados", "python-rbd"]
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_4(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_4)
+
+ expected_pkgs = ["ipxe-roms-qemu", "quota", "quota-nls", "rdma", "screen",
+ "sos", "sssd-client"]
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_rhel7(self):
+ res, obs = YumModule.parse_check_update(unwrapped_output_rhel7)
+ self._assert_expected(unwrapped_output_rhel7_expected_updated_pkgs, res)
+
+ def test_wrapped_output_rhel7_obsoletes(self):
+ res, obs = YumModule.parse_check_update(unwrapped_output_rhel7_obsoletes)
+ self._assert_expected(
+ unwrapped_output_rhel7_expected_updated_pkgs + unwrapped_output_rhel7_expected_new_obsoletes_pkgs,
+ res
+ )
+ self._assert_expected(unwrapped_output_rhel7_expected_old_obsoletes_pkgs, obs)
+
+ def test_wrapped_output_multiple_empty_lines(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_multiple_empty_lines)
+ self._assert_expected(['screen', 'sos'], res)
diff --git a/test/units/modules/utils.py b/test/units/modules/utils.py
new file mode 100644
index 0000000..6d169e3
--- /dev/null
+++ b/test/units/modules/utils.py
@@ -0,0 +1,50 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+from units.compat import unittest
+from units.compat.mock import patch
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+
+
+def set_module_args(args):
+ if '_ansible_remote_tmp' not in args:
+ args['_ansible_remote_tmp'] = '/tmp'
+ if '_ansible_keep_remote_files' not in args:
+ args['_ansible_keep_remote_files'] = False
+
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args)
+
+
+class AnsibleExitJson(Exception):
+ pass
+
+
+class AnsibleFailJson(Exception):
+ pass
+
+
+def exit_json(*args, **kwargs):
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs):
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class ModuleTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
+ self.mock_module.start()
+ self.mock_sleep = patch('time.sleep')
+ self.mock_sleep.start()
+ set_module_args({})
+ self.addCleanup(self.mock_module.stop)
+ self.addCleanup(self.mock_sleep.stop)