summaryrefslogtreecommitdiffstats
path: root/test/units/modules
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/units/modules/__init__.py0
-rw-r--r--test/units/modules/conftest.py31
-rw-r--r--test/units/modules/test_apt.py53
-rw-r--r--test/units/modules/test_async_wrapper.py57
-rw-r--r--test/units/modules/test_copy.py215
-rw-r--r--test/units/modules/test_iptables.py919
-rw-r--r--test/units/modules/test_known_hosts.py110
-rw-r--r--test/units/modules/test_pip.py38
-rw-r--r--test/units/modules/test_systemd.py52
-rw-r--r--test/units/modules/test_yum.py207
-rw-r--r--test/units/modules/utils.py50
11 files changed, 1732 insertions, 0 deletions
diff --git a/test/units/modules/__init__.py b/test/units/modules/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/units/modules/__init__.py
diff --git a/test/units/modules/conftest.py b/test/units/modules/conftest.py
new file mode 100644
index 00000000..a7d1e047
--- /dev/null
+++ b/test/units/modules/conftest.py
@@ -0,0 +1,31 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+import pytest
+
+from ansible.module_utils.six import string_types
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils.common._collections_compat import MutableMapping
+
+
+@pytest.fixture
+def patch_ansible_module(request, mocker):
+ if isinstance(request.param, string_types):
+ args = request.param
+ elif isinstance(request.param, MutableMapping):
+ if 'ANSIBLE_MODULE_ARGS' not in request.param:
+ request.param = {'ANSIBLE_MODULE_ARGS': request.param}
+ if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']:
+ request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp'
+ if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']:
+ request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False
+ args = json.dumps(request.param)
+ else:
+ raise Exception('Malformed data to the patch_ansible_module pytest fixture')
+
+ mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args))
diff --git a/test/units/modules/test_apt.py b/test/units/modules/test_apt.py
new file mode 100644
index 00000000..3daf3c11
--- /dev/null
+++ b/test/units/modules/test_apt.py
@@ -0,0 +1,53 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import collections
+import sys
+
+from units.compat import mock
+from units.compat import unittest
+
+try:
+ from ansible.modules.apt import (
+ expand_pkgspec_from_fnmatches,
+ )
+except Exception:
+ # Need some more module_utils work (porting urls.py) before we can test
+ # modules. So don't error out in this case.
+ if sys.version_info[0] >= 3:
+ pass
+
+
+class AptExpandPkgspecTestCase(unittest.TestCase):
+
+ def setUp(self):
+ FakePackage = collections.namedtuple("Package", ("name",))
+ self.fake_cache = [
+ FakePackage("apt"),
+ FakePackage("apt-utils"),
+ FakePackage("not-selected"),
+ ]
+
+ def test_trivial(self):
+ foo = ["apt"]
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo)
+
+ def test_version_wildcard(self):
+ foo = ["apt=1.0*"]
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo)
+
+ def test_pkgname_wildcard_version_wildcard(self):
+ foo = ["apt*=1.0*"]
+ m_mock = mock.Mock()
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache),
+ ['apt', 'apt-utils'])
+
+ def test_pkgname_expands(self):
+ foo = ["apt*"]
+ m_mock = mock.Mock()
+ self.assertEqual(
+ expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache),
+ ["apt", "apt-utils"])
diff --git a/test/units/modules/test_async_wrapper.py b/test/units/modules/test_async_wrapper.py
new file mode 100644
index 00000000..762fc2fb
--- /dev/null
+++ b/test/units/modules/test_async_wrapper.py
@@ -0,0 +1,57 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+import os
+import json
+import shutil
+import tempfile
+
+import pytest
+
+from units.compat.mock import patch, MagicMock
+from ansible.modules import async_wrapper
+
+from pprint import pprint
+
+
+class TestAsyncWrapper:
+
+ def test_run_module(self, monkeypatch):
+
+ def mock_get_interpreter(module_path):
+ return ['/usr/bin/python']
+
+ module_result = {'rc': 0}
+ module_lines = [
+ '#!/usr/bin/python',
+ 'import sys',
+ 'sys.stderr.write("stderr stuff")',
+ "print('%s')" % json.dumps(module_result)
+ ]
+ module_data = '\n'.join(module_lines) + '\n'
+ module_data = module_data.encode('utf-8')
+
+ workdir = tempfile.mkdtemp()
+ fh, fn = tempfile.mkstemp(dir=workdir)
+
+ with open(fn, 'wb') as f:
+ f.write(module_data)
+
+ command = fn
+ jobid = 0
+ jobpath = os.path.join(os.path.dirname(command), 'job')
+
+ monkeypatch.setattr(async_wrapper, '_get_interpreter', mock_get_interpreter)
+
+ res = async_wrapper._run_module(command, jobid, jobpath)
+
+ with open(os.path.join(workdir, 'job'), 'r') as f:
+ jres = json.loads(f.read())
+
+ shutil.rmtree(workdir)
+
+ assert jres.get('rc') == 0
+ assert jres.get('stderr') == 'stderr stuff'
diff --git a/test/units/modules/test_copy.py b/test/units/modules/test_copy.py
new file mode 100644
index 00000000..20c309b6
--- /dev/null
+++ b/test/units/modules/test_copy.py
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+# Copyright:
+# (c) 2018 Ansible Project
+# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible.modules.copy import AnsibleModuleError, split_pre_existing_dir
+
+from ansible.module_utils.basic import AnsibleModule
+
+
+THREE_DIRS_DATA = (('/dir1/dir2',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1', 'dir2']),
+ # 2 existing dirs:
+ ('/dir1', ['dir2']),
+ # 3 existing dirs:
+ ('/dir1/dir2', [])
+ ),
+ ('/dir1/dir2/',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1', 'dir2']),
+ # 2 existing dirs:
+ ('/dir1', ['dir2']),
+ # 3 existing dirs:
+ ('/dir1/dir2', [])
+ ),
+ )
+
+
+TWO_DIRS_DATA = (('dir1/dir2',
+ # 0 existing dirs:
+ ('.', ['dir1', 'dir2']),
+ # 1 existing dir:
+ ('dir1', ['dir2']),
+ # 2 existing dirs:
+ ('dir1/dir2', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ('dir1/dir2/',
+ # 0 existing dirs:
+ ('.', ['dir1', 'dir2']),
+ # 1 existing dir:
+ ('dir1', ['dir2']),
+ # 2 existing dirs:
+ ('dir1/dir2', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ('/dir1',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1']),
+ # 2 existing dirs:
+ ('/dir1', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ('/dir1/',
+ # 0 existing dirs: error (because / should always exist)
+ None,
+ # 1 existing dir:
+ ('/', ['dir1']),
+ # 2 existing dirs:
+ ('/dir1', []),
+ # 3 existing dirs: Same as 2 because we never get to the third
+ ),
+ ) + THREE_DIRS_DATA
+
+
+ONE_DIR_DATA = (('dir1',
+ # 0 existing dirs:
+ ('.', ['dir1']),
+ # 1 existing dir:
+ ('dir1', []),
+ # 2 existing dirs: Same as 1 because we never get to the third
+ ),
+ ('dir1/',
+ # 0 existing dirs:
+ ('.', ['dir1']),
+ # 1 existing dir:
+ ('dir1', []),
+ # 2 existing dirs: Same as 1 because we never get to the third
+ ),
+ ) + TWO_DIRS_DATA
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[4]) for d in THREE_DIRS_DATA))
+def test_split_pre_existing_dir_three_levels_exist(directory, expected, mocker):
+ mocker.patch('os.path.exists', side_effect=[True, True, True])
+ split_pre_existing_dir(directory) == expected
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[3]) for d in TWO_DIRS_DATA))
+def test_split_pre_existing_dir_two_levels_exist(directory, expected, mocker):
+ mocker.patch('os.path.exists', side_effect=[True, True, False])
+ split_pre_existing_dir(directory) == expected
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[2]) for d in ONE_DIR_DATA))
+def test_split_pre_existing_dir_one_level_exists(directory, expected, mocker):
+ mocker.patch('os.path.exists', side_effect=[True, False, False])
+ split_pre_existing_dir(directory) == expected
+
+
+@pytest.mark.parametrize('directory', (d[0] for d in ONE_DIR_DATA if d[1] is None))
+def test_split_pre_existing_dir_root_does_not_exist(directory, mocker):
+ mocker.patch('os.path.exists', return_value=False)
+ with pytest.raises(AnsibleModuleError) as excinfo:
+ split_pre_existing_dir(directory)
+ assert excinfo.value.results['msg'].startswith("The '/' directory doesn't exist on this machine.")
+
+
+@pytest.mark.parametrize('directory, expected', ((d[0], d[1]) for d in ONE_DIR_DATA if not d[0].startswith('/')))
+def test_split_pre_existing_dir_working_dir_exists(directory, expected, mocker):
+ mocker.patch('os.path.exists', return_value=False)
+ split_pre_existing_dir(directory) == expected
+
+
+#
+# Info helpful for making new test cases:
+#
+# base_mode = {'dir no perms': 0o040000,
+# 'file no perms': 0o100000,
+# 'dir all perms': 0o400000 | 0o777,
+# 'file all perms': 0o100000, | 0o777}
+#
+# perm_bits = {'x': 0b001,
+# 'w': 0b010,
+# 'r': 0b100}
+#
+# role_shift = {'u': 6,
+# 'g': 3,
+# 'o': 0}
+
+DATA = ( # Going from no permissions to setting all for user, group, and/or other
+ (0o040000, u'a+rwx', 0o0777),
+ (0o040000, u'u+rwx,g+rwx,o+rwx', 0o0777),
+ (0o040000, u'o+rwx', 0o0007),
+ (0o040000, u'g+rwx', 0o0070),
+ (0o040000, u'u+rwx', 0o0700),
+
+ # Going from all permissions to none for user, group, and/or other
+ (0o040777, u'a-rwx', 0o0000),
+ (0o040777, u'u-rwx,g-rwx,o-rwx', 0o0000),
+ (0o040777, u'o-rwx', 0o0770),
+ (0o040777, u'g-rwx', 0o0707),
+ (0o040777, u'u-rwx', 0o0077),
+
+ # now using absolute assignment from None to a set of perms
+ (0o040000, u'a=rwx', 0o0777),
+ (0o040000, u'u=rwx,g=rwx,o=rwx', 0o0777),
+ (0o040000, u'o=rwx', 0o0007),
+ (0o040000, u'g=rwx', 0o0070),
+ (0o040000, u'u=rwx', 0o0700),
+
+ # X effect on files and dirs
+ (0o040000, u'a+X', 0o0111),
+ (0o100000, u'a+X', 0),
+ (0o040000, u'a=X', 0o0111),
+ (0o100000, u'a=X', 0),
+ (0o040777, u'a-X', 0o0666),
+ # Same as chmod but is it a bug?
+ # chmod a-X statfile <== removes execute from statfile
+ (0o100777, u'a-X', 0o0666),
+
+ # Multiple permissions
+ (0o040000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0755),
+ (0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644),
+)
+
+UMASK_DATA = (
+ (0o100000, '+rwx', 0o770),
+ (0o100777, '-rwx', 0o007),
+)
+
+INVALID_DATA = (
+ (0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"),
+ (0o040000, u'f=rwx', "bad symbolic permission for mode: f=rwx"),
+)
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', DATA)
+def test_good_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA)
+def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_umask = mocker.patch('os.umask')
+ mock_umask.return_value = 0o7
+
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA)
+def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+ with pytest.raises(ValueError) as exc:
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah'
+ assert exc.match(expected)
diff --git a/test/units/modules/test_iptables.py b/test/units/modules/test_iptables.py
new file mode 100644
index 00000000..25a157e5
--- /dev/null
+++ b/test/units/modules/test_iptables.py
@@ -0,0 +1,919 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from units.compat.mock import patch
+from ansible.module_utils import basic
+from ansible.modules import iptables
+from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
+
+
+def get_bin_path(*args, **kwargs):
+ return "/sbin/iptables"
+
+
+def get_iptables_version(iptables_path, module):
+ return "1.8.2"
+
+
+class TestIptables(ModuleTestCase):
+
+ def setUp(self):
+ super(TestIptables, self).setUp()
+ self.mock_get_bin_path = patch.object(basic.AnsibleModule, 'get_bin_path', get_bin_path)
+ self.mock_get_bin_path.start()
+ self.addCleanup(self.mock_get_bin_path.stop) # ensure that the patching is 'undone'
+ self.mock_get_iptables_version = patch.object(iptables, 'get_iptables_version', get_iptables_version)
+ self.mock_get_iptables_version.start()
+ self.addCleanup(self.mock_get_iptables_version.stop) # ensure that the patching is 'undone'
+
+ def test_without_required_parameters(self):
+ """Failure must occurs when all parameters are missing"""
+ with self.assertRaises(AnsibleFailJson):
+ set_module_args({})
+ iptables.main()
+
+ def test_flush_table_without_chain(self):
+ """Test flush without chain, flush the table"""
+ set_module_args({
+ 'flush': True,
+ })
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.return_value = 0, '', '' # successful execution, no output
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args[0][0][0], '/sbin/iptables')
+ self.assertEqual(run_command.call_args[0][0][1], '-t')
+ self.assertEqual(run_command.call_args[0][0][2], 'filter')
+ self.assertEqual(run_command.call_args[0][0][3], '-F')
+
+ def test_flush_table_check_true(self):
+ """Test flush without parameters and check == true"""
+ set_module_args({
+ 'flush': True,
+ '_ansible_check_mode': True,
+ })
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.return_value = 0, '', '' # successful execution, no output
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 0)
+
+# TODO ADD test flush table nat
+# TODO ADD test flush with chain
+# TODO ADD test flush with chain and table nat
+
+ def test_policy_table(self):
+ """Test change policy of a chain"""
+ set_module_args({
+ 'policy': 'ACCEPT',
+ 'chain': 'INPUT',
+ })
+ commands_results = [
+ (0, 'Chain INPUT (policy DROP)\n', ''),
+ (0, '', '')
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ # import pdb
+ # pdb.set_trace()
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-L',
+ 'INPUT',
+ ])
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-P',
+ 'INPUT',
+ 'ACCEPT',
+ ])
+
+ def test_policy_table_no_change(self):
+ """Test don't change policy of a chain if the policy is right"""
+ set_module_args({
+ 'policy': 'ACCEPT',
+ 'chain': 'INPUT',
+ })
+ commands_results = [
+ (0, 'Chain INPUT (policy ACCEPT)\n', ''),
+ (0, '', '')
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertFalse(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ # import pdb
+ # pdb.set_trace()
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-L',
+ 'INPUT',
+ ])
+
+ def test_policy_table_changed_false(self):
+ """Test flush without parameters and change == false"""
+ set_module_args({
+ 'policy': 'ACCEPT',
+ 'chain': 'INPUT',
+ '_ansible_check_mode': True,
+ })
+ commands_results = [
+ (0, 'Chain INPUT (policy DROP)\n', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ # import pdb
+ # pdb.set_trace()
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-L',
+ 'INPUT',
+ ])
+
+# TODO ADD test policy without chain fail
+# TODO ADD test policy with chain don't exists
+# TODO ADD test policy with wrong choice fail
+
+ def test_insert_rule_change_false(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'OUTPUT',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'ACCEPT',
+ 'action': 'insert',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (1, '', ''),
+ (0, '', '')
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ # import pdb
+ # pdb.set_trace()
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+
+ def test_insert_rule(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'OUTPUT',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'ACCEPT',
+ 'action': 'insert'
+ })
+
+ commands_results = [
+ (1, '', ''),
+ (0, '', '')
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ # import pdb
+ # pdb.set_trace()
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-I',
+ 'OUTPUT',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+
+ def test_append_rule_check_mode(self):
+ """Test append a redirection rule in check mode"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'REDIRECT',
+ 'table': 'nat',
+ 'to_destination': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'destination_port': '22',
+ 'to_ports': '8600',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (1, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'REDIRECT',
+ '--to-destination',
+ '5.5.5.5/32',
+ '--destination-port',
+ '22',
+ '--to-ports',
+ '8600'
+ ])
+
+ def test_append_rule(self):
+ """Test append a redirection rule"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'REDIRECT',
+ 'table': 'nat',
+ 'to_destination': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'destination_port': '22',
+ 'to_ports': '8600'
+ })
+
+ commands_results = [
+ (1, '', ''),
+ (0, '', '')
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'REDIRECT',
+ '--to-destination',
+ '5.5.5.5/32',
+ '--destination-port',
+ '22',
+ '--to-ports',
+ '8600'
+ ])
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-A',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'REDIRECT',
+ '--to-destination',
+ '5.5.5.5/32',
+ '--destination-port',
+ '22',
+ '--to-ports',
+ '8600'
+ ])
+
+ def test_remove_rule(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'SNAT',
+ 'table': 'nat',
+ 'to_source': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'source_port': '22',
+ 'to_ports': '8600',
+ 'state': 'absent',
+ 'in_interface': 'eth0',
+ 'out_interface': 'eth1',
+ 'comment': 'this is a comment'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 2)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'SNAT',
+ '--to-source',
+ '5.5.5.5/32',
+ '-i',
+ 'eth0',
+ '-o',
+ 'eth1',
+ '--source-port',
+ '22',
+ '--to-ports',
+ '8600',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+ self.assertEqual(run_command.call_args_list[1][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-D',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'SNAT',
+ '--to-source',
+ '5.5.5.5/32',
+ '-i',
+ 'eth0',
+ '-o',
+ 'eth1',
+ '--source-port',
+ '22',
+ '--to-ports',
+ '8600',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+
+ def test_remove_rule_check_mode(self):
+ """Test flush without parameters check mode"""
+ set_module_args({
+ 'chain': 'PREROUTING',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'SNAT',
+ 'table': 'nat',
+ 'to_source': '5.5.5.5/32',
+ 'protocol': 'udp',
+ 'source_port': '22',
+ 'to_ports': '8600',
+ 'state': 'absent',
+ 'in_interface': 'eth0',
+ 'out_interface': 'eth1',
+ 'comment': 'this is a comment',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'nat',
+ '-C',
+ 'PREROUTING',
+ '-p',
+ 'udp',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'SNAT',
+ '--to-source',
+ '5.5.5.5/32',
+ '-i',
+ 'eth0',
+ '-o',
+ 'eth1',
+ '--source-port',
+ '22',
+ '--to-ports',
+ '8600',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+
+ def test_insert_with_reject(self):
+ """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'protocol': 'tcp',
+ 'reject_with': 'tcp-reset',
+ 'ip_version': 'ipv4',
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-p',
+ 'tcp',
+ '-j',
+ 'REJECT',
+ '--reject-with',
+ 'tcp-reset',
+ ])
+
+ def test_insert_jump_reject_with_reject(self):
+ """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'protocol': 'tcp',
+ 'jump': 'REJECT',
+ 'reject_with': 'tcp-reset',
+ 'ip_version': 'ipv4',
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-p',
+ 'tcp',
+ '-j',
+ 'REJECT',
+ '--reject-with',
+ 'tcp-reset',
+ ])
+
+ def test_jump_tee_gateway_negative(self):
+ """ Missing gateway when JUMP is set to TEE """
+ set_module_args({
+ 'table': 'mangle',
+ 'chain': 'PREROUTING',
+ 'in_interface': 'eth0',
+ 'protocol': 'udp',
+ 'match': 'state',
+ 'jump': 'TEE',
+ 'ctstate': ['NEW'],
+ 'destination_port': '9521',
+ 'destination': '127.0.0.1'
+ })
+
+ with self.assertRaises(AnsibleFailJson) as e:
+ iptables.main()
+ self.assertTrue(e.exception.args[0]['failed'])
+ self.assertEqual(e.exception.args[0]['msg'], 'jump is TEE but all of the following are missing: gateway')
+
+ def test_jump_tee_gateway(self):
+ """ Using gateway when JUMP is set to TEE """
+ set_module_args({
+ 'table': 'mangle',
+ 'chain': 'PREROUTING',
+ 'in_interface': 'eth0',
+ 'protocol': 'udp',
+ 'match': 'state',
+ 'jump': 'TEE',
+ 'ctstate': ['NEW'],
+ 'destination_port': '9521',
+ 'gateway': '192.168.10.1',
+ 'destination': '127.0.0.1'
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'mangle',
+ '-C', 'PREROUTING',
+ '-p', 'udp',
+ '-d', '127.0.0.1',
+ '-m', 'state',
+ '-j', 'TEE',
+ '--gateway', '192.168.10.1',
+ '-i', 'eth0',
+ '--destination-port', '9521',
+ '--state', 'NEW'
+ ])
+
+ def test_tcp_flags(self):
+ """ Test various ways of inputting tcp_flags """
+ args = [
+ {
+ 'chain': 'OUTPUT',
+ 'protocol': 'tcp',
+ 'jump': 'DROP',
+ 'tcp_flags': 'flags=ALL flags_set="ACK,RST,SYN,FIN"'
+ },
+ {
+ 'chain': 'OUTPUT',
+ 'protocol': 'tcp',
+ 'jump': 'DROP',
+ 'tcp_flags': {
+ 'flags': 'ALL',
+ 'flags_set': 'ACK,RST,SYN,FIN'
+ }
+ },
+ {
+ 'chain': 'OUTPUT',
+ 'protocol': 'tcp',
+ 'jump': 'DROP',
+ 'tcp_flags': {
+ 'flags': ['ALL'],
+ 'flags_set': ['ACK', 'RST', 'SYN', 'FIN']
+ }
+ },
+
+ ]
+
+ for item in args:
+ set_module_args(item)
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-p',
+ 'tcp',
+ '--tcp-flags',
+ 'ALL',
+ 'ACK,RST,SYN,FIN',
+ '-j',
+ 'DROP'
+ ])
+
+ def test_log_level(self):
+ """ Test various ways of log level flag """
+
+ log_levels = ['0', '1', '2', '3', '4', '5', '6', '7',
+ 'emerg', 'alert', 'crit', 'error', 'warning', 'notice', 'info', 'debug']
+
+ for log_lvl in log_levels:
+ set_module_args({
+ 'chain': 'INPUT',
+ 'jump': 'LOG',
+ 'log_level': log_lvl,
+ 'source': '1.2.3.4/32',
+ 'log_prefix': '** DROP-this_ip **'
+ })
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t', 'filter',
+ '-C', 'INPUT',
+ '-s', '1.2.3.4/32',
+ '-j', 'LOG',
+ '--log-prefix', '** DROP-this_ip **',
+ '--log-level', log_lvl
+ ])
+
+ def test_iprange(self):
+ """ Test iprange module with its flags src_range and dst_range """
+ set_module_args({
+ 'chain': 'INPUT',
+ 'match': ['iprange'],
+ 'src_range': '192.168.1.100-192.168.1.199',
+ 'jump': 'ACCEPT'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-m',
+ 'iprange',
+ '-j',
+ 'ACCEPT',
+ '--src-range',
+ '192.168.1.100-192.168.1.199',
+ ])
+
+ set_module_args({
+ 'chain': 'INPUT',
+ 'src_range': '192.168.1.100-192.168.1.199',
+ 'dst_range': '10.0.0.50-10.0.0.100',
+ 'jump': 'ACCEPT'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-j',
+ 'ACCEPT',
+ '-m',
+ 'iprange',
+ '--src-range',
+ '192.168.1.100-192.168.1.199',
+ '--dst-range',
+ '10.0.0.50-10.0.0.100'
+ ])
+
+ set_module_args({
+ 'chain': 'INPUT',
+ 'dst_range': '10.0.0.50-10.0.0.100',
+ 'jump': 'ACCEPT'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-j',
+ 'ACCEPT',
+ '-m',
+ 'iprange',
+ '--dst-range',
+ '10.0.0.50-10.0.0.100'
+ ])
+
+ def test_insert_rule_with_wait(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'OUTPUT',
+ 'source': '1.2.3.4/32',
+ 'destination': '7.8.9.10/42',
+ 'jump': 'ACCEPT',
+ 'action': 'insert',
+ 'wait': '10'
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'OUTPUT',
+ '-w',
+ '10',
+ '-s',
+ '1.2.3.4/32',
+ '-d',
+ '7.8.9.10/42',
+ '-j',
+ 'ACCEPT'
+ ])
+
+ def test_comment_position_at_end(self):
+ """Test flush without parameters"""
+ set_module_args({
+ 'chain': 'INPUT',
+ 'jump': 'ACCEPT',
+ 'action': 'insert',
+ 'ctstate': ['NEW'],
+ 'comment': 'this is a comment',
+ '_ansible_check_mode': True,
+ })
+
+ commands_results = [
+ (0, '', ''),
+ ]
+
+ with patch.object(basic.AnsibleModule, 'run_command') as run_command:
+ run_command.side_effect = commands_results
+ with self.assertRaises(AnsibleExitJson) as result:
+ iptables.main()
+ self.assertTrue(result.exception.args[0]['changed'])
+
+ self.assertEqual(run_command.call_count, 1)
+ self.assertEqual(run_command.call_args_list[0][0][0], [
+ '/sbin/iptables',
+ '-t',
+ 'filter',
+ '-C',
+ 'INPUT',
+ '-j',
+ 'ACCEPT',
+ '-m',
+ 'conntrack',
+ '--ctstate',
+ 'NEW',
+ '-m',
+ 'comment',
+ '--comment',
+ 'this is a comment'
+ ])
+ self.assertEqual(run_command.call_args[0][0][14], 'this is a comment')
diff --git a/test/units/modules/test_known_hosts.py b/test/units/modules/test_known_hosts.py
new file mode 100644
index 00000000..3b6dfd86
--- /dev/null
+++ b/test/units/modules/test_known_hosts.py
@@ -0,0 +1,110 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import tempfile
+from ansible.module_utils import basic
+
+from units.compat import unittest
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils.basic import AnsibleModule
+
+from ansible.modules.known_hosts import compute_diff, sanity_check
+
+
+class KnownHostsDiffTestCase(unittest.TestCase):
+
+ def _create_file(self, content):
+ tmp_file = tempfile.NamedTemporaryFile(prefix='ansible-test-', suffix='-known_hosts', delete=False)
+ tmp_file.write(to_bytes(content))
+ tmp_file.close()
+ self.addCleanup(os.unlink, tmp_file.name)
+ return tmp_file.name
+
+ def test_no_existing_file(self):
+ path = tempfile.mktemp(prefix='ansible-test-', suffix='-known_hosts')
+ key = 'example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': '/dev/null',
+ 'after_header': path,
+ 'before': '',
+ 'after': 'example.com ssh-rsa AAAAetc\n',
+ })
+
+ def test_key_addition(self):
+ path = self._create_file(
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=None, replace_or_add=False, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'two.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n',
+ })
+
+ def test_no_change(self):
+ path = self._create_file(
+ 'one.example.com ssh-rsa AAAAetc\n'
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=1, replace_or_add=False, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ 'after': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ })
+
+ def test_key_change(self):
+ path = self._create_file(
+ 'one.example.com ssh-rsa AAAaetc\n'
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=1, replace_or_add=True, state='present', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'one.example.com ssh-rsa AAAaetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\none.example.com ssh-rsa AAAAetc\n',
+ })
+
+ def test_key_removal(self):
+ path = self._create_file(
+ 'one.example.com ssh-rsa AAAAetc\n'
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=1, replace_or_add=False, state='absent', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'one.example.com ssh-rsa AAAAetc\ntwo.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\n',
+ })
+
+ def test_key_removal_no_change(self):
+ path = self._create_file(
+ 'two.example.com ssh-rsa BBBBetc\n'
+ )
+ key = 'one.example.com ssh-rsa AAAAetc\n'
+ diff = compute_diff(path, found_line=None, replace_or_add=False, state='absent', key=key)
+ self.assertEqual(diff, {
+ 'before_header': path,
+ 'after_header': path,
+ 'before': 'two.example.com ssh-rsa BBBBetc\n',
+ 'after': 'two.example.com ssh-rsa BBBBetc\n',
+ })
+
+ def test_sanity_check(self):
+ basic._load_params = lambda: {}
+ # Module used internally to execute ssh-keygen system executable
+ module = AnsibleModule(argument_spec={})
+ host = '10.0.0.1'
+ key = '%s ssh-rsa ASDF foo@bar' % (host,)
+ keygen = module.get_bin_path('ssh-keygen')
+ sanity_check(module, host, key, keygen)
diff --git a/test/units/modules/test_pip.py b/test/units/modules/test_pip.py
new file mode 100644
index 00000000..7f0f8b07
--- /dev/null
+++ b/test/units/modules/test_pip.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+import pytest
+
+from ansible.modules import pip
+
+
+pytestmark = pytest.mark.usefixtures('patch_ansible_module')
+
+
+@pytest.mark.parametrize('patch_ansible_module', [{'name': 'six'}], indirect=['patch_ansible_module'])
+def test_failure_when_pip_absent(mocker, capfd):
+ get_bin_path = mocker.patch('ansible.module_utils.basic.AnsibleModule.get_bin_path')
+ get_bin_path.return_value = None
+
+ with pytest.raises(SystemExit):
+ pip.main()
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+ assert results['failed']
+ assert 'pip needs to be installed' in results['msg']
+
+
+@pytest.mark.parametrize('patch_ansible_module, test_input, expected', [
+ [None, ['django>1.11.1', '<1.11.2', 'ipaddress', 'simpleproject<2.0.0', '>1.1.0'],
+ ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']],
+ [None, ['django>1.11.1,<1.11.2,ipaddress', 'simpleproject<2.0.0,>1.1.0'],
+ ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']],
+ [None, ['django>1.11.1', '<1.11.2', 'ipaddress,simpleproject<2.0.0,>1.1.0'],
+ ['django>1.11.1,<1.11.2', 'ipaddress', 'simpleproject<2.0.0,>1.1.0']]])
+def test_recover_package_name(test_input, expected):
+ assert pip._recover_package_name(test_input) == expected
diff --git a/test/units/modules/test_systemd.py b/test/units/modules/test_systemd.py
new file mode 100644
index 00000000..52c212a0
--- /dev/null
+++ b/test/units/modules/test_systemd.py
@@ -0,0 +1,52 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from units.compat import unittest
+from ansible.modules.systemd import parse_systemctl_show
+
+
+class ParseSystemctlShowTestCase(unittest.TestCase):
+
+ def test_simple(self):
+ lines = [
+ 'Type=simple',
+ 'Restart=no',
+ 'Requires=system.slice sysinit.target',
+ 'Description=Blah blah blah',
+ ]
+ parsed = parse_systemctl_show(lines)
+ self.assertEqual(parsed, {
+ 'Type': 'simple',
+ 'Restart': 'no',
+ 'Requires': 'system.slice sysinit.target',
+ 'Description': 'Blah blah blah',
+ })
+
+ def test_multiline_exec(self):
+ # This was taken from a real service that specified "ExecStart=/bin/echo foo\nbar"
+ lines = [
+ 'Type=simple',
+ 'ExecStart={ path=/bin/echo ; argv[]=/bin/echo foo',
+ 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }',
+ 'Description=blah',
+ ]
+ parsed = parse_systemctl_show(lines)
+ self.assertEqual(parsed, {
+ 'Type': 'simple',
+ 'ExecStart': '{ path=/bin/echo ; argv[]=/bin/echo foo\n'
+ 'bar ; ignore_errors=no ; start_time=[n/a] ; stop_time=[n/a] ; pid=0 ; code=(null) ; status=0/0 }',
+ 'Description': 'blah',
+ })
+
+ def test_single_line_with_brace(self):
+ lines = [
+ 'Type=simple',
+ 'Description={ this is confusing',
+ 'Restart=no',
+ ]
+ parsed = parse_systemctl_show(lines)
+ self.assertEqual(parsed, {
+ 'Type': 'simple',
+ 'Description': '{ this is confusing',
+ 'Restart': 'no',
+ })
diff --git a/test/units/modules/test_yum.py b/test/units/modules/test_yum.py
new file mode 100644
index 00000000..e5d601a6
--- /dev/null
+++ b/test/units/modules/test_yum.py
@@ -0,0 +1,207 @@
+# -*- coding: utf-8 -*-
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from units.compat import unittest
+
+from ansible.modules.yum import YumModule
+
+
+yum_plugin_load_error = """
+Plugin "product-id" can't be imported
+Plugin "search-disabled-repos" can't be imported
+Plugin "subscription-manager" can't be imported
+Plugin "product-id" can't be imported
+Plugin "search-disabled-repos" can't be imported
+Plugin "subscription-manager" can't be imported
+"""
+
+# from https://github.com/ansible/ansible/issues/20608#issuecomment-276106505
+wrapped_output_1 = """
+Загружены модули: fastestmirror
+Loading mirror speeds from cached hostfile
+ * base: mirror.h1host.ru
+ * extras: mirror.h1host.ru
+ * updates: mirror.h1host.ru
+
+vms-agent.x86_64 0.0-9 dev
+"""
+
+# from https://github.com/ansible/ansible/issues/20608#issuecomment-276971275
+wrapped_output_2 = """
+Загружены модули: fastestmirror
+Loading mirror speeds from cached hostfile
+ * base: mirror.corbina.net
+ * extras: mirror.corbina.net
+ * updates: mirror.corbina.net
+
+empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty.x86_64
+ 0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1-0
+ addons
+libtiff.x86_64 4.0.3-27.el7_3 updates
+"""
+
+# From https://github.com/ansible/ansible/issues/20608#issuecomment-276698431
+wrapped_output_3 = """
+Loaded plugins: fastestmirror, langpacks
+Loading mirror speeds from cached hostfile
+
+ceph.x86_64 1:11.2.0-0.el7 ceph
+ceph-base.x86_64 1:11.2.0-0.el7 ceph
+ceph-common.x86_64 1:11.2.0-0.el7 ceph
+ceph-mds.x86_64 1:11.2.0-0.el7 ceph
+ceph-mon.x86_64 1:11.2.0-0.el7 ceph
+ceph-osd.x86_64 1:11.2.0-0.el7 ceph
+ceph-selinux.x86_64 1:11.2.0-0.el7 ceph
+libcephfs1.x86_64 1:11.0.2-0.el7 ceph
+librados2.x86_64 1:11.2.0-0.el7 ceph
+libradosstriper1.x86_64 1:11.2.0-0.el7 ceph
+librbd1.x86_64 1:11.2.0-0.el7 ceph
+librgw2.x86_64 1:11.2.0-0.el7 ceph
+python-cephfs.x86_64 1:11.2.0-0.el7 ceph
+python-rados.x86_64 1:11.2.0-0.el7 ceph
+python-rbd.x86_64 1:11.2.0-0.el7 ceph
+"""
+
+# from https://github.com/ansible/ansible-modules-core/issues/4318#issuecomment-251416661
+wrapped_output_4 = """
+ipxe-roms-qemu.noarch 20160127-1.git6366fa7a.el7
+ rhelosp-9.0-director-puddle
+quota.x86_64 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z
+quota-nls.noarch 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z
+rdma.noarch 7.2_4.1_rc6-2.el7 rhelosp-rhel-7.2-z
+screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2
+ rhelosp-rhel-7.2-z
+sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle
+sssd-client.x86_64 1.13.0-40.el7_2.12 rhelosp-rhel-7.2-z
+"""
+
+
+# A 'normal-ish' yum check-update output, without any wrapped lines
+unwrapped_output_rhel7 = """
+
+Loaded plugins: etckeeper, product-id, search-disabled-repos, subscription-
+ : manager
+This system is not registered to Red Hat Subscription Management. You can use subscription-manager to register.
+
+NetworkManager-openvpn.x86_64 1:1.2.6-1.el7 epel
+NetworkManager-openvpn-gnome.x86_64 1:1.2.6-1.el7 epel
+cabal-install.x86_64 1.16.1.0-2.el7 epel
+cgit.x86_64 1.1-1.el7 epel
+python34-libs.x86_64 3.4.5-3.el7 epel
+python34-test.x86_64 3.4.5-3.el7 epel
+python34-tkinter.x86_64 3.4.5-3.el7 epel
+python34-tools.x86_64 3.4.5-3.el7 epel
+qgit.x86_64 2.6-4.el7 epel
+rdiff-backup.x86_64 1.2.8-12.el7 epel
+stoken-libs.x86_64 0.91-1.el7 epel
+xlockmore.x86_64 5.49-2.el7 epel
+"""
+
+# Some wrapped obsoletes for prepending to output for testing both
+wrapped_output_rhel7_obsoletes_postfix = """
+Obsoleting Packages
+ddashboard.x86_64 0.2.0.1-1.el7_3 mhlavink-developerdashboard
+ developerdashboard.x86_64 0.1.12.2-1.el7_2 @mhlavink-developerdashboard
+python-bugzilla.noarch 1.2.2-3.el7_2.1 mhlavink-developerdashboard
+ python-bugzilla-develdashboardfixes.noarch
+ 1.2.2-3.el7 @mhlavink-developerdashboard
+python2-futures.noarch 3.0.5-1.el7 epel
+ python-futures.noarch 3.0.3-1.el7 @epel
+python2-pip.noarch 8.1.2-5.el7 epel
+ python-pip.noarch 7.1.0-1.el7 @epel
+python2-pyxdg.noarch 0.25-6.el7 epel
+ pyxdg.noarch 0.25-5.el7 @epel
+python2-simplejson.x86_64 3.10.0-1.el7 epel
+ python-simplejson.x86_64 3.3.3-1.el7 @epel
+Security: kernel-3.10.0-327.28.2.el7.x86_64 is an installed security update
+Security: kernel-3.10.0-327.22.2.el7.x86_64 is the currently running version
+"""
+
+longname = """
+Loaded plugins: fastestmirror, priorities, rhnplugin
+This system is receiving updates from RHN Classic or Red Hat Satellite.
+Loading mirror speeds from cached hostfile
+
+xxxxxxxxxxxxxxxxxxxxxxxxxx.noarch
+ 1.16-1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+glibc.x86_64 2.17-157.el7_3.1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"""
+
+
+unwrapped_output_rhel7_obsoletes = unwrapped_output_rhel7 + wrapped_output_rhel7_obsoletes_postfix
+unwrapped_output_rhel7_expected_new_obsoletes_pkgs = [
+ "ddashboard", "python-bugzilla", "python2-futures", "python2-pip",
+ "python2-pyxdg", "python2-simplejson"
+]
+unwrapped_output_rhel7_expected_old_obsoletes_pkgs = [
+ "developerdashboard", "python-bugzilla-develdashboardfixes",
+ "python-futures", "python-pip", "pyxdg", "python-simplejson"
+]
+unwrapped_output_rhel7_expected_updated_pkgs = [
+ "NetworkManager-openvpn", "NetworkManager-openvpn-gnome", "cabal-install",
+ "cgit", "python34-libs", "python34-test", "python34-tkinter",
+ "python34-tools", "qgit", "rdiff-backup", "stoken-libs", "xlockmore"
+]
+
+
+class TestYumUpdateCheckParse(unittest.TestCase):
+ def _assert_expected(self, expected_pkgs, result):
+
+ for expected_pkg in expected_pkgs:
+ self.assertIn(expected_pkg, result)
+ self.assertEqual(len(result), len(expected_pkgs))
+ self.assertIsInstance(result, dict)
+
+ def test_empty_output(self):
+ res, obs = YumModule.parse_check_update("")
+ expected_pkgs = []
+ self._assert_expected(expected_pkgs, res)
+
+ def test_longname(self):
+ res, obs = YumModule.parse_check_update(longname)
+ expected_pkgs = ['xxxxxxxxxxxxxxxxxxxxxxxxxx', 'glibc']
+ self._assert_expected(expected_pkgs, res)
+
+ def test_plugin_load_error(self):
+ res, obs = YumModule.parse_check_update(yum_plugin_load_error)
+ expected_pkgs = []
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_1(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_1)
+ expected_pkgs = ["vms-agent"]
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_2(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_2)
+ expected_pkgs = ["empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty",
+ "libtiff"]
+
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_3(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_3)
+ expected_pkgs = ["ceph", "ceph-base", "ceph-common", "ceph-mds",
+ "ceph-mon", "ceph-osd", "ceph-selinux", "libcephfs1",
+ "librados2", "libradosstriper1", "librbd1", "librgw2",
+ "python-cephfs", "python-rados", "python-rbd"]
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_4(self):
+ res, obs = YumModule.parse_check_update(wrapped_output_4)
+
+ expected_pkgs = ["ipxe-roms-qemu", "quota", "quota-nls", "rdma", "screen",
+ "sos", "sssd-client"]
+ self._assert_expected(expected_pkgs, res)
+
+ def test_wrapped_output_rhel7(self):
+ res, obs = YumModule.parse_check_update(unwrapped_output_rhel7)
+ self._assert_expected(unwrapped_output_rhel7_expected_updated_pkgs, res)
+
+ def test_wrapped_output_rhel7_obsoletes(self):
+ res, obs = YumModule.parse_check_update(unwrapped_output_rhel7_obsoletes)
+ self._assert_expected(
+ unwrapped_output_rhel7_expected_updated_pkgs + unwrapped_output_rhel7_expected_new_obsoletes_pkgs,
+ res
+ )
+ self._assert_expected(unwrapped_output_rhel7_expected_old_obsoletes_pkgs, obs)
diff --git a/test/units/modules/utils.py b/test/units/modules/utils.py
new file mode 100644
index 00000000..6d169e36
--- /dev/null
+++ b/test/units/modules/utils.py
@@ -0,0 +1,50 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+from units.compat import unittest
+from units.compat.mock import patch
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+
+
+def set_module_args(args):
+ if '_ansible_remote_tmp' not in args:
+ args['_ansible_remote_tmp'] = '/tmp'
+ if '_ansible_keep_remote_files' not in args:
+ args['_ansible_keep_remote_files'] = False
+
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args)
+
+
+class AnsibleExitJson(Exception):
+ pass
+
+
+class AnsibleFailJson(Exception):
+ pass
+
+
+def exit_json(*args, **kwargs):
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs):
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class ModuleTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
+ self.mock_module.start()
+ self.mock_sleep = patch('time.sleep')
+ self.mock_sleep.start()
+ set_module_args({})
+ self.addCleanup(self.mock_module.stop)
+ self.addCleanup(self.mock_sleep.stop)