diff options
Diffstat (limited to '')
-rw-r--r-- | test/units/modules/__init__.py | 0 | ||||
-rw-r--r-- | test/units/modules/conftest.py | 31 | ||||
-rw-r--r-- | test/units/modules/test_apt.py | 53 | ||||
-rw-r--r-- | test/units/modules/test_async_wrapper.py | 57 | ||||
-rw-r--r-- | test/units/modules/test_copy.py | 215 | ||||
-rw-r--r-- | test/units/modules/test_iptables.py | 919 | ||||
-rw-r--r-- | test/units/modules/test_known_hosts.py | 110 | ||||
-rw-r--r-- | test/units/modules/test_pip.py | 38 | ||||
-rw-r--r-- | test/units/modules/test_systemd.py | 52 | ||||
-rw-r--r-- | test/units/modules/test_yum.py | 207 | ||||
-rw-r--r-- | test/units/modules/utils.py | 50 |
11 files changed, 1732 insertions, 0 deletions
diff --git a/test/units/modules/__init__.py b/test/units/modules/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/modules/__init__.py diff --git a/test/units/modules/conftest.py b/test/units/modules/conftest.py new file mode 100644 index 00000000..a7d1e047 --- /dev/null +++ b/test/units/modules/conftest.py @@ -0,0 +1,31 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +import pytest + +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_bytes +from ansible.module_utils.common._collections_compat import MutableMapping + + +@pytest.fixture +def patch_ansible_module(request, mocker): + if isinstance(request.param, string_types): + args = request.param + elif isinstance(request.param, MutableMapping): + if 'ANSIBLE_MODULE_ARGS' not in request.param: + request.param = {'ANSIBLE_MODULE_ARGS': request.param} + if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']: + request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp' + if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']: + request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False + args = json.dumps(request.param) + else: + raise Exception('Malformed data to the patch_ansible_module pytest fixture') + + mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args)) diff --git a/test/units/modules/test_apt.py b/test/units/modules/test_apt.py new file mode 100644 index 00000000..3daf3c11 --- /dev/null +++ b/test/units/modules/test_apt.py @@ -0,0 +1,53 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import collections +import sys + +from units.compat import mock +from units.compat import unittest + +try: + from ansible.modules.apt import ( + expand_pkgspec_from_fnmatches, + ) +except Exception: + # Need some more module_utils work (porting urls.py) before we can test + # modules. So don't error out in this case. + if sys.version_info[0] >= 3: + pass + + +class AptExpandPkgspecTestCase(unittest.TestCase): + + def setUp(self): + FakePackage = collections.namedtuple("Package", ("name",)) + self.fake_cache = [ + FakePackage("apt"), + FakePackage("apt-utils"), + FakePackage("not-selected"), + ] + + def test_trivial(self): + foo = ["apt"] + self.assertEqual( + expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) + + def test_version_wildcard(self): + foo = ["apt=1.0*"] + self.assertEqual( + expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) + + def test_pkgname_wildcard_version_wildcard(self): + foo = ["apt*=1.0*"] + m_mock = mock.Mock() + self.assertEqual( + expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), + ['apt', 'apt-utils']) + + def test_pkgname_expands(self): + foo = ["apt*"] + m_mock = mock.Mock() + self.assertEqual( + expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), + ["apt", "apt-utils"]) diff --git a/test/units/modules/test_async_wrapper.py b/test/units/modules/test_async_wrapper.py new file mode 100644 index 00000000..762fc2fb --- /dev/null +++ b/test/units/modules/test_async_wrapper.py @@ -0,0 +1,57 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import os +import json +import shutil +import tempfile + +import pytest + +from units.compat.mock import patch, MagicMock +from ansible.modules import async_wrapper + +from pprint import pprint + + +class TestAsyncWrapper: + + def test_run_module(self, monkeypatch): + + def mock_get_interpreter(module_path): + return ['/usr/bin/python'] + + module_result = {'rc': 0} + module_lines = [ + '#!/usr/bin/python', + 'import sys', + 'sys.stderr.write("stderr stuff")', + "print('%s')" % json.dumps(module_result) + ] + module_data = '\n'.join(module_lines) + '\n' + module_data = module_data.encode('utf-8') + + workdir = tempfile.mkdtemp() + fh, fn = tempfile.mkstemp(dir=workdir) + + with open(fn, 'wb') as f: + f.write(module_data) + + command = fn + jobid = 0 + jobpath = os.path.join(os.path.dirname(command), 'job') + + monkeypatch.setattr(async_wrapper, '_get_interpreter', mock_get_interpreter) + + res = async_wrapper._run_module(command, jobid, jobpath) + + with open(os.path.join(workdir, 'job'), 'r') as f: + jres = json.loads(f.read()) + + shutil.rmtree(workdir) + + assert jres.get('rc') == 0 + assert jres.get('stderr') == 'stderr stuff' diff --git a/test/units/modules/test_copy.py b/test/units/modules/test_copy.py new file mode 100644 index 00000000..20c309b6 --- /dev/null +++ b/test/units/modules/test_copy.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +# Copyright: +# (c) 2018 Ansible Project +# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible.modules.copy import AnsibleModuleError, split_pre_existing_dir + +from ansible.module_utils.basic import AnsibleModule + + +THREE_DIRS_DATA = (('/dir1/dir2', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1', 'dir2']), + # 2 existing dirs: + ('/dir1', ['dir2']), + # 3 existing dirs: + ('/dir1/dir2', []) + ), + ('/dir1/dir2/', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1', 'dir2']), + # 2 existing dirs: + ('/dir1', ['dir2']), + # 3 existing dirs: + ('/dir1/dir2', []) + ), + ) + + +TWO_DIRS_DATA = (('dir1/dir2', + # 0 existing dirs: + ('.', ['dir1', 'dir2']), + # 1 existing dir: + ('dir1', ['dir2']), + # 2 existing dirs: + ('dir1/dir2', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ('dir1/dir2/', + # 0 existing dirs: + ('.', ['dir1', 'dir2']), + # 1 existing dir: + ('dir1', ['dir2']), + # 2 existing dirs: + ('dir1/dir2', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ('/dir1', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1']), + # 2 existing dirs: + ('/dir1', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ('/dir1/', + # 0 existing dirs: error (because / should always exist) + None, + # 1 existing dir: + ('/', ['dir1']), + # 2 existing dirs: + ('/dir1', []), + # 3 existing dirs: Same as 2 because we never get to the third + ), + ) + THREE_DIRS_DATA + + +ONE_DIR_DATA = (('dir1', + # 0 existing dirs: + ('.', ['dir1']), + # 1 existing dir: + ('dir1', []), + # 2 existing dirs: Same as 1 because we never get to the third + ), + ('dir1/', + # 0 existing dirs: + ('.', ['dir1']), + # 1 existing dir: + ('dir1', []), + # 2 existing dirs: Same as 1 because we never get to the third + ), + ) + TWO_DIRS_DATA + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[4]) for d in THREE_DIRS_DATA)) +def test_split_pre_existing_dir_three_levels_exist(directory, expected, mocker): + mocker.patch('os.path.exists', side_effect=[True, True, True]) + split_pre_existing_dir(directory) == expected + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[3]) for d in TWO_DIRS_DATA)) +def test_split_pre_existing_dir_two_levels_exist(directory, expected, mocker): + mocker.patch('os.path.exists', side_effect=[True, True, False]) + split_pre_existing_dir(directory) == expected + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[2]) for d in ONE_DIR_DATA)) +def test_split_pre_existing_dir_one_level_exists(directory, expected, mocker): + mocker.patch('os.path.exists', side_effect=[True, False, False]) + split_pre_existing_dir(directory) == expected + + +@pytest.mark.parametrize('directory', (d[0] for d in ONE_DIR_DATA if d[1] is None)) +def test_split_pre_existing_dir_root_does_not_exist(directory, mocker): + mocker.patch('os.path.exists', return_value=False) + with pytest.raises(AnsibleModuleError) as excinfo: + split_pre_existing_dir(directory) + assert excinfo.value.results['msg'].startswith("The '/' directory doesn't exist on this machine.") + + +@pytest.mark.parametrize('directory, expected', ((d[0], d[1]) for d in ONE_DIR_DATA if not d[0].startswith('/'))) +def test_split_pre_existing_dir_working_dir_exists(directory, expected, mocker): + mocker.patch('os.path.exists', return_value=False) + split_pre_existing_dir(directory) == expected + + +# +# Info helpful for making new test cases: +# +# base_mode = {'dir no perms': 0o040000, +# 'file no perms': 0o100000, +# 'dir all perms': 0o400000 | 0o777, +# 'file all perms': 0o100000, | 0o777} +# +# perm_bits = {'x': 0b001, +# 'w': 0b010, +# 'r': 0b100} +# +# role_shift = {'u': 6, +# 'g': 3, +# 'o': 0} + +DATA = ( # Going from no permissions to setting all for user, group, and/or other + (0o040000, u'a+rwx', 0o0777), + (0o040000, u'u+rwx,g+rwx,o+rwx', 0o0777), + (0o040000, u'o+rwx', 0o0007), + (0o040000, u'g+rwx', 0o0070), + (0o040000, u'u+rwx', 0o0700), + + # Going from all permissions to none for user, group, and/or other + (0o040777, u'a-rwx', 0o0000), + (0o040777, u'u-rwx,g-rwx,o-rwx', 0o0000), + (0o040777, u'o-rwx', 0o0770), + (0o040777, u'g-rwx', 0o0707), + (0o040777, u'u-rwx', 0o0077), + + # now using absolute assignment from None to a set of perms + (0o040000, u'a=rwx', 0o0777), + (0o040000, u'u=rwx,g=rwx,o=rwx', 0o0777), + (0o040000, u'o=rwx', 0o0007), + (0o040000, u'g=rwx', 0o0070), + (0o040000, u'u=rwx', 0o0700), + + # X effect on files and dirs + (0o040000, u'a+X', 0o0111), + (0o100000, u'a+X', 0), + (0o040000, u'a=X', 0o0111), + (0o100000, u'a=X', 0), + (0o040777, u'a-X', 0o0666), + # Same as chmod but is it a bug? + # chmod a-X statfile <== removes execute from statfile + (0o100777, u'a-X', 0o0666), + + # Multiple permissions + (0o040000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0755), + (0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644), +) + +UMASK_DATA = ( + (0o100000, '+rwx', 0o770), + (0o100777, '-rwx', 0o007), +) + +INVALID_DATA = ( + (0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"), + (0o040000, u'f=rwx', "bad symbolic permission for mode: f=rwx"), +) + + +@pytest.mark.parametrize('stat_info, mode_string, expected', DATA) +def test_good_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected + + +@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA) +def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_umask = mocker.patch('os.umask') + mock_umask.return_value = 0o7 + + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected + + +@pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA) +def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + with pytest.raises(ValueError) as exc: + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah' + assert exc.match(expected) diff --git a/test/units/modules/test_iptables.py b/test/units/modules/test_iptables.py new file mode 100644 index 00000000..25a157e5 --- /dev/null +++ b/test/units/modules/test_iptables.py @@ -0,0 +1,919 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat.mock import patch +from ansible.module_utils import basic +from ansible.modules import iptables +from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args + + +def get_bin_path(*args, **kwargs): + return "/sbin/iptables" + + +def get_iptables_version(iptables_path, module): + return "1.8.2" + + +class TestIptables(ModuleTestCase): + + def setUp(self): + super(TestIptables, self).setUp() + self.mock_get_bin_path = patch.object(basic.AnsibleModule, 'get_bin_path', get_bin_path) + self.mock_get_bin_path.start() + self.addCleanup(self.mock_get_bin_path.stop) # ensure that the patching is 'undone' + self.mock_get_iptables_version = patch.object(iptables, 'get_iptables_version', get_iptables_version) + self.mock_get_iptables_version.start() + self.addCleanup(self.mock_get_iptables_version.stop) # ensure that the patching is 'undone' + + def test_without_required_parameters(self): + """Failure must occurs when all parameters are missing""" + with self.assertRaises(AnsibleFailJson): + set_module_args({}) + iptables.main() + + def test_flush_table_without_chain(self): + """Test flush without chain, flush the table""" + set_module_args({ + 'flush': True, + }) + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.return_value = 0, '', '' # successful execution, no output + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args[0][0][0], '/sbin/iptables') + self.assertEqual(run_command.call_args[0][0][1], '-t') + self.assertEqual(run_command.call_args[0][0][2], 'filter') + self.assertEqual(run_command.call_args[0][0][3], '-F') + + def test_flush_table_check_true(self): + """Test flush without parameters and check == true""" + set_module_args({ + 'flush': True, + '_ansible_check_mode': True, + }) + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.return_value = 0, '', '' # successful execution, no output + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 0) + +# TODO ADD test flush table nat +# TODO ADD test flush with chain +# TODO ADD test flush with chain and table nat + + def test_policy_table(self): + """Test change policy of a chain""" + set_module_args({ + 'policy': 'ACCEPT', + 'chain': 'INPUT', + }) + commands_results = [ + (0, 'Chain INPUT (policy DROP)\n', ''), + (0, '', '') + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + # import pdb + # pdb.set_trace() + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-L', + 'INPUT', + ]) + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-P', + 'INPUT', + 'ACCEPT', + ]) + + def test_policy_table_no_change(self): + """Test don't change policy of a chain if the policy is right""" + set_module_args({ + 'policy': 'ACCEPT', + 'chain': 'INPUT', + }) + commands_results = [ + (0, 'Chain INPUT (policy ACCEPT)\n', ''), + (0, '', '') + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertFalse(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + # import pdb + # pdb.set_trace() + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-L', + 'INPUT', + ]) + + def test_policy_table_changed_false(self): + """Test flush without parameters and change == false""" + set_module_args({ + 'policy': 'ACCEPT', + 'chain': 'INPUT', + '_ansible_check_mode': True, + }) + commands_results = [ + (0, 'Chain INPUT (policy DROP)\n', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + # import pdb + # pdb.set_trace() + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-L', + 'INPUT', + ]) + +# TODO ADD test policy without chain fail +# TODO ADD test policy with chain don't exists +# TODO ADD test policy with wrong choice fail + + def test_insert_rule_change_false(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'OUTPUT', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'ACCEPT', + 'action': 'insert', + '_ansible_check_mode': True, + }) + + commands_results = [ + (1, '', ''), + (0, '', '') + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + # import pdb + # pdb.set_trace() + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + + def test_insert_rule(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'OUTPUT', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'ACCEPT', + 'action': 'insert' + }) + + commands_results = [ + (1, '', ''), + (0, '', '') + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + # import pdb + # pdb.set_trace() + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-I', + 'OUTPUT', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + + def test_append_rule_check_mode(self): + """Test append a redirection rule in check mode""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'REDIRECT', + 'table': 'nat', + 'to_destination': '5.5.5.5/32', + 'protocol': 'udp', + 'destination_port': '22', + 'to_ports': '8600', + '_ansible_check_mode': True, + }) + + commands_results = [ + (1, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'REDIRECT', + '--to-destination', + '5.5.5.5/32', + '--destination-port', + '22', + '--to-ports', + '8600' + ]) + + def test_append_rule(self): + """Test append a redirection rule""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'REDIRECT', + 'table': 'nat', + 'to_destination': '5.5.5.5/32', + 'protocol': 'udp', + 'destination_port': '22', + 'to_ports': '8600' + }) + + commands_results = [ + (1, '', ''), + (0, '', '') + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'REDIRECT', + '--to-destination', + '5.5.5.5/32', + '--destination-port', + '22', + '--to-ports', + '8600' + ]) + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-A', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'REDIRECT', + '--to-destination', + '5.5.5.5/32', + '--destination-port', + '22', + '--to-ports', + '8600' + ]) + + def test_remove_rule(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'SNAT', + 'table': 'nat', + 'to_source': '5.5.5.5/32', + 'protocol': 'udp', + 'source_port': '22', + 'to_ports': '8600', + 'state': 'absent', + 'in_interface': 'eth0', + 'out_interface': 'eth1', + 'comment': 'this is a comment' + }) + + commands_results = [ + (0, '', ''), + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 2) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'SNAT', + '--to-source', + '5.5.5.5/32', + '-i', + 'eth0', + '-o', + 'eth1', + '--source-port', + '22', + '--to-ports', + '8600', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + self.assertEqual(run_command.call_args_list[1][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-D', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'SNAT', + '--to-source', + '5.5.5.5/32', + '-i', + 'eth0', + '-o', + 'eth1', + '--source-port', + '22', + '--to-ports', + '8600', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + + def test_remove_rule_check_mode(self): + """Test flush without parameters check mode""" + set_module_args({ + 'chain': 'PREROUTING', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'SNAT', + 'table': 'nat', + 'to_source': '5.5.5.5/32', + 'protocol': 'udp', + 'source_port': '22', + 'to_ports': '8600', + 'state': 'absent', + 'in_interface': 'eth0', + 'out_interface': 'eth1', + 'comment': 'this is a comment', + '_ansible_check_mode': True, + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'nat', + '-C', + 'PREROUTING', + '-p', + 'udp', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'SNAT', + '--to-source', + '5.5.5.5/32', + '-i', + 'eth0', + '-o', + 'eth1', + '--source-port', + '22', + '--to-ports', + '8600', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + + def test_insert_with_reject(self): + """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """ + set_module_args({ + 'chain': 'INPUT', + 'protocol': 'tcp', + 'reject_with': 'tcp-reset', + 'ip_version': 'ipv4', + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-p', + 'tcp', + '-j', + 'REJECT', + '--reject-with', + 'tcp-reset', + ]) + + def test_insert_jump_reject_with_reject(self): + """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """ + set_module_args({ + 'chain': 'INPUT', + 'protocol': 'tcp', + 'jump': 'REJECT', + 'reject_with': 'tcp-reset', + 'ip_version': 'ipv4', + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-p', + 'tcp', + '-j', + 'REJECT', + '--reject-with', + 'tcp-reset', + ]) + + def test_jump_tee_gateway_negative(self): + """ Missing gateway when JUMP is set to TEE """ + set_module_args({ + 'table': 'mangle', + 'chain': 'PREROUTING', + 'in_interface': 'eth0', + 'protocol': 'udp', + 'match': 'state', + 'jump': 'TEE', + 'ctstate': ['NEW'], + 'destination_port': '9521', + 'destination': '127.0.0.1' + }) + + with self.assertRaises(AnsibleFailJson) as e: + iptables.main() + self.assertTrue(e.exception.args[0]['failed']) + self.assertEqual(e.exception.args[0]['msg'], 'jump is TEE but all of the following are missing: gateway') + + def test_jump_tee_gateway(self): + """ Using gateway when JUMP is set to TEE """ + set_module_args({ + 'table': 'mangle', + 'chain': 'PREROUTING', + 'in_interface': 'eth0', + 'protocol': 'udp', + 'match': 'state', + 'jump': 'TEE', + 'ctstate': ['NEW'], + 'destination_port': '9521', + 'gateway': '192.168.10.1', + 'destination': '127.0.0.1' + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'mangle', + '-C', 'PREROUTING', + '-p', 'udp', + '-d', '127.0.0.1', + '-m', 'state', + '-j', 'TEE', + '--gateway', '192.168.10.1', + '-i', 'eth0', + '--destination-port', '9521', + '--state', 'NEW' + ]) + + def test_tcp_flags(self): + """ Test various ways of inputting tcp_flags """ + args = [ + { + 'chain': 'OUTPUT', + 'protocol': 'tcp', + 'jump': 'DROP', + 'tcp_flags': 'flags=ALL flags_set="ACK,RST,SYN,FIN"' + }, + { + 'chain': 'OUTPUT', + 'protocol': 'tcp', + 'jump': 'DROP', + 'tcp_flags': { + 'flags': 'ALL', + 'flags_set': 'ACK,RST,SYN,FIN' + } + }, + { + 'chain': 'OUTPUT', + 'protocol': 'tcp', + 'jump': 'DROP', + 'tcp_flags': { + 'flags': ['ALL'], + 'flags_set': ['ACK', 'RST', 'SYN', 'FIN'] + } + }, + + ] + + for item in args: + set_module_args(item) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-p', + 'tcp', + '--tcp-flags', + 'ALL', + 'ACK,RST,SYN,FIN', + '-j', + 'DROP' + ]) + + def test_log_level(self): + """ Test various ways of log level flag """ + + log_levels = ['0', '1', '2', '3', '4', '5', '6', '7', + 'emerg', 'alert', 'crit', 'error', 'warning', 'notice', 'info', 'debug'] + + for log_lvl in log_levels: + set_module_args({ + 'chain': 'INPUT', + 'jump': 'LOG', + 'log_level': log_lvl, + 'source': '1.2.3.4/32', + 'log_prefix': '** DROP-this_ip **' + }) + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', 'filter', + '-C', 'INPUT', + '-s', '1.2.3.4/32', + '-j', 'LOG', + '--log-prefix', '** DROP-this_ip **', + '--log-level', log_lvl + ]) + + def test_iprange(self): + """ Test iprange module with its flags src_range and dst_range """ + set_module_args({ + 'chain': 'INPUT', + 'match': ['iprange'], + 'src_range': '192.168.1.100-192.168.1.199', + 'jump': 'ACCEPT' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-m', + 'iprange', + '-j', + 'ACCEPT', + '--src-range', + '192.168.1.100-192.168.1.199', + ]) + + set_module_args({ + 'chain': 'INPUT', + 'src_range': '192.168.1.100-192.168.1.199', + 'dst_range': '10.0.0.50-10.0.0.100', + 'jump': 'ACCEPT' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-j', + 'ACCEPT', + '-m', + 'iprange', + '--src-range', + '192.168.1.100-192.168.1.199', + '--dst-range', + '10.0.0.50-10.0.0.100' + ]) + + set_module_args({ + 'chain': 'INPUT', + 'dst_range': '10.0.0.50-10.0.0.100', + 'jump': 'ACCEPT' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-j', + 'ACCEPT', + '-m', + 'iprange', + '--dst-range', + '10.0.0.50-10.0.0.100' + ]) + + def test_insert_rule_with_wait(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'OUTPUT', + 'source': '1.2.3.4/32', + 'destination': '7.8.9.10/42', + 'jump': 'ACCEPT', + 'action': 'insert', + 'wait': '10' + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'OUTPUT', + '-w', + '10', + '-s', + '1.2.3.4/32', + '-d', + '7.8.9.10/42', + '-j', + 'ACCEPT' + ]) + + def test_comment_position_at_end(self): + """Test flush without parameters""" + set_module_args({ + 'chain': 'INPUT', + 'jump': 'ACCEPT', + 'action': 'insert', + 'ctstate': ['NEW'], + 'comment': 'this is a comment', + '_ansible_check_mode': True, + }) + + commands_results = [ + (0, '', ''), + ] + + with patch.object(basic.AnsibleModule, 'run_command') as run_command: + run_command.side_effect = commands_results + with self.assertRaises(AnsibleExitJson) as result: + iptables.main() + self.assertTrue(result.exception.args[0]['changed']) + + self.assertEqual(run_command.call_count, 1) + self.assertEqual(run_command.call_args_list[0][0][0], [ + '/sbin/iptables', + '-t', + 'filter', + '-C', + 'INPUT', + '-j', + 'ACCEPT', + '-m', + 'conntrack', + '--ctstate', + 'NEW', + '-m', + 'comment', + '--comment', + 'this is a comment' + ]) + self.assertEqual(run_command.call_args[0][0][14], 'this is a comment') diff --git a/test/units/modules/test_known_hosts.py b/test/units/modules/test_known_hosts.py new file mode 100644 index 00000000..3b6dfd86 --- /dev/null +++ b/test/units/modules/test_known_hosts.py @@ -0,0 +1,110 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import tempfile +from ansible.module_utils import basic + +from units.compat import unittest +from ansible.module_utils._text import to_bytes +from ansible.module_utils.basic import AnsibleModule + +from ansible.modules.known_hosts import compute_diff, sanity_check + + +class KnownHostsDiffTestCase(unittest.TestCase): + + def _create_file(self, content): + tmp_file = tempfile.NamedTemporaryFile(prefix='ansible-test-', suffix='-known_hosts', delete=False) + tmp_file.write(to_bytes(content)) + tmp_file.close() + self.addCleanup(os.unlink, tmp_file.name) + return tmp_file.name + + def test_no_existing_file(self): + path = tempfile.mktemp(prefix='ansible-test-', suffix='-known_hosts') + key = 'example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key) + self.assertEqual(diff, { + 'before_header': '/dev/null', + 'after_header': path, + 'before': '', + 'after': 'example.com ssh-rsa AAAAetc\n', + }) + + def test_key_addition(self): + path = self._create_file( + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'two.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n', + }) + + def test_no_change(self): + path = self._create_file( + 'one.example.com ssh-rsa AAAAetc\n' + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=1, replace_or_add=False, state='present', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n', + 'after': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n', + }) + + def test_key_change(self): + path = self._create_file( + 'one.example.com ssh-rsa AAAaetc\n' + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=1, replace_or_add=True, state='present', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'one.example.com ssh-rsa AAAaetc\ntwo.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n', + }) + + def test_key_removal(self): + path = self._create_file( + 'one.example.com ssh-rsa AAAAetc\n' + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=1, replace_or_add=False, state='absent', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\n', + }) + + def test_key_removal_no_change(self): + path = self._create_file( + 'two.example.com ssh-rsa BBBBetc\n' + ) + key = 'one.example.com ssh-rsa AAAAetc\n' + diff = compute_diff(path, found_line=None, replace_or_add=False, state='absent', key=key) + self.assertEqual(diff, { + 'before_header': path, + 'after_header': path, + 'before': 'two.example.com ssh-rsa BBBBetc\n', + 'after': 'two.example.com ssh-rsa BBBBetc\n', + }) + + def test_sanity_check(self): + basic._load_params = lambda: {} + # Module used internally to execute ssh-keygen system executable + module = AnsibleModule(argument_spec={}) + host = '10.0.0.1' + key = '%s ssh-rsa ASDF foo@bar' % (host,) + keygen = module.get_bin_path('ssh-keygen') + sanity_check(module, host, key, keygen) diff --git a/test/units/modules/test_pip.py b/test/units/modules/test_pip.py new file mode 100644 index 00000000..7f0f8b07 --- /dev/null +++ b/test/units/modules/test_pip.py @@ -0,0 +1,38 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +import pytest + +from ansible.modules import pip + + +pytestmark = pytest.mark.usefixtures('patch_ansible_module') + + +@pytest.mark.parametrize('patch_ansible_module', [{'name': 'six'}], indirect=['patch_ansible_module']) +def test_failure_when_pip_absent(mocker, capfd): + get_bin_path = mocker.patch('ansible.module_utils.basic.AnsibleModule.get_bin_path') + get_bin_path.return_value = None + + with pytest.raises(SystemExit): + pip.main() + + out, err = capfd.readouterr() + results = json.loads(out) + assert results['failed'] + assert 'pip needs to be installed' in results['msg'] + + +@pytest.mark.parametrize('patch_ansible_module, test_input, expected', [ + [None, ['django>1.11.1', '<1.11.2', 'ipaddress', 'simpleproject<2.0.0', '>1.1.0'], + ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']], + [None, ['django>1.11.1,<1.11.2,ipaddress', 'simpleproject<2.0.0,>1.1.0'], + ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']], + [None, ['django>1.11.1', '<1.11.2', 'ipaddress,simpleproject<2.0.0,>1.1.0'], + ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']]]) +def test_recover_package_name(test_input, expected): + assert pip._recover_package_name(test_input) == expected diff --git a/test/units/modules/test_systemd.py b/test/units/modules/test_systemd.py new file mode 100644 index 00000000..52c212a0 --- /dev/null +++ b/test/units/modules/test_systemd.py @@ -0,0 +1,52 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from ansible.modules.systemd import parse_systemctl_show + + +class ParseSystemctlShowTestCase(unittest.TestCase): + + def test_simple(self): + lines = [ + 'Type=simple', + 'Restart=no', + 'Requires=system.slice sysinit.target', + 'Description=Blah blah blah', + ] + parsed = parse_systemctl_show(lines) + self.assertEqual(parsed, { + 'Type': 'simple', + 'Restart': 'no', + 'Requires': 'system.slice sysinit.target', + 'Description': 'Blah blah blah', + }) + + def test_multiline_exec(self): + # This was taken from a real service that specified "ExecStart=/bin/echo foo\nbar" + lines = [ + 'Type=simple', + 'ExecStart={ path=/bin/echo ; argv[]=/bin/echo foo', + 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }', + 'Description=blah', + ] + parsed = parse_systemctl_show(lines) + self.assertEqual(parsed, { + 'Type': 'simple', + 'ExecStart': '{ path=/bin/echo ; argv[]=/bin/echo foo\n' + 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }', + 'Description': 'blah', + }) + + def test_single_line_with_brace(self): + lines = [ + 'Type=simple', + 'Description={ this is confusing', + 'Restart=no', + ] + parsed = parse_systemctl_show(lines) + self.assertEqual(parsed, { + 'Type': 'simple', + 'Description': '{ this is confusing', + 'Restart': 'no', + }) diff --git a/test/units/modules/test_yum.py b/test/units/modules/test_yum.py new file mode 100644 index 00000000..e5d601a6 --- /dev/null +++ b/test/units/modules/test_yum.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest + +from ansible.modules.yum import YumModule + + +yum_plugin_load_error = """ +Plugin "product-id" can't be imported +Plugin "search-disabled-repos" can't be imported +Plugin "subscription-manager" can't be imported +Plugin "product-id" can't be imported +Plugin "search-disabled-repos" can't be imported +Plugin "subscription-manager" can't be imported +""" + +# from https://github.com/ansible/ansible/issues/20608#issuecomment-276106505 +wrapped_output_1 = """ +Загружены модули: fastestmirror +Loading mirror speeds from cached hostfile + * base: mirror.h1host.ru + * extras: mirror.h1host.ru + * updates: mirror.h1host.ru + +vms-agent.x86_64 0.0-9 dev +""" + +# from https://github.com/ansible/ansible/issues/20608#issuecomment-276971275 +wrapped_output_2 = """ +Загружены модули: fastestmirror +Loading mirror speeds from cached hostfile + * base: mirror.corbina.net + * extras: mirror.corbina.net + * updates: mirror.corbina.net + +empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty.x86_64 + 0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1-0 + addons +libtiff.x86_64 4.0.3-27.el7_3 updates +""" + +# From https://github.com/ansible/ansible/issues/20608#issuecomment-276698431 +wrapped_output_3 = """ +Loaded plugins: fastestmirror, langpacks +Loading mirror speeds from cached hostfile + +ceph.x86_64 1:11.2.0-0.el7 ceph +ceph-base.x86_64 1:11.2.0-0.el7 ceph +ceph-common.x86_64 1:11.2.0-0.el7 ceph +ceph-mds.x86_64 1:11.2.0-0.el7 ceph +ceph-mon.x86_64 1:11.2.0-0.el7 ceph +ceph-osd.x86_64 1:11.2.0-0.el7 ceph +ceph-selinux.x86_64 1:11.2.0-0.el7 ceph +libcephfs1.x86_64 1:11.0.2-0.el7 ceph +librados2.x86_64 1:11.2.0-0.el7 ceph +libradosstriper1.x86_64 1:11.2.0-0.el7 ceph +librbd1.x86_64 1:11.2.0-0.el7 ceph +librgw2.x86_64 1:11.2.0-0.el7 ceph +python-cephfs.x86_64 1:11.2.0-0.el7 ceph +python-rados.x86_64 1:11.2.0-0.el7 ceph +python-rbd.x86_64 1:11.2.0-0.el7 ceph +""" + +# from https://github.com/ansible/ansible-modules-core/issues/4318#issuecomment-251416661 +wrapped_output_4 = """ +ipxe-roms-qemu.noarch 20160127-1.git6366fa7a.el7 + rhelosp-9.0-director-puddle +quota.x86_64 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z +quota-nls.noarch 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z +rdma.noarch 7.2_4.1_rc6-2.el7 rhelosp-rhel-7.2-z +screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2 + rhelosp-rhel-7.2-z +sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle +sssd-client.x86_64 1.13.0-40.el7_2.12 rhelosp-rhel-7.2-z +""" + + +# A 'normal-ish' yum check-update output, without any wrapped lines +unwrapped_output_rhel7 = """ + +Loaded plugins: etckeeper, product-id, search-disabled-repos, subscription- + : manager +This system is not registered to Red Hat Subscription Management. You can use subscription-manager to register. + +NetworkManager-openvpn.x86_64 1:1.2.6-1.el7 epel +NetworkManager-openvpn-gnome.x86_64 1:1.2.6-1.el7 epel +cabal-install.x86_64 1.16.1.0-2.el7 epel +cgit.x86_64 1.1-1.el7 epel +python34-libs.x86_64 3.4.5-3.el7 epel +python34-test.x86_64 3.4.5-3.el7 epel +python34-tkinter.x86_64 3.4.5-3.el7 epel +python34-tools.x86_64 3.4.5-3.el7 epel +qgit.x86_64 2.6-4.el7 epel +rdiff-backup.x86_64 1.2.8-12.el7 epel +stoken-libs.x86_64 0.91-1.el7 epel +xlockmore.x86_64 5.49-2.el7 epel +""" + +# Some wrapped obsoletes for prepending to output for testing both +wrapped_output_rhel7_obsoletes_postfix = """ +Obsoleting Packages +ddashboard.x86_64 0.2.0.1-1.el7_3 mhlavink-developerdashboard + developerdashboard.x86_64 0.1.12.2-1.el7_2 @mhlavink-developerdashboard +python-bugzilla.noarch 1.2.2-3.el7_2.1 mhlavink-developerdashboard + python-bugzilla-develdashboardfixes.noarch + 1.2.2-3.el7 @mhlavink-developerdashboard +python2-futures.noarch 3.0.5-1.el7 epel + python-futures.noarch 3.0.3-1.el7 @epel +python2-pip.noarch 8.1.2-5.el7 epel + python-pip.noarch 7.1.0-1.el7 @epel +python2-pyxdg.noarch 0.25-6.el7 epel + pyxdg.noarch 0.25-5.el7 @epel +python2-simplejson.x86_64 3.10.0-1.el7 epel + python-simplejson.x86_64 3.3.3-1.el7 @epel +Security: kernel-3.10.0-327.28.2.el7.x86_64 is an installed security update +Security: kernel-3.10.0-327.22.2.el7.x86_64 is the currently running version +""" + +longname = """ +Loaded plugins: fastestmirror, priorities, rhnplugin +This system is receiving updates from RHN Classic or Red Hat Satellite. +Loading mirror speeds from cached hostfile + +xxxxxxxxxxxxxxxxxxxxxxxxxx.noarch + 1.16-1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +glibc.x86_64 2.17-157.el7_3.1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx""" + + +unwrapped_output_rhel7_obsoletes = unwrapped_output_rhel7 + wrapped_output_rhel7_obsoletes_postfix +unwrapped_output_rhel7_expected_new_obsoletes_pkgs = [ + "ddashboard", "python-bugzilla", "python2-futures", "python2-pip", + "python2-pyxdg", "python2-simplejson" +] +unwrapped_output_rhel7_expected_old_obsoletes_pkgs = [ + "developerdashboard", "python-bugzilla-develdashboardfixes", + "python-futures", "python-pip", "pyxdg", "python-simplejson" +] +unwrapped_output_rhel7_expected_updated_pkgs = [ + "NetworkManager-openvpn", "NetworkManager-openvpn-gnome", "cabal-install", + "cgit", "python34-libs", "python34-test", "python34-tkinter", + "python34-tools", "qgit", "rdiff-backup", "stoken-libs", "xlockmore" +] + + +class TestYumUpdateCheckParse(unittest.TestCase): + def _assert_expected(self, expected_pkgs, result): + + for expected_pkg in expected_pkgs: + self.assertIn(expected_pkg, result) + self.assertEqual(len(result), len(expected_pkgs)) + self.assertIsInstance(result, dict) + + def test_empty_output(self): + res, obs = YumModule.parse_check_update("") + expected_pkgs = [] + self._assert_expected(expected_pkgs, res) + + def test_longname(self): + res, obs = YumModule.parse_check_update(longname) + expected_pkgs = ['xxxxxxxxxxxxxxxxxxxxxxxxxx', 'glibc'] + self._assert_expected(expected_pkgs, res) + + def test_plugin_load_error(self): + res, obs = YumModule.parse_check_update(yum_plugin_load_error) + expected_pkgs = [] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_1(self): + res, obs = YumModule.parse_check_update(wrapped_output_1) + expected_pkgs = ["vms-agent"] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_2(self): + res, obs = YumModule.parse_check_update(wrapped_output_2) + expected_pkgs = ["empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty", + "libtiff"] + + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_3(self): + res, obs = YumModule.parse_check_update(wrapped_output_3) + expected_pkgs = ["ceph", "ceph-base", "ceph-common", "ceph-mds", + "ceph-mon", "ceph-osd", "ceph-selinux", "libcephfs1", + "librados2", "libradosstriper1", "librbd1", "librgw2", + "python-cephfs", "python-rados", "python-rbd"] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_4(self): + res, obs = YumModule.parse_check_update(wrapped_output_4) + + expected_pkgs = ["ipxe-roms-qemu", "quota", "quota-nls", "rdma", "screen", + "sos", "sssd-client"] + self._assert_expected(expected_pkgs, res) + + def test_wrapped_output_rhel7(self): + res, obs = YumModule.parse_check_update(unwrapped_output_rhel7) + self._assert_expected(unwrapped_output_rhel7_expected_updated_pkgs, res) + + def test_wrapped_output_rhel7_obsoletes(self): + res, obs = YumModule.parse_check_update(unwrapped_output_rhel7_obsoletes) + self._assert_expected( + unwrapped_output_rhel7_expected_updated_pkgs + unwrapped_output_rhel7_expected_new_obsoletes_pkgs, + res + ) + self._assert_expected(unwrapped_output_rhel7_expected_old_obsoletes_pkgs, obs) diff --git a/test/units/modules/utils.py b/test/units/modules/utils.py new file mode 100644 index 00000000..6d169e36 --- /dev/null +++ b/test/units/modules/utils.py @@ -0,0 +1,50 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +from units.compat import unittest +from units.compat.mock import patch +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes + + +def set_module_args(args): + if '_ansible_remote_tmp' not in args: + args['_ansible_remote_tmp'] = '/tmp' + if '_ansible_keep_remote_files' not in args: + args['_ansible_keep_remote_files'] = False + + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) + + +class AnsibleExitJson(Exception): + pass + + +class AnsibleFailJson(Exception): + pass + + +def exit_json(*args, **kwargs): + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class ModuleTestCase(unittest.TestCase): + + def setUp(self): + self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json) + self.mock_module.start() + self.mock_sleep = patch('time.sleep') + self.mock_sleep.start() + set_module_args({}) + self.addCleanup(self.mock_module.stop) + self.addCleanup(self.mock_sleep.stop) |