diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /test/units/modules | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream.tar.xz ansible-core-upstream.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/units/modules')
-rw-r--r-- | test/units/modules/__init__.py | 0 | ||||
-rw-r--r-- | test/units/modules/conftest.py | 31 | ||||
-rw-r--r-- | test/units/modules/test_apt.py | 53 | ||||
-rw-r--r-- | test/units/modules/test_apt_key.py | 32 | ||||
-rw-r--r-- | test/units/modules/test_async_wrapper.py | 58 | ||||
-rw-r--r-- | test/units/modules/test_copy.py | 215 | ||||
-rw-r--r-- | test/units/modules/test_hostname.py | 147 | ||||
-rw-r--r-- | test/units/modules/test_iptables.py | 1192 | ||||
-rw-r--r-- | test/units/modules/test_known_hosts.py | 110 | ||||
-rw-r--r-- | test/units/modules/test_pip.py | 40 | ||||
-rw-r--r-- | test/units/modules/test_service.py | 70 | ||||
-rw-r--r-- | test/units/modules/test_service_facts.py | 126 | ||||
-rw-r--r-- | test/units/modules/test_systemd.py | 52 | ||||
-rw-r--r-- | test/units/modules/test_unarchive.py | 93 | ||||
-rw-r--r-- | test/units/modules/test_yum.py | 222 | ||||
-rw-r--r-- | test/units/modules/utils.py | 50 |
16 files changed, 2491 insertions, 0 deletions
diff --git a/test/units/modules/__init__.py b/test/units/modules/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/units/modules/__init__.py diff --git a/test/units/modules/conftest.py b/test/units/modules/conftest.py new file mode 100644 index 0000000..a7d1e04 --- /dev/null +++ b/test/units/modules/conftest.py @@ -0,0 +1,31 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +import pytest + +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_bytes +from ansible.module_utils.common._collections_compat import MutableMapping + + +@pytest.fixture +def patch_ansible_module(request, mocker): + if isinstance(request.param, string_types): + args = request.param + elif isinstance(request.param, MutableMapping): + if 'ANSIBLE_MODULE_ARGS' not in request.param: + request.param = {'ANSIBLE_MODULE_ARGS': request.param} + if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']: + request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp' + if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']: + request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False + args = json.dumps(request.param) + else: + raise Exception('Malformed data to the patch_ansible_module pytest fixture') + + mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args)) diff --git a/test/units/modules/test_apt.py b/test/units/modules/test_apt.py new file mode 100644 index 0000000..20e056f --- /dev/null +++ b/test/units/modules/test_apt.py @@ -0,0 +1,53 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import collections +import sys + +from units.compat.mock import Mock +from units.compat import unittest + +try: + from ansible.modules.apt import ( + expand_pkgspec_from_fnmatches, + ) +except Exception: + # Need some more module_utils work (porting urls.py) before we can test + # modules. So don't error out in this case. + if sys.version_info[0] >= 3: + pass + + +class AptExpandPkgspecTestCase(unittest.TestCase): + + def setUp(self): + FakePackage = collections.namedtuple("Package", ("name",)) + self.fake_cache = [ + FakePackage("apt"), + FakePackage("apt-utils"), + FakePackage("not-selected"), + ] + + def test_trivial(self): + foo = ["apt"] + self.assertEqual( + expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) + + def test_version_wildcard(self): + foo = ["apt=1.0*"] + self.assertEqual( + expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) + + def test_pkgname_wildcard_version_wildcard(self): + foo = ["apt*=1.0*"] + m_mock = Mock() + self.assertEqual( + expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), + ['apt', 'apt-utils']) + + def test_pkgname_expands(self): + foo = ["apt*"] + m_mock = Mock() + self.assertEqual( + expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), + ["apt", "apt-utils"]) diff --git a/test/units/modules/test_apt_key.py b/test/units/modules/test_apt_key.py new file mode 100644 index 0000000..37cd53b --- /dev/null +++ b/test/units/modules/test_apt_key.py @@ -0,0 +1,32 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +from units.compat.mock import patch, Mock +from units.compat import unittest + +from ansible.modules import apt_key + + +def returnc(x): + return 'C' + + +class AptKeyTestCase(unittest.TestCase): + + @patch.object(apt_key, 'apt_key_bin', '/usr/bin/apt-key') + @patch.object(apt_key, 'lang_env', returnc) + @patch.dict(os.environ, {'HTTP_PROXY': 'proxy.example.com'}) + def test_import_key_with_http_proxy(self): + m_mock = Mock() + m_mock.run_command.return_value = (0, '', '') + apt_key.import_key( + m_mock, keyring=None, keyserver='keyserver.example.com', + key_id='0xDEADBEEF') + self.assertEqual( + m_mock.run_command.call_args_list[0][0][0], + '/usr/bin/apt-key adv --no-tty --keyserver keyserver.example.com' + ' --keyserver-options http-proxy=proxy.example.com' + ' --recv 0xDEADBEEF' + ) diff --git a/test/units/modules/test_async_wrapper.py b/test/units/modules/test_async_wrapper.py new file mode 100644 index 0000000..37b1fda --- /dev/null +++ b/test/units/modules/test_async_wrapper.py @@ -0,0 +1,58 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import os +import json +import shutil +import tempfile + +import pytest + +from units.compat.mock import patch, MagicMock +from ansible.modules import async_wrapper + +from pprint import pprint + + +class TestAsyncWrapper: + + def test_run_module(self, monkeypatch): + + def mock_get_interpreter(module_path): + return ['/usr/bin/python'] + + module_result = {'rc': 0} + module_lines = [ + '#!/usr/bin/python', + 'import sys', + 'sys.stderr.write("stderr stuff")', + "print('%s')" % json.dumps(module_result) + ] + module_data = '\n'.join(module_lines) + '\n' + module_data = module_data.encode('utf-8') + + workdir = tempfile.mkdtemp() + fh, fn = tempfile.mkstemp(dir=workdir) + + with open(fn, 'wb') as f: + f.write(module_data) + + command = fn + jobid = 0 + job_path = os.path.join(os.path.dirname(command), 'job') + + monkeypatch.setattr(async_wrapper, '_get_interpreter', mock_get_interpreter) + monkeypatch.setattr(async_wrapper, 'job_path', job_path) + + res = async_wrapper._run_module(command, jobid) + + with open(os.path.join(workdir, 'job'), 'r') as f: + jres = json.loads(f.read()) + + shutil.rmtree(workdir) + + assert jres.get('rc') == 0 + assert jres.get('stderr') == 'stderr stuff' diff --git a/test/units/modules/test_copy.py b/test/units/modules/test_copy.py new file mode 100644 index 0000000..20c309b --- /dev/null +++ b/test/units/modules/test_copy.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +# Copyright: +# (c) 2018 Ansible Project +# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible.modules.copy import AnsibleModuleError, split_pre_existing_dir + +from ansible.module_utils.basic import AnsibleModule + + +THREE_DIRS_DATA = (('/dir1/dir2', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1', 'dir2']), + # 2 existing dirs: + ('/dir1', ['dir2']), + # 3 existing dirs: + ('/dir1/dir2', []) + ), + ('/dir1/dir2/', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1', 'dir2']), + # 2 existing dirs: + ('/dir1', ['dir2']), + # 3 existing dirs: + ('/dir1/dir2', []) + ), + ) + + +TWO_DIRS_DATA = (('dir1/dir2', + # 0 existing dirs: + ('.', ['dir1', 'dir2']), + # 1 existing dir: + ('dir1', ['dir2']), + # 2 existing dirs: + ('dir1/dir2', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ('dir1/dir2/', + # 0 existing dirs: + ('.', ['dir1', 'dir2']), + # 1 existing dir: + ('dir1', ['dir2']), + # 2 existing dirs: + ('dir1/dir2', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ('/dir1', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1']), + # 2 existing dirs: + ('/dir1', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ('/dir1/', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1']), + # 2 existing dirs: + ('/dir1', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ) + THREE_DIRS_DATA + + +ONE_DIR_DATA = (('dir1', + # 0 existing dirs: + ('.', ['dir1']), + # 1 existing dir: + ('dir1', []), + # 2 existing dirs: Same as 1 because we never get to the third + ), + ('dir1/', + # 0 existing dirs: + ('.', ['dir1']), + # 1 existing dir: + ('dir1', []), + # 2 existing dirs: Same as 1 because we never get to the third + ), + ) + TWO_DIRS_DATA + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[4]) for d in THREE_DIRS_DATA)) +def test_split_pre_existing_dir_three_levels_exist(directory, expected, mocker): + mocker.patch('os.path.exists', side_effect=[True, True, True]) + split_pre_existing_dir(directory) == expected + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[3]) for d in TWO_DIRS_DATA)) +def test_split_pre_existing_dir_two_levels_exist(directory, expected, mocker): + mocker.patch('os.path.exists', side_effect=[True, True, False]) + split_pre_existing_dir(directory) == expected + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[2]) for d in ONE_DIR_DATA)) +def test_split_pre_existing_dir_one_level_exists(directory, expected, mocker): + mocker.patch('os.path.exists', side_effect=[True, False, False]) + split_pre_existing_dir(directory) == expected + + +@pytest.mark.parametrize('directory', (d[0] for d in ONE_DIR_DATA if d[1] is None)) +def test_split_pre_existing_dir_root_does_not_exist(directory, mocker): + mocker.patch('os.path.exists', return_value=False) + with pytest.raises(AnsibleModuleError) as excinfo: + split_pre_existing_dir(directory) + assert excinfo.value.results['msg'].startswith("The '/' directory doesn't exist on this machine.") + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[1]) for d in ONE_DIR_DATA if not d[0].startswith('/'))) +def test_split_pre_existing_dir_working_dir_exists(directory, expected, mocker): + mocker.patch('os.path.exists', return_value=False) + split_pre_existing_dir(directory) == expected + + +# +# Info helpful for making new test cases: +# +# base_mode = {'dir no perms': 0o040000, +# 'file no perms': 0o100000, +# 'dir all perms': 0o400000 | 0o777, +# 'file all perms': 0o100000, | 0o777} +# +# perm_bits = {'x': 0b001, +# 'w': 0b010, +# 'r': 0b100} +# +# role_shift = {'u': 6, +# 'g': 3, +# 'o': 0} + +DATA = ( # Going from no permissions to setting all for user, group, and/or other + (0o040000, u'a+rwx', 0o0777), + (0o040000, u'u+rwx,g+rwx,o+rwx', 0o0777), + (0o040000, u'o+rwx', 0o0007), + (0o040000, u'g+rwx', 0o0070), + (0o040000, u'u+rwx', 0o0700), + + # Going from all permissions to none for user, group, and/or other + (0o040777, u'a-rwx', 0o0000), + (0o040777, u'u-rwx,g-rwx,o-rwx', 0o0000), + (0o040777, u'o-rwx', 0o0770), + (0o040777, u'g-rwx', 0o0707), + (0o040777, u'u-rwx', 0o0077), + + # now using absolute assignment from None to a set of perms + (0o040000, u'a=rwx', 0o0777), + (0o040000, u'u=rwx,g=rwx,o=rwx', 0o0777), + (0o040000, u'o=rwx', 0o0007), + (0o040000, u'g=rwx', 0o0070), + (0o040000, u'u=rwx', 0o0700), + + # X effect on files and dirs + (0o040000, u'a+X', 0o0111), + (0o100000, u'a+X', 0), + (0o040000, u'a=X', 0o0111), + (0o100000, u'a=X', 0), + (0o040777, u'a-X', 0o0666), + # Same as chmod but is it a bug? + # chmod a-X statfile <== removes execute from statfile + (0o100777, u'a-X', 0o0666), + + # Multiple permissions + (0o040000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0755), + (0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644), +) + +UMASK_DATA = ( + (0o100000, '+rwx', 0o770), + (0o100777, '-rwx', 0o007), +) + +INVALID_DATA = ( + (0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"), + (0o040000, u'f=rwx', "bad symbolic permission for mode: f=rwx"), +) + + +@pytest.mark.parametrize('stat_info, mode_string, expected', DATA) +def test_good_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected + + +@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA) +def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_umask = mocker.patch('os.umask') + mock_umask.return_value = 0o7 + + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected + + +@pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA) +def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + with pytest.raises(ValueError) as exc: + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah' + assert exc.match(expected) diff --git a/test/units/modules/test_hostname.py b/test/units/modules/test_hostname.py new file mode 100644 index 0000000..9050fd0 --- /dev/null +++ b/test/units/modules/test_hostname.py @@ -0,0 +1,147 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import shutil +import tempfile + +from units.compat.mock import patch, MagicMock, mock_open +from ansible.module_utils import basic +from ansible.module_utils.common._utils import get_all_subclasses +from ansible.modules import hostname +from units.modules.utils import ModuleTestCase, set_module_args +from ansible.module_utils.six import PY2 + + +class TestHostname(ModuleTestCase): + @patch('os.path.isfile') + def test_stategy_get_never_writes_in_check_mode(self, isfile): + isfile.return_value = True + + set_module_args({'name': 'fooname', '_ansible_check_mode': True}) + subclasses = get_all_subclasses(hostname.BaseStrategy) + module = MagicMock() + for cls in subclasses: + instance = cls(module) + + instance.module.run_command = MagicMock() + instance.module.run_command.return_value = (0, '', '') + + m = mock_open() + builtins = 'builtins' + if PY2: + builtins = '__builtin__' + with patch('%s.open' % builtins, m): + instance.get_permanent_hostname() + instance.get_current_hostname() + self.assertFalse( + m.return_value.write.called, + msg='%s called write, should not have' % str(cls)) + + def test_all_named_strategies_exist(self): + """Loop through the STRATS and see if anything is missing.""" + for _name, prefix in hostname.STRATS.items(): + classname = "%sStrategy" % prefix + cls = getattr(hostname, classname, None) + + if cls is None: + self.assertFalse( + cls is None, "%s is None, should be a subclass" % classname + ) + else: + self.assertTrue(issubclass(cls, hostname.BaseStrategy)) + + +class TestRedhatStrategy(ModuleTestCase): + def setUp(self): + super(TestRedhatStrategy, self).setUp() + self.testdir = tempfile.mkdtemp(prefix='ansible-test-hostname-') + self.network_file = os.path.join(self.testdir, "network") + + def tearDown(self): + super(TestRedhatStrategy, self).tearDown() + shutil.rmtree(self.testdir, ignore_errors=True) + + @property + def instance(self): + self.module = MagicMock() + instance = hostname.RedHatStrategy(self.module) + instance.NETWORK_FILE = self.network_file + return instance + + def test_get_permanent_hostname_missing(self): + self.assertIsNone(self.instance.get_permanent_hostname()) + self.assertTrue(self.module.fail_json.called) + self.module.fail_json.assert_called_with( + "Unable to locate HOSTNAME entry in %s" % self.network_file + ) + + def test_get_permanent_hostname_line_missing(self): + with open(self.network_file, "w") as f: + f.write("# some other content\n") + self.assertIsNone(self.instance.get_permanent_hostname()) + self.module.fail_json.assert_called_with( + "Unable to locate HOSTNAME entry in %s" % self.network_file + ) + + def test_get_permanent_hostname_existing(self): + with open(self.network_file, "w") as f: + f.write( + "some other content\n" + "HOSTNAME=foobar\n" + "more content\n" + ) + self.assertEqual(self.instance.get_permanent_hostname(), "foobar") + + def test_get_permanent_hostname_existing_whitespace(self): + with open(self.network_file, "w") as f: + f.write( + "some other content\n" + " HOSTNAME=foobar \n" + "more content\n" + ) + self.assertEqual(self.instance.get_permanent_hostname(), "foobar") + + def test_set_permanent_hostname_missing(self): + self.instance.set_permanent_hostname("foobar") + with open(self.network_file) as f: + self.assertEqual(f.read(), "HOSTNAME=foobar\n") + + def test_set_permanent_hostname_line_missing(self): + with open(self.network_file, "w") as f: + f.write("# some other content\n") + self.instance.set_permanent_hostname("foobar") + with open(self.network_file) as f: + self.assertEqual(f.read(), "# some other content\nHOSTNAME=foobar\n") + + def test_set_permanent_hostname_existing(self): + with open(self.network_file, "w") as f: + f.write( + "some other content\n" + "HOSTNAME=spam\n" + "more content\n" + ) + self.instance.set_permanent_hostname("foobar") + with open(self.network_file) as f: + self.assertEqual( + f.read(), + "some other content\n" + "HOSTNAME=foobar\n" + "more content\n" + ) + + def test_set_permanent_hostname_existing_whitespace(self): + with open(self.network_file, "w") as f: + f.write( + "some other content\n" + " HOSTNAME=spam \n" + "more content\n" + ) + self.instance.set_permanent_hostname("foobar") + with open(self.network_file) as f: + self.assertEqual( + f.read(), + "some other content\n" + "HOSTNAME=foobar\n" + "more content\n" + ) diff --git a/test/units/modules/test_iptables.py b/test/units/modules/test_iptables.py new file mode 100644 index 0000000..265e770 --- /dev/null +++ b/test/units/modules/test_iptables.py @@ -0,0 +1,1192 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat.mock import patch +from ansible.module_utils import basic +from ansible.modules import iptables +from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args + + +def get_bin_path(*args, **kwargs): + return "/sbin/iptables" + + +def get_iptables_version(iptables_path, module): + return "1.8.2" + + +class TestIptables(ModuleTestCase): + + def setUp(self): + super(TestIptables, self).setUp() + self.mock_get_bin_path = patch.object(basic.AnsibleModule, 'get_bin_path', get_bin_path) + self.mock_get_bin_path.start() + self.addCleanup(self.mock_get_bin_path.stop) # ensure that the patching is 'undone' + self.mock_get_iptables_version = patch.object(iptables, 'get_iptables_version', get_iptables_version) + self.mock_get_iptables_version.start() + self.addCleanup(self.mock_get_iptables_version.stop) # ensure that the patching is 'undone' + + def test_without_required_parameters(self): + """Failure must occurs when all parameters are missing""" + with self.assertRaises(AnsibleFailJson): + set_module_args({}) + iptables.main() + + def test_flush_table_without_chain(self): + """Test flush without chain, flush the table""" + set_module_args({ + 'flush': True, + }) + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.return_value = 0, '', '' # successful execution, no output + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args[0][0][0], '/sbin/iptables') + self.assertEqual(run_command.call_args[0][0][1], '-t') + self.assertEqual(run_command.call_args[0][0][2], 'filter') + self.assertEqual(run_command.call_args[0][0][3], '-F') + + def test_flush_table_check_true(self): + """Test flush without parameters and check == true""" + set_module_args({ + 'flush': True, + '_ansible_check_mode': True, + }) + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.return_value = 0, '', '' # successful execution, no output + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 0) + +# TODO ADD test flush table nat +# TODO ADD test flush with chain +# TODO ADD test flush with chain and table nat + + def test_policy_table(self): + """Test change policy of a chain""" + set_module_args({ + 'policy': 'ACCEPT', + 'chain': 'INPUT', + }) + commands_results = [ + (0, 'Chain INPUT (policy DROP)\n', ''), + (0, '', '') + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-L', + 'INPUT', + ]) + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-P', + 'INPUT', + 'ACCEPT', + ]) + + def test_policy_table_no_change(self): + """Test don't change policy of a chain if the policy is right""" + set_module_args({ + 'policy': 'ACCEPT', + 'chain': 'INPUT', + }) + commands_results = [ + (0, 'Chain INPUT (policy ACCEPT)\n', ''), + (0, '', '') + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertFalse(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-L', + 'INPUT', + ]) + + def test_policy_table_changed_false(self): + """Test flush without parameters and change == false""" + set_module_args({ + 'policy': 'ACCEPT', + 'chain': 'INPUT', + '_ansible_check_mode': True, + }) + commands_results = [ + (0, 'Chain INPUT (policy DROP)\n', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-L', + 'INPUT', + ]) + +# TODO ADD test policy without chain fail +# TODO ADD test policy with chain don't exists +# TODO ADD test policy with wrong choice fail + + def test_insert_rule_change_false(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'OUTPUT', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'ACCEPT', + 'action': 'insert', + '_ansible_check_mode': True, + }) + + commands_results = [ + (1, '', ''), # check_rule_present + (0, '', ''), # check_chain_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + + def test_insert_rule(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'OUTPUT', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'ACCEPT', + 'action': 'insert' + }) + + commands_results = [ + (1, '', ''), # check_rule_present + (0, '', ''), # check_chain_present + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 3) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + self.assertEqual(run_command.call_args_list[2][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-I', + 'OUTPUT', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + + def test_append_rule_check_mode(self): + """Test append a redirection rule in check mode""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'REDIRECT', + 'table': 'nat', + 'to_destination': '5.5.5.5/32', + 'protocol': 'udp', + 'destination_port': '22', + 'to_ports': '8600', + '_ansible_check_mode': True, + }) + + commands_results = [ + (1, '', ''), # check_rule_present + (0, '', ''), # check_chain_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'REDIRECT', + '--to-destination', + '5.5.5.5/32', + '--destination-port', + '22', + '--to-ports', + '8600' + ]) + + def test_append_rule(self): + """Test append a redirection rule""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'REDIRECT', + 'table': 'nat', + 'to_destination': '5.5.5.5/32', + 'protocol': 'udp', + 'destination_port': '22', + 'to_ports': '8600' + }) + + commands_results = [ + (1, '', ''), # check_rule_present + (0, '', ''), # check_chain_present + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 3) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'REDIRECT', + '--to-destination', + '5.5.5.5/32', + '--destination-port', + '22', + '--to-ports', + '8600' + ]) + self.assertEqual(run_command.call_args_list[2][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-A', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'REDIRECT', + '--to-destination', + '5.5.5.5/32', + '--destination-port', + '22', + '--to-ports', + '8600' + ]) + + def test_remove_rule(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'SNAT', + 'table': 'nat', + 'to_source': '5.5.5.5/32', + 'protocol': 'udp', + 'source_port': '22', + 'to_ports': '8600', + 'state': 'absent', + 'in_interface': 'eth0', + 'out_interface': 'eth1', + 'comment': 'this is a comment' + }) + + commands_results = [ + (0, '', ''), + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'SNAT', + '--to-source', + '5.5.5.5/32', + '-i', + 'eth0', + '-o', + 'eth1', + '--source-port', + '22', + '--to-ports', + '8600', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-D', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'SNAT', + '--to-source', + '5.5.5.5/32', + '-i', + 'eth0', + '-o', + 'eth1', + '--source-port', + '22', + '--to-ports', + '8600', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + + def test_remove_rule_check_mode(self): + """Test flush without parameters check mode""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'SNAT', + 'table': 'nat', + 'to_source': '5.5.5.5/32', + 'protocol': 'udp', + 'source_port': '22', + 'to_ports': '8600', + 'state': 'absent', + 'in_interface': 'eth0', + 'out_interface': 'eth1', + 'comment': 'this is a comment', + '_ansible_check_mode': True, + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'SNAT', + '--to-source', + '5.5.5.5/32', + '-i', + 'eth0', + '-o', + 'eth1', + '--source-port', + '22', + '--to-ports', + '8600', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + + def test_insert_with_reject(self): + """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """ + set_module_args({ + 'chain': 'INPUT', + 'protocol': 'tcp', + 'reject_with': 'tcp-reset', + 'ip_version': 'ipv4', + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-p', + 'tcp', + '-j', + 'REJECT', + '--reject-with', + 'tcp-reset', + ]) + + def test_insert_jump_reject_with_reject(self): + """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """ + set_module_args({ + 'chain': 'INPUT', + 'protocol': 'tcp', + 'jump': 'REJECT', + 'reject_with': 'tcp-reset', + 'ip_version': 'ipv4', + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-p', + 'tcp', + '-j', + 'REJECT', + '--reject-with', + 'tcp-reset', + ]) + + def test_jump_tee_gateway_negative(self): + """ Missing gateway when JUMP is set to TEE """ + set_module_args({ + 'table': 'mangle', + 'chain': 'PREROUTING', + 'in_interface': 'eth0', + 'protocol': 'udp', + 'match': 'state', + 'jump': 'TEE', + 'ctstate': ['NEW'], + 'destination_port': '9521', + 'destination': '127.0.0.1' + }) + + with self.assertRaises(AnsibleFailJson) as e: + iptables.main() + self.assertTrue(e.exception.args[0]['failed']) + self.assertEqual(e.exception.args[0]['msg'], 'jump is TEE but all of the following are missing: gateway') + + def test_jump_tee_gateway(self): + """ Using gateway when JUMP is set to TEE """ + set_module_args({ + 'table': 'mangle', + 'chain': 'PREROUTING', + 'in_interface': 'eth0', + 'protocol': 'udp', + 'match': 'state', + 'jump': 'TEE', + 'ctstate': ['NEW'], + 'destination_port': '9521', + 'gateway': '192.168.10.1', + 'destination': '127.0.0.1' + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'mangle', + '-C', 'PREROUTING', + '-p', 'udp', + '-d', '127.0.0.1', + '-m', 'state', + '-j', 'TEE', + '--gateway', '192.168.10.1', + '-i', 'eth0', + '--destination-port', '9521', + '--state', 'NEW' + ]) + + def test_tcp_flags(self): + """ Test various ways of inputting tcp_flags """ + args = [ + { + 'chain': 'OUTPUT', + 'protocol': 'tcp', + 'jump': 'DROP', + 'tcp_flags': 'flags=ALL flags_set="ACK,RST,SYN,FIN"' + }, + { + 'chain': 'OUTPUT', + 'protocol': 'tcp', + 'jump': 'DROP', + 'tcp_flags': { + 'flags': 'ALL', + 'flags_set': 'ACK,RST,SYN,FIN' + } + }, + { + 'chain': 'OUTPUT', + 'protocol': 'tcp', + 'jump': 'DROP', + 'tcp_flags': { + 'flags': ['ALL'], + 'flags_set': ['ACK', 'RST', 'SYN', 'FIN'] + } + }, + + ] + + for item in args: + set_module_args(item) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-p', + 'tcp', + '--tcp-flags', + 'ALL', + 'ACK,RST,SYN,FIN', + '-j', + 'DROP' + ]) + + def test_log_level(self): + """ Test various ways of log level flag """ + + log_levels = ['0', '1', '2', '3', '4', '5', '6', '7', + 'emerg', 'alert', 'crit', 'error', 'warning', 'notice', 'info', 'debug'] + + for log_lvl in log_levels: + set_module_args({ + 'chain': 'INPUT', + 'jump': 'LOG', + 'log_level': log_lvl, + 'source': '1.2.3.4/32', + 'log_prefix': '** DROP-this_ip **' + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-C', 'INPUT', + '-s', '1.2.3.4/32', + '-j', 'LOG', + '--log-prefix', '** DROP-this_ip **', + '--log-level', log_lvl + ]) + + def test_iprange(self): + """ Test iprange module with its flags src_range and dst_range """ + set_module_args({ + 'chain': 'INPUT', + 'match': ['iprange'], + 'src_range': '192.168.1.100-192.168.1.199', + 'jump': 'ACCEPT' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-m', + 'iprange', + '-j', + 'ACCEPT', + '--src-range', + '192.168.1.100-192.168.1.199', + ]) + + set_module_args({ + 'chain': 'INPUT', + 'src_range': '192.168.1.100-192.168.1.199', + 'dst_range': '10.0.0.50-10.0.0.100', + 'jump': 'ACCEPT' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-j', + 'ACCEPT', + '-m', + 'iprange', + '--src-range', + '192.168.1.100-192.168.1.199', + '--dst-range', + '10.0.0.50-10.0.0.100' + ]) + + set_module_args({ + 'chain': 'INPUT', + 'dst_range': '10.0.0.50-10.0.0.100', + 'jump': 'ACCEPT' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-j', + 'ACCEPT', + '-m', + 'iprange', + '--dst-range', + '10.0.0.50-10.0.0.100' + ]) + + def test_insert_rule_with_wait(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'OUTPUT', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'ACCEPT', + 'action': 'insert', + 'wait': '10' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-w', + '10', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + + def test_comment_position_at_end(self): + """Test comment position to make sure it is at the end of command""" + set_module_args({ + 'chain': 'INPUT', + 'jump': 'ACCEPT', + 'action': 'insert', + 'ctstate': ['NEW'], + 'comment': 'this is a comment', + '_ansible_check_mode': True, + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-j', + 'ACCEPT', + '-m', + 'conntrack', + '--ctstate', + 'NEW', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + self.assertEqual(run_command.call_args[0][0][14], 'this is a comment') + + def test_destination_ports(self): + """ Test multiport module usage with multiple ports """ + set_module_args({ + 'chain': 'INPUT', + 'protocol': 'tcp', + 'in_interface': 'eth0', + 'source': '192.168.0.1/32', + 'destination_ports': ['80', '443', '8081:8085'], + 'jump': 'ACCEPT', + 'comment': 'this is a comment', + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-C', 'INPUT', + '-p', 'tcp', + '-s', '192.168.0.1/32', + '-j', 'ACCEPT', + '-m', 'multiport', + '--dports', '80,443,8081:8085', + '-i', 'eth0', + '-m', 'comment', + '--comment', 'this is a comment' + ]) + + def test_match_set(self): + """ Test match_set together with match_set_flags """ + set_module_args({ + 'chain': 'INPUT', + 'protocol': 'tcp', + 'match_set': 'admin_hosts', + 'match_set_flags': 'src', + 'destination_port': '22', + 'jump': 'ACCEPT', + 'comment': 'this is a comment', + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-C', 'INPUT', + '-p', 'tcp', + '-j', 'ACCEPT', + '--destination-port', '22', + '-m', 'set', + '--match-set', 'admin_hosts', 'src', + '-m', 'comment', + '--comment', 'this is a comment' + ]) + + set_module_args({ + 'chain': 'INPUT', + 'protocol': 'udp', + 'match_set': 'banned_hosts', + 'match_set_flags': 'src,dst', + 'jump': 'REJECT', + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-C', 'INPUT', + '-p', 'udp', + '-j', 'REJECT', + '-m', 'set', + '--match-set', 'banned_hosts', 'src,dst' + ]) + + def test_chain_creation(self): + """Test chain creation when absent""" + set_module_args({ + 'chain': 'FOOBAR', + 'state': 'present', + 'chain_management': True, + }) + + commands_results = [ + (1, '', ''), # check_rule_present + (1, '', ''), # check_chain_present + (0, '', ''), # create_chain + (0, '', ''), # append_rule + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 4) + + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-C', 'FOOBAR', + ]) + + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-L', 'FOOBAR', + ]) + + self.assertEqual(run_command.call_args_list[2][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-N', 'FOOBAR', + ]) + + self.assertEqual(run_command.call_args_list[3][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-A', 'FOOBAR', + ]) + + commands_results = [ + (0, '', ''), # check_rule_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertFalse(result.exception.args[0]['changed']) + + def test_chain_creation_check_mode(self): + """Test chain creation when absent""" + set_module_args({ + 'chain': 'FOOBAR', + 'state': 'present', + 'chain_management': True, + '_ansible_check_mode': True, + }) + + commands_results = [ + (1, '', ''), # check_rule_present + (1, '', ''), # check_chain_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-C', 'FOOBAR', + ]) + + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-L', 'FOOBAR', + ]) + + commands_results = [ + (0, '', ''), # check_rule_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertFalse(result.exception.args[0]['changed']) + + def test_chain_deletion(self): + """Test chain deletion when present""" + set_module_args({ + 'chain': 'FOOBAR', + 'state': 'absent', + 'chain_management': True, + }) + + commands_results = [ + (0, '', ''), # check_chain_present + (0, '', ''), # delete_chain + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-L', 'FOOBAR', + ]) + + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-X', 'FOOBAR', + ]) + + commands_results = [ + (1, '', ''), # check_rule_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertFalse(result.exception.args[0]['changed']) + + def test_chain_deletion_check_mode(self): + """Test chain deletion when present""" + set_module_args({ + 'chain': 'FOOBAR', + 'state': 'absent', + 'chain_management': True, + '_ansible_check_mode': True, + }) + + commands_results = [ + (0, '', ''), # check_chain_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-L', 'FOOBAR', + ]) + + commands_results = [ + (1, '', ''), # check_rule_present + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertFalse(result.exception.args[0]['changed']) diff --git a/test/units/modules/test_known_hosts.py b/test/units/modules/test_known_hosts.py new file mode 100644 index 0000000..123dd75 --- /dev/null +++ b/test/units/modules/test_known_hosts.py @@ -0,0 +1,110 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import tempfile +from ansible.module_utils import basic + +from units.compat import unittest +from ansible.module_utils._text import to_bytes +from ansible.module_utils.basic import AnsibleModule + +from ansible.modules.known_hosts import compute_diff, sanity_check + + +class KnownHostsDiffTestCase(unittest.TestCase): + + def _create_file(self, content): + tmp_file = tempfile.NamedTemporaryFile(prefix='ansible-test-', suffix='-known_hosts', delete=False) + tmp_file.write(to_bytes(content)) + tmp_file.close() + self.addCleanup(os.unlink, tmp_file.name) + return tmp_file.name + + def test_no_existing_file(self): + path = "/tmp/this_file_does_not_exists_known_hosts" + key = 'example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key) + self.assertEqual(diff, { + 'before_header': '/dev/null', + 'after_header': path, + 'before': '', + 'after': 'example.com ssh-rsa AAAAetc\n', + }) + + def test_key_addition(self): + path = self._create_file( + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'two.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n', + }) + + def test_no_change(self): + path = self._create_file( + 'one.example.com ssh-rsa AAAAetc\n' + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=1, replace_or_add=False, state='present', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n', + 'after': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n', + }) + + def test_key_change(self): + path = self._create_file( + 'one.example.com ssh-rsa AAAaetc\n' + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=1, replace_or_add=True, state='present', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'one.example.com ssh-rsa AAAaetc\ntwo.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n', + }) + + def test_key_removal(self): + path = self._create_file( + 'one.example.com ssh-rsa AAAAetc\n' + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=1, replace_or_add=False, state='absent', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\n', + }) + + def test_key_removal_no_change(self): + path = self._create_file( + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=None, replace_or_add=False, state='absent', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'two.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\n', + }) + + def test_sanity_check(self): + basic._load_params = lambda: {} + # Module used internally to execute ssh-keygen system executable + module = AnsibleModule(argument_spec={}) + host = '10.0.0.1' + key = '%s ssh-rsa ASDF foo@bar' % (host,) + keygen = module.get_bin_path('ssh-keygen') + sanity_check(module, host, key, keygen) diff --git a/test/units/modules/test_pip.py b/test/units/modules/test_pip.py new file mode 100644 index 0000000..5640b80 --- /dev/null +++ b/test/units/modules/test_pip.py @@ -0,0 +1,40 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +import pytest + +from ansible.modules import pip + + +pytestmark = pytest.mark.usefixtures('patch_ansible_module') + + +@pytest.mark.parametrize('patch_ansible_module', [{'name': 'six'}], indirect=['patch_ansible_module']) +def test_failure_when_pip_absent(mocker, capfd): + mocker.patch('ansible.modules.pip._have_pip_module').return_value = False + + get_bin_path = mocker.patch('ansible.module_utils.basic.AnsibleModule.get_bin_path') + get_bin_path.return_value = None + + with pytest.raises(SystemExit): + pip.main() + + out, err = capfd.readouterr() + results = json.loads(out) + assert results['failed'] + assert 'pip needs to be installed' in results['msg'] + + +@pytest.mark.parametrize('patch_ansible_module, test_input, expected', [ + [None, ['django>1.11.1', '<1.11.2', 'ipaddress', 'simpleproject<2.0.0', '>1.1.0'], + ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']], + [None, ['django>1.11.1,<1.11.2,ipaddress', 'simpleproject<2.0.0,>1.1.0'], + ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']], + [None, ['django>1.11.1', '<1.11.2', 'ipaddress,simpleproject<2.0.0,>1.1.0'], + ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']]]) +def test_recover_package_name(test_input, expected): + assert pip._recover_package_name(test_input) == expected diff --git a/test/units/modules/test_service.py b/test/units/modules/test_service.py new file mode 100644 index 0000000..caabd74 --- /dev/null +++ b/test/units/modules/test_service.py @@ -0,0 +1,70 @@ +# Copyright: (c) 2021, Ansible Project +# Copyright: (c) 2021, Abhijeet Kasurde <akasurde@redhat.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import json +import platform + +import pytest +from ansible.modules import service +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.six import PY2 +from units.modules.utils import set_module_args + + +def mocker_sunos_service(mocker): + """ + Configure common mocker object for SunOSService + """ + platform_system = mocker.patch.object(platform, "system") + platform_system.return_value = "SunOS" + + get_bin_path = mocker.patch.object(AnsibleModule, "get_bin_path") + get_bin_path.return_value = "/usr/bin/svcs" + + # Read a mocked /etc/release file + mocked_etc_release_data = mocker.mock_open( + read_data=" Oracle Solaris 12.0") + builtin_open = "__builtin__.open" if PY2 else "builtins.open" + mocker.patch(builtin_open, mocked_etc_release_data) + + service_status = mocker.patch.object( + service.Service, "modify_service_state") + service_status.return_value = (0, "", "") + + get_sunos_svcs_status = mocker.patch.object( + service.SunOSService, "get_sunos_svcs_status") + get_sunos_svcs_status.return_value = "offline" + get_service_status = mocker.patch.object( + service.Service, "get_service_status") + get_service_status.return_value = "" + + mocker.patch('ansible.module_utils.common.sys_info.distro.id', return_value='') + + +@pytest.fixture +def mocked_sunos_service(mocker): + mocker_sunos_service(mocker) + + +def test_sunos_service_start(mocked_sunos_service, capfd): + """ + test SunOS Service Start + """ + set_module_args( + { + "name": "environment", + "state": "started", + } + ) + with pytest.raises(SystemExit): + service.main() + + out, dummy = capfd.readouterr() + results = json.loads(out) + assert not results.get("failed") + assert results["changed"] diff --git a/test/units/modules/test_service_facts.py b/test/units/modules/test_service_facts.py new file mode 100644 index 0000000..07f6827 --- /dev/null +++ b/test/units/modules/test_service_facts.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2020 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.compat import unittest +from units.compat.mock import patch + +from ansible.module_utils import basic +from ansible.modules.service_facts import AIXScanService + + +# AIX # lssrc -a +LSSRC_OUTPUT = """ +Subsystem Group PID Status + sendmail mail 5243302 active + syslogd ras 5636528 active + portmap portmap 5177768 active + snmpd tcpip 5308844 active + hostmibd tcpip 5374380 active + snmpmibd tcpip 5439918 active + aixmibd tcpip 5505456 active + nimsh nimclient 5571004 active + aso 6029758 active + biod nfs 6357464 active + nfsd nfs 5701906 active + rpc.mountd nfs 6488534 active + rpc.statd nfs 7209216 active + rpc.lockd nfs 7274988 active + qdaemon spooler 6816222 active + writesrv spooler 6685150 active + clcomd caa 7471600 active + sshd ssh 7602674 active + pfcdaemon 7012860 active + ctrmc rsct 6947312 active + IBM.HostRM rsct_rm 14418376 active + IBM.ConfigRM rsct_rm 6160674 active + IBM.DRM rsct_rm 14680550 active + IBM.MgmtDomainRM rsct_rm 14090676 active + IBM.ServiceRM rsct_rm 13828542 active + cthats cthats 13959668 active + cthags cthags 14025054 active + IBM.StorageRM rsct_rm 12255706 active + inetd tcpip 12517828 active + lpd spooler inoperative + keyserv keyserv inoperative + ypbind yp inoperative + gsclvmd inoperative + cdromd inoperative + ndpd-host tcpip inoperative + ndpd-router tcpip inoperative + netcd netcd inoperative + tftpd tcpip inoperative + routed tcpip inoperative + mrouted tcpip inoperative + rsvpd qos inoperative + policyd qos inoperative + timed tcpip inoperative + iptrace tcpip inoperative + dpid2 tcpip inoperative + rwhod tcpip inoperative + pxed tcpip inoperative + binld tcpip inoperative + xntpd tcpip inoperative + gated tcpip inoperative + dhcpcd tcpip inoperative + dhcpcd6 tcpip inoperative + dhcpsd tcpip inoperative + dhcpsdv6 tcpip inoperative + dhcprd tcpip inoperative + dfpd tcpip inoperative + named tcpip inoperative + automountd autofs inoperative + nfsrgyd nfs inoperative + gssd nfs inoperative + cpsd ike inoperative + tmd ike inoperative + isakmpd inoperative + ikev2d inoperative + iked ike inoperative + clconfd caa inoperative + ksys_vmmon inoperative + nimhttp inoperative + IBM.SRVPROXY ibmsrv inoperative + ctcas rsct inoperative + IBM.ERRM rsct_rm inoperative + IBM.AuditRM rsct_rm inoperative + isnstgtd isnstgtd inoperative + IBM.LPRM rsct_rm inoperative + cthagsglsm cthags inoperative +""" + + +class TestAIXScanService(unittest.TestCase): + + def setUp(self): + self.mock1 = patch.object(basic.AnsibleModule, 'get_bin_path', return_value='/usr/sbin/lssrc') + self.mock1.start() + self.addCleanup(self.mock1.stop) + self.mock2 = patch.object(basic.AnsibleModule, 'run_command', return_value=(0, LSSRC_OUTPUT, '')) + self.mock2.start() + self.addCleanup(self.mock2.stop) + self.mock3 = patch('platform.system', return_value='AIX') + self.mock3.start() + self.addCleanup(self.mock3.stop) + + def test_gather_services(self): + svcmod = AIXScanService(basic.AnsibleModule) + result = svcmod.gather_services() + + self.assertIsInstance(result, dict) + + self.assertIn('IBM.HostRM', result) + self.assertEqual(result['IBM.HostRM'], { + 'name': 'IBM.HostRM', + 'source': 'src', + 'state': 'running', + }) + self.assertIn('IBM.AuditRM', result) + self.assertEqual(result['IBM.AuditRM'], { + 'name': 'IBM.AuditRM', + 'source': 'src', + 'state': 'stopped', + }) diff --git a/test/units/modules/test_systemd.py b/test/units/modules/test_systemd.py new file mode 100644 index 0000000..52c212a --- /dev/null +++ b/test/units/modules/test_systemd.py @@ -0,0 +1,52 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from ansible.modules.systemd import parse_systemctl_show + + +class ParseSystemctlShowTestCase(unittest.TestCase): + + def test_simple(self): + lines = [ + 'Type=simple', + 'Restart=no', + 'Requires=system.slice sysinit.target', + 'Description=Blah blah blah', + ] + parsed = parse_systemctl_show(lines) + self.assertEqual(parsed, { + 'Type': 'simple', + 'Restart': 'no', + 'Requires': 'system.slice sysinit.target', + 'Description': 'Blah blah blah', + }) + + def test_multiline_exec(self): + # This was taken from a real service that specified "ExecStart=/bin/echo foo\nbar" + lines = [ + 'Type=simple', + 'ExecStart={ path=/bin/echo ; argv[]=/bin/echo foo', + 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }', + 'Description=blah', + ] + parsed = parse_systemctl_show(lines) + self.assertEqual(parsed, { + 'Type': 'simple', + 'ExecStart': '{ path=/bin/echo ; argv[]=/bin/echo foo\n' + 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }', + 'Description': 'blah', + }) + + def test_single_line_with_brace(self): + lines = [ + 'Type=simple', + 'Description={ this is confusing', + 'Restart=no', + ] + parsed = parse_systemctl_show(lines) + self.assertEqual(parsed, { + 'Type': 'simple', + 'Description': '{ this is confusing', + 'Restart': 'no', + }) diff --git a/test/units/modules/test_unarchive.py b/test/units/modules/test_unarchive.py new file mode 100644 index 0000000..3e7a58c --- /dev/null +++ b/test/units/modules/test_unarchive.py @@ -0,0 +1,93 @@ +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +import pytest + +from ansible.modules.unarchive import ZipArchive, TgzArchive + + +class AnsibleModuleExit(Exception): + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + +class ExitJson(AnsibleModuleExit): + pass + + +class FailJson(AnsibleModuleExit): + pass + + +@pytest.fixture +def fake_ansible_module(): + return FakeAnsibleModule() + + +class FakeAnsibleModule: + def __init__(self): + self.params = {} + self.tmpdir = None + + def exit_json(self, *args, **kwargs): + raise ExitJson(*args, **kwargs) + + def fail_json(self, *args, **kwargs): + raise FailJson(*args, **kwargs) + + +class TestCaseZipArchive: + @pytest.mark.parametrize( + 'side_effect, expected_reason', ( + ([ValueError, '/bin/zipinfo'], "Unable to find required 'unzip'"), + (ValueError, "Unable to find required 'unzip' or 'zipinfo'"), + ) + ) + def test_no_zip_zipinfo_binary(self, mocker, fake_ansible_module, side_effect, expected_reason): + mocker.patch("ansible.modules.unarchive.get_bin_path", side_effect=side_effect) + fake_ansible_module.params = { + "extra_opts": "", + "exclude": "", + "include": "", + "io_buffer_size": 65536, + } + + z = ZipArchive( + src="", + b_dest="", + file_args="", + module=fake_ansible_module, + ) + can_handle, reason = z.can_handle_archive() + + assert can_handle is False + assert expected_reason in reason + assert z.cmd_path is None + + +class TestCaseTgzArchive: + def test_no_tar_binary(self, mocker, fake_ansible_module): + mocker.patch("ansible.modules.unarchive.get_bin_path", side_effect=ValueError) + fake_ansible_module.params = { + "extra_opts": "", + "exclude": "", + "include": "", + "io_buffer_size": 65536, + } + fake_ansible_module.check_mode = False + + t = TgzArchive( + src="", + b_dest="", + file_args="", + module=fake_ansible_module, + ) + can_handle, reason = t.can_handle_archive() + + assert can_handle is False + assert 'Unable to find required' in reason + assert t.cmd_path is None + assert t.tar_type is None diff --git a/test/units/modules/test_yum.py b/test/units/modules/test_yum.py new file mode 100644 index 0000000..8052eff --- /dev/null +++ b/test/units/modules/test_yum.py @@ -0,0 +1,222 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest + +from ansible.modules.yum import YumModule + + +yum_plugin_load_error = """ +Plugin "product-id" can't be imported +Plugin "search-disabled-repos" can't be imported +Plugin "subscription-manager" can't be imported +Plugin "product-id" can't be imported +Plugin "search-disabled-repos" can't be imported +Plugin "subscription-manager" can't be imported +""" + +# from https://github.com/ansible/ansible/issues/20608#issuecomment-276106505 +wrapped_output_1 = """ +Загружены модули: fastestmirror +Loading mirror speeds from cached hostfile + * base: mirror.h1host.ru + * extras: mirror.h1host.ru + * updates: mirror.h1host.ru + +vms-agent.x86_64 0.0-9 dev +""" + +# from https://github.com/ansible/ansible/issues/20608#issuecomment-276971275 +wrapped_output_2 = """ +Загружены модули: fastestmirror +Loading mirror speeds from cached hostfile + * base: mirror.corbina.net + * extras: mirror.corbina.net + * updates: mirror.corbina.net + +empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty.x86_64 + 0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1-0 + addons +libtiff.x86_64 4.0.3-27.el7_3 updates +""" + +# From https://github.com/ansible/ansible/issues/20608#issuecomment-276698431 +wrapped_output_3 = """ +Loaded plugins: fastestmirror, langpacks +Loading mirror speeds from cached hostfile + +ceph.x86_64 1:11.2.0-0.el7 ceph +ceph-base.x86_64 1:11.2.0-0.el7 ceph +ceph-common.x86_64 1:11.2.0-0.el7 ceph +ceph-mds.x86_64 1:11.2.0-0.el7 ceph +ceph-mon.x86_64 1:11.2.0-0.el7 ceph +ceph-osd.x86_64 1:11.2.0-0.el7 ceph +ceph-selinux.x86_64 1:11.2.0-0.el7 ceph +libcephfs1.x86_64 1:11.0.2-0.el7 ceph +librados2.x86_64 1:11.2.0-0.el7 ceph +libradosstriper1.x86_64 1:11.2.0-0.el7 ceph +librbd1.x86_64 1:11.2.0-0.el7 ceph +librgw2.x86_64 1:11.2.0-0.el7 ceph +python-cephfs.x86_64 1:11.2.0-0.el7 ceph +python-rados.x86_64 1:11.2.0-0.el7 ceph +python-rbd.x86_64 1:11.2.0-0.el7 ceph +""" + +# from https://github.com/ansible/ansible-modules-core/issues/4318#issuecomment-251416661 +wrapped_output_4 = """ +ipxe-roms-qemu.noarch 20160127-1.git6366fa7a.el7 + rhelosp-9.0-director-puddle +quota.x86_64 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z +quota-nls.noarch 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z +rdma.noarch 7.2_4.1_rc6-2.el7 rhelosp-rhel-7.2-z +screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2 + rhelosp-rhel-7.2-z +sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle +sssd-client.x86_64 1.13.0-40.el7_2.12 rhelosp-rhel-7.2-z +""" + + +# A 'normal-ish' yum check-update output, without any wrapped lines +unwrapped_output_rhel7 = """ + +Loaded plugins: etckeeper, product-id, search-disabled-repos, subscription- + : manager +This system is not registered to Red Hat Subscription Management. You can use subscription-manager to register. + +NetworkManager-openvpn.x86_64 1:1.2.6-1.el7 epel +NetworkManager-openvpn-gnome.x86_64 1:1.2.6-1.el7 epel +cabal-install.x86_64 1.16.1.0-2.el7 epel +cgit.x86_64 1.1-1.el7 epel +python34-libs.x86_64 3.4.5-3.el7 epel +python34-test.x86_64 3.4.5-3.el7 epel +python34-tkinter.x86_64 3.4.5-3.el7 epel +python34-tools.x86_64 3.4.5-3.el7 epel +qgit.x86_64 2.6-4.el7 epel +rdiff-backup.x86_64 1.2.8-12.el7 epel +stoken-libs.x86_64 0.91-1.el7 epel +xlockmore.x86_64 5.49-2.el7 epel +""" + +# Some wrapped obsoletes for prepending to output for testing both +wrapped_output_rhel7_obsoletes_postfix = """ +Obsoleting Packages +ddashboard.x86_64 0.2.0.1-1.el7_3 mhlavink-developerdashboard + developerdashboard.x86_64 0.1.12.2-1.el7_2 @mhlavink-developerdashboard +python-bugzilla.noarch 1.2.2-3.el7_2.1 mhlavink-developerdashboard + python-bugzilla-develdashboardfixes.noarch + 1.2.2-3.el7 @mhlavink-developerdashboard +python2-futures.noarch 3.0.5-1.el7 epel + python-futures.noarch 3.0.3-1.el7 @epel +python2-pip.noarch 8.1.2-5.el7 epel + python-pip.noarch 7.1.0-1.el7 @epel +python2-pyxdg.noarch 0.25-6.el7 epel + pyxdg.noarch 0.25-5.el7 @epel +python2-simplejson.x86_64 3.10.0-1.el7 epel + python-simplejson.x86_64 3.3.3-1.el7 @epel +Security: kernel-3.10.0-327.28.2.el7.x86_64 is an installed security update +Security: kernel-3.10.0-327.22.2.el7.x86_64 is the currently running version +""" + +wrapped_output_multiple_empty_lines = """ +Loaded plugins: langpacks, product-id, search-disabled-repos, subscription-manager + +This system is not registered with an entitlement server. You can use subscription-manager to register. + + +screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2 + rhelosp-rhel-7.2-z +sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle +""" + +longname = """ +Loaded plugins: fastestmirror, priorities, rhnplugin +This system is receiving updates from RHN Classic or Red Hat Satellite. +Loading mirror speeds from cached hostfile + +xxxxxxxxxxxxxxxxxxxxxxxxxx.noarch + 1.16-1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +glibc.x86_64 2.17-157.el7_3.1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx""" + + +unwrapped_output_rhel7_obsoletes = unwrapped_output_rhel7 + wrapped_output_rhel7_obsoletes_postfix +unwrapped_output_rhel7_expected_new_obsoletes_pkgs = [ + "ddashboard", "python-bugzilla", "python2-futures", "python2-pip", + "python2-pyxdg", "python2-simplejson" +] +unwrapped_output_rhel7_expected_old_obsoletes_pkgs = [ + "developerdashboard", "python-bugzilla-develdashboardfixes", + "python-futures", "python-pip", "pyxdg", "python-simplejson" +] +unwrapped_output_rhel7_expected_updated_pkgs = [ + "NetworkManager-openvpn", "NetworkManager-openvpn-gnome", "cabal-install", + "cgit", "python34-libs", "python34-test", "python34-tkinter", + "python34-tools", "qgit", "rdiff-backup", "stoken-libs", "xlockmore" +] + + +class TestYumUpdateCheckParse(unittest.TestCase): + def _assert_expected(self, expected_pkgs, result): + + for expected_pkg in expected_pkgs: + self.assertIn(expected_pkg, result) + self.assertEqual(len(result), len(expected_pkgs)) + self.assertIsInstance(result, dict) + + def test_empty_output(self): + res, obs = YumModule.parse_check_update("") + expected_pkgs = [] + self._assert_expected(expected_pkgs, res) + + def test_longname(self): + res, obs = YumModule.parse_check_update(longname) + expected_pkgs = ['xxxxxxxxxxxxxxxxxxxxxxxxxx', 'glibc'] + self._assert_expected(expected_pkgs, res) + + def test_plugin_load_error(self): + res, obs = YumModule.parse_check_update(yum_plugin_load_error) + expected_pkgs = [] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_1(self): + res, obs = YumModule.parse_check_update(wrapped_output_1) + expected_pkgs = ["vms-agent"] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_2(self): + res, obs = YumModule.parse_check_update(wrapped_output_2) + expected_pkgs = ["empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty", + "libtiff"] + + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_3(self): + res, obs = YumModule.parse_check_update(wrapped_output_3) + expected_pkgs = ["ceph", "ceph-base", "ceph-common", "ceph-mds", + "ceph-mon", "ceph-osd", "ceph-selinux", "libcephfs1", + "librados2", "libradosstriper1", "librbd1", "librgw2", + "python-cephfs", "python-rados", "python-rbd"] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_4(self): + res, obs = YumModule.parse_check_update(wrapped_output_4) + + expected_pkgs = ["ipxe-roms-qemu", "quota", "quota-nls", "rdma", "screen", + "sos", "sssd-client"] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_rhel7(self): + res, obs = YumModule.parse_check_update(unwrapped_output_rhel7) + self._assert_expected(unwrapped_output_rhel7_expected_updated_pkgs, res) + + def test_wrapped_output_rhel7_obsoletes(self): + res, obs = YumModule.parse_check_update(unwrapped_output_rhel7_obsoletes) + self._assert_expected( + unwrapped_output_rhel7_expected_updated_pkgs + unwrapped_output_rhel7_expected_new_obsoletes_pkgs, + res + ) + self._assert_expected(unwrapped_output_rhel7_expected_old_obsoletes_pkgs, obs) + + def test_wrapped_output_multiple_empty_lines(self): + res, obs = YumModule.parse_check_update(wrapped_output_multiple_empty_lines) + self._assert_expected(['screen', 'sos'], res) diff --git a/test/units/modules/utils.py b/test/units/modules/utils.py new file mode 100644 index 0000000..6d169e3 --- /dev/null +++ b/test/units/modules/utils.py @@ -0,0 +1,50 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +from units.compat import unittest +from units.compat.mock import patch +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes + + +def set_module_args(args): + if '_ansible_remote_tmp' not in args: + args['_ansible_remote_tmp'] = '/tmp' + if '_ansible_keep_remote_files' not in args: + args['_ansible_keep_remote_files'] = False + + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) + + +class AnsibleExitJson(Exception): + pass + + +class AnsibleFailJson(Exception): + pass + + +def exit_json(*args, **kwargs): + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class ModuleTestCase(unittest.TestCase): + + def setUp(self): + self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json) + self.mock_module.start() + self.mock_sleep = patch('time.sleep') + self.mock_sleep.start() + set_module_args({}) + self.addCleanup(self.mock_module.stop) + self.addCleanup(self.mock_sleep.stop) |