summaryrefslogtreecommitdiffstats
path: root/test/units/modules
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-05 16:16:49 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-05 16:16:49 +0000
commit48e387c5c12026a567eb7b293a3a590241c0cecb (patch)
tree80f2573be2d7d534b8ac4d2a852fe43f7ac35324 /test/units/modules
parentReleasing progress-linux version 2.16.6-1~progress7.99u1. (diff)
downloadansible-core-48e387c5c12026a567eb7b293a3a590241c0cecb.tar.xz
ansible-core-48e387c5c12026a567eb7b293a3a590241c0cecb.zip
Merging upstream version 2.17.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/units/modules')
-rw-r--r--test/units/modules/conftest.py3
-rw-r--r--test/units/modules/test_apt.py83
-rw-r--r--test/units/modules/test_apt_key.py7
-rw-r--r--test/units/modules/test_async_wrapper.py3
-rw-r--r--test/units/modules/test_copy.py4
-rw-r--r--test/units/modules/test_hostname.py8
-rw-r--r--test/units/modules/test_iptables.py2574
-rw-r--r--test/units/modules/test_known_hosts.py5
-rw-r--r--test/units/modules/test_pip.py3
-rw-r--r--test/units/modules/test_service.py6
-rw-r--r--test/units/modules/test_service_facts.py7
-rw-r--r--test/units/modules/test_systemd.py5
-rw-r--r--test/units/modules/test_unarchive.py4
-rw-r--r--test/units/modules/test_uri.py43
-rw-r--r--test/units/modules/test_yum.py222
-rw-r--r--test/units/modules/utils.py7
16 files changed, 1523 insertions, 1461 deletions
diff --git a/test/units/modules/conftest.py b/test/units/modules/conftest.py
index c60c586..f94b236 100644
--- a/test/units/modules/conftest.py
+++ b/test/units/modules/conftest.py
@@ -1,8 +1,7 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import json
diff --git a/test/units/modules/test_apt.py b/test/units/modules/test_apt.py
index a5aa4a9..d207320 100644
--- a/test/units/modules/test_apt.py
+++ b/test/units/modules/test_apt.py
@@ -1,46 +1,45 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+# Copyright: Contributors to the Ansible project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import annotations
import collections
-from units.compat.mock import Mock
-from units.compat import unittest
-
-from ansible.modules.apt import (
- expand_pkgspec_from_fnmatches,
+from ansible.modules.apt import expand_pkgspec_from_fnmatches
+import pytest
+
+FakePackage = collections.namedtuple("Package", ("name",))
+fake_cache = [
+ FakePackage("apt"),
+ FakePackage("apt-utils"),
+ FakePackage("not-selected"),
+]
+
+
+@pytest.mark.parametrize(
+ ("test_input", "expected"),
+ [
+ pytest.param(
+ ["apt"],
+ ["apt"],
+ id="trivial",
+ ),
+ pytest.param(
+ ["apt=1.0*"],
+ ["apt=1.0*"],
+ id="version-wildcard",
+ ),
+ pytest.param(
+ ["apt*=1.0*"],
+ ["apt", "apt-utils"],
+ id="pkgname-wildcard-version",
+ ),
+ pytest.param(
+ ["apt*"],
+ ["apt", "apt-utils"],
+ id="pkgname-expands",
+ ),
+ ],
)
-
-
-class AptExpandPkgspecTestCase(unittest.TestCase):
-
- def setUp(self):
- FakePackage = collections.namedtuple("Package", ("name",))
- self.fake_cache = [
- FakePackage("apt"),
- FakePackage("apt-utils"),
- FakePackage("not-selected"),
- ]
-
- def test_trivial(self):
- pkg = ["apt"]
- self.assertEqual(
- expand_pkgspec_from_fnmatches(None, pkg, self.fake_cache), pkg)
-
- def test_version_wildcard(self):
- pkg = ["apt=1.0*"]
- self.assertEqual(
- expand_pkgspec_from_fnmatches(None, pkg, self.fake_cache), pkg)
-
- def test_pkgname_wildcard_version_wildcard(self):
- pkg = ["apt*=1.0*"]
- m_mock = Mock()
- self.assertEqual(
- expand_pkgspec_from_fnmatches(m_mock, pkg, self.fake_cache),
- ['apt', 'apt-utils'])
-
- def test_pkgname_expands(self):
- pkg = ["apt*"]
- m_mock = Mock()
- self.assertEqual(
- expand_pkgspec_from_fnmatches(m_mock, pkg, self.fake_cache),
- ["apt", "apt-utils"])
+def test_expand_pkgspec_from_fnmatches(test_input, expected):
+ """Test positive cases of ``expand_pkgspec_from_fnmatches``."""
+ assert expand_pkgspec_from_fnmatches(None, test_input, fake_cache) == expected
diff --git a/test/units/modules/test_apt_key.py b/test/units/modules/test_apt_key.py
index 37cd53b..051dc2e 100644
--- a/test/units/modules/test_apt_key.py
+++ b/test/units/modules/test_apt_key.py
@@ -1,10 +1,9 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import os
-from units.compat.mock import patch, Mock
-from units.compat import unittest
+from unittest.mock import patch, Mock
+import unittest
from ansible.modules import apt_key
diff --git a/test/units/modules/test_async_wrapper.py b/test/units/modules/test_async_wrapper.py
index dbaf683..1e3b02f 100644
--- a/test/units/modules/test_async_wrapper.py
+++ b/test/units/modules/test_async_wrapper.py
@@ -1,7 +1,6 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import os
diff --git a/test/units/modules/test_copy.py b/test/units/modules/test_copy.py
index beeef6d..49e3324 100644
--- a/test/units/modules/test_copy.py
+++ b/test/units/modules/test_copy.py
@@ -3,9 +3,7 @@
# (c) 2018 Ansible Project
# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import pytest
diff --git a/test/units/modules/test_hostname.py b/test/units/modules/test_hostname.py
index 1aa4a57..20b8336 100644
--- a/test/units/modules/test_hostname.py
+++ b/test/units/modules/test_hostname.py
@@ -1,15 +1,13 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import os
import shutil
import tempfile
-from units.compat.mock import patch, MagicMock, mock_open
+from unittest.mock import patch, MagicMock, mock_open
from ansible.module_utils.common._utils import get_all_subclasses
from ansible.modules import hostname
from units.modules.utils import ModuleTestCase, set_module_args
-from ansible.module_utils.six import PY2
class TestHostname(ModuleTestCase):
@@ -28,8 +26,6 @@ class TestHostname(ModuleTestCase):
m = mock_open()
builtins = 'builtins'
- if PY2:
- builtins = '__builtin__'
with patch('%s.open' % builtins, m):
instance.get_permanent_hostname()
instance.get_current_hostname()
diff --git a/test/units/modules/test_iptables.py b/test/units/modules/test_iptables.py
index 2459cf7..67e3909 100644
--- a/test/units/modules/test_iptables.py
+++ b/test/units/modules/test_iptables.py
@@ -1,1170 +1,1430 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+# Copyright: Contributors to the Ansible project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import annotations
+
+from units.modules.utils import AnsibleExitJson, AnsibleFailJson, set_module_args, fail_json, exit_json
+import pytest
-from units.compat.mock import patch
-from ansible.module_utils import basic
from ansible.modules import iptables
-from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
-
-
-def get_bin_path(*args, **kwargs):
- return "/sbin/iptables"
-
-
-def get_iptables_version(iptables_path, module):
- return "1.8.2"
-
-
-class TestIptables(ModuleTestCase):
-
- def setUp(self):
- super(TestIptables, self).setUp()
- self.mock_get_bin_path = patch.object(basic.AnsibleModule, 'get_bin_path', get_bin_path)
- self.mock_get_bin_path.start()
- self.addCleanup(self.mock_get_bin_path.stop) # ensure that the patching is 'undone'
- self.mock_get_iptables_version = patch.object(iptables, 'get_iptables_version', get_iptables_version)
- self.mock_get_iptables_version.start()
- self.addCleanup(self.mock_get_iptables_version.stop) # ensure that the patching is 'undone'
-
- def test_without_required_parameters(self):
- """Failure must occurs when all parameters are missing"""
- with self.assertRaises(AnsibleFailJson):
- set_module_args({})
- iptables.main()
-
- def test_flush_table_without_chain(self):
- """Test flush without chain, flush the table"""
- set_module_args({
- 'flush': True,
- })
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.return_value = 0, '', '' # successful execution, no output
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args[0][0][0], '/sbin/iptables')
- self.assertEqual(run_command.call_args[0][0][1], '-t')
- self.assertEqual(run_command.call_args[0][0][2], 'filter')
- self.assertEqual(run_command.call_args[0][0][3], '-F')
-
- def test_flush_table_check_true(self):
- """Test flush without parameters and check == true"""
- set_module_args({
- 'flush': True,
- '_ansible_check_mode': True,
- })
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.return_value = 0, '', '' # successful execution, no output
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 0)
-
-# TODO ADD test flush table nat
-# TODO ADD test flush with chain
-# TODO ADD test flush with chain and table nat
-
- def test_policy_table(self):
- """Test change policy of a chain"""
- set_module_args({
- 'policy': 'ACCEPT',
- 'chain': 'INPUT',
- })
- commands_results = [
- (0, 'Chain INPUT (policy DROP)\n', ''),
- (0, '', '')
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-L',
- 'INPUT',
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-P',
- 'INPUT',
- 'ACCEPT',
- ])
-
- def test_policy_table_no_change(self):
- """Test don't change policy of a chain if the policy is right"""
- set_module_args({
- 'policy': 'ACCEPT',
- 'chain': 'INPUT',
- })
- commands_results = [
- (0, 'Chain INPUT (policy ACCEPT)\n', ''),
- (0, '', '')
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertFalse(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-L',
- 'INPUT',
- ])
-
- def test_policy_table_changed_false(self):
- """Test flush without parameters and change == false"""
- set_module_args({
- 'policy': 'ACCEPT',
- 'chain': 'INPUT',
- '_ansible_check_mode': True,
- })
- commands_results = [
- (0, 'Chain INPUT (policy DROP)\n', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-L',
- 'INPUT',
- ])
+
+IPTABLES_CMD = "/sbin/iptables"
+IPTABLES_VERSION = "1.8.2"
+CONST_INPUT_FILTER = [IPTABLES_CMD, "-t", "filter", "-L", "INPUT",]
+
+
+@pytest.fixture
+def _mock_basic_commands(mocker):
+ """Mock basic commands like get_bin_path and get_iptables_version."""
+ mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.get_bin_path",
+ return_value=IPTABLES_CMD,
+ )
+ mocker.patch("ansible.modules.iptables.get_iptables_version", return_value=IPTABLES_VERSION)
+
+
+def test_without_required_parameters(mocker):
+ """Test module without any parameters."""
+ mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.fail_json",
+ side_effect=fail_json,
+ )
+ set_module_args({})
+ with pytest.raises(AnsibleFailJson) as exc:
+ iptables.main()
+
+ assert exc.value.args[0]["failed"]
+ assert "Failed to find required executable" in exc.value.args[0]["msg"]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_flush_table_without_chain(mocker):
+ """Test flush without chain, flush the table."""
+ set_module_args(
+ {
+ "flush": True,
+ }
+ )
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command", return_value=(0, "", "")
+ )
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args[0][0]
+ assert first_cmd_args_list[0], IPTABLES_CMD
+ assert first_cmd_args_list[1], "-t"
+ assert first_cmd_args_list[2], "filter"
+ assert first_cmd_args_list[3], "-F"
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_flush_table_check_true(mocker):
+ """Test flush without parameters and check == true."""
+ set_module_args(
+ {
+ "flush": True,
+ "_ansible_check_mode": True,
+ }
+ )
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command", return_value=(0, "", "")
+ )
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 0
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_policy_table(mocker):
+ """Test change policy of a chain."""
+ set_module_args(
+ {
+ "policy": "ACCEPT",
+ "chain": "INPUT",
+ }
+ )
+ commands_results = [(0, "Chain INPUT (policy DROP)\n", ""), (0, "", "")]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 2
+ first_cmd_args_list = run_command.call_args_list[0]
+ second_cmd_args_list = run_command.call_args_list[1]
+ assert first_cmd_args_list[0][0] == CONST_INPUT_FILTER
+ assert second_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-P",
+ "INPUT",
+ "ACCEPT",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+@pytest.mark.parametrize(
+ ("test_input", "commands_results"),
+ [
+ pytest.param(
+ {
+ "policy": "ACCEPT",
+ "chain": "INPUT",
+ "_ansible_check_mode": True,
+ },
+ [
+ (0, "Chain INPUT (policy DROP)\n", ""),
+ ],
+ id="policy-table-no-change",
+ ),
+ pytest.param(
+ {
+ "policy": "ACCEPT",
+ "chain": "INPUT",
+ },
+ [
+ (0, "Chain INPUT (policy ACCEPT)\n", ""),
+ (0, "", ""),
+ ],
+ id="policy-table-change-false",
+ )
+ ]
+)
+def test_policy_table_flush(mocker, test_input, commands_results):
+ """Test flush without parameters and change == false."""
+ set_module_args(test_input)
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == CONST_INPUT_FILTER
+
# TODO ADD test policy without chain fail
# TODO ADD test policy with chain don't exists
# TODO ADD test policy with wrong choice fail
- def test_insert_rule_change_false(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'OUTPUT',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'ACCEPT',
- 'action': 'insert',
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (1, '', ''), # check_rule_present
- (0, '', ''), # check_chain_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
-
- def test_insert_rule(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'OUTPUT',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'ACCEPT',
- 'action': 'insert'
- })
-
- commands_results = [
- (1, '', ''), # check_rule_present
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-I',
- 'OUTPUT',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
-
- def test_append_rule_check_mode(self):
- """Test append a redirection rule in check mode"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'REDIRECT',
- 'table': 'nat',
- 'to_destination': '5.5.5.5/32',
- 'protocol': 'udp',
- 'destination_port': '22',
- 'to_ports': '8600',
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (1, '', ''), # check_rule_present
- (0, '', ''), # check_chain_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'REDIRECT',
- '--to-destination',
- '5.5.5.5/32',
- '--destination-port',
- '22',
- '--to-ports',
- '8600'
- ])
-
- def test_append_rule(self):
- """Test append a redirection rule"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'REDIRECT',
- 'table': 'nat',
- 'to_destination': '5.5.5.5/32',
- 'protocol': 'udp',
- 'destination_port': '22',
- 'to_ports': '8600'
- })
-
- commands_results = [
- (1, '', ''), # check_rule_present
- (0, '', ''), # check_chain_present
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'REDIRECT',
- '--to-destination',
- '5.5.5.5/32',
- '--destination-port',
- '22',
- '--to-ports',
- '8600'
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-A',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'REDIRECT',
- '--to-destination',
- '5.5.5.5/32',
- '--destination-port',
- '22',
- '--to-ports',
- '8600'
- ])
-
- def test_remove_rule(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'SNAT',
- 'table': 'nat',
- 'to_source': '5.5.5.5/32',
- 'protocol': 'udp',
- 'source_port': '22',
- 'to_ports': '8600',
- 'state': 'absent',
- 'in_interface': 'eth0',
- 'out_interface': 'eth1',
- 'comment': 'this is a comment'
- })
-
- commands_results = [
- (0, '', ''),
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'SNAT',
- '--to-source',
- '5.5.5.5/32',
- '-i',
- 'eth0',
- '-o',
- 'eth1',
- '--source-port',
- '22',
- '--to-ports',
- '8600',
- '-m',
- 'comment',
- '--comment',
- 'this is a comment'
- ])
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-D',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'SNAT',
- '--to-source',
- '5.5.5.5/32',
- '-i',
- 'eth0',
- '-o',
- 'eth1',
- '--source-port',
- '22',
- '--to-ports',
- '8600',
- '-m',
- 'comment',
- '--comment',
- 'this is a comment'
- ])
-
- def test_remove_rule_check_mode(self):
- """Test flush without parameters check mode"""
- set_module_args({
- 'chain': 'PREROUTING',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'SNAT',
- 'table': 'nat',
- 'to_source': '5.5.5.5/32',
- 'protocol': 'udp',
- 'source_port': '22',
- 'to_ports': '8600',
- 'state': 'absent',
- 'in_interface': 'eth0',
- 'out_interface': 'eth1',
- 'comment': 'this is a comment',
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'nat',
- '-C',
- 'PREROUTING',
- '-p',
- 'udp',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'SNAT',
- '--to-source',
- '5.5.5.5/32',
- '-i',
- 'eth0',
- '-o',
- 'eth1',
- '--source-port',
- '22',
- '--to-ports',
- '8600',
- '-m',
- 'comment',
- '--comment',
- 'this is a comment'
- ])
-
- def test_insert_with_reject(self):
- """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
- set_module_args({
- 'chain': 'INPUT',
- 'protocol': 'tcp',
- 'reject_with': 'tcp-reset',
- 'ip_version': 'ipv4',
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-p',
- 'tcp',
- '-j',
- 'REJECT',
- '--reject-with',
- 'tcp-reset',
- ])
-
- def test_insert_jump_reject_with_reject(self):
- """ Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988 """
- set_module_args({
- 'chain': 'INPUT',
- 'protocol': 'tcp',
- 'jump': 'REJECT',
- 'reject_with': 'tcp-reset',
- 'ip_version': 'ipv4',
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-p',
- 'tcp',
- '-j',
- 'REJECT',
- '--reject-with',
- 'tcp-reset',
- ])
-
- def test_jump_tee_gateway_negative(self):
- """ Missing gateway when JUMP is set to TEE """
- set_module_args({
- 'table': 'mangle',
- 'chain': 'PREROUTING',
- 'in_interface': 'eth0',
- 'protocol': 'udp',
- 'match': 'state',
- 'jump': 'TEE',
- 'ctstate': ['NEW'],
- 'destination_port': '9521',
- 'destination': '127.0.0.1'
- })
-
- with self.assertRaises(AnsibleFailJson) as e:
- iptables.main()
- self.assertTrue(e.exception.args[0]['failed'])
- self.assertEqual(e.exception.args[0]['msg'], 'jump is TEE but all of the following are missing: gateway')
-
- def test_jump_tee_gateway(self):
- """ Using gateway when JUMP is set to TEE """
- set_module_args({
- 'table': 'mangle',
- 'chain': 'PREROUTING',
- 'in_interface': 'eth0',
- 'protocol': 'udp',
- 'match': 'state',
- 'jump': 'TEE',
- 'ctstate': ['NEW'],
- 'destination_port': '9521',
- 'gateway': '192.168.10.1',
- 'destination': '127.0.0.1'
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'mangle',
- '-C', 'PREROUTING',
- '-p', 'udp',
- '-d', '127.0.0.1',
- '-m', 'state',
- '-j', 'TEE',
- '--gateway', '192.168.10.1',
- '-i', 'eth0',
- '--destination-port', '9521',
- '--state', 'NEW'
- ])
-
- def test_tcp_flags(self):
- """ Test various ways of inputting tcp_flags """
- args = [
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_insert_rule_change_false(mocker):
+ """Test flush without parameters."""
+ set_module_args(
+ {
+ "chain": "OUTPUT",
+ "source": "1.2.3.4/32",
+ "destination": "7.8.9.10/42",
+ "jump": "ACCEPT",
+ "action": "insert",
+ "_ansible_check_mode": True,
+ }
+ )
+ commands_results = [
+ (1, "", ""), # check_rule_present
+ (0, "", ""), # check_chain_present
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "OUTPUT",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "ACCEPT",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_insert_rule(mocker):
+ """Test flush without parameters."""
+ set_module_args(
+ {
+ "chain": "OUTPUT",
+ "source": "1.2.3.4/32",
+ "destination": "7.8.9.10/42",
+ "jump": "ACCEPT",
+ "action": "insert",
+ }
+ )
+ commands_results = [
+ (1, "", ""), # check_rule_present
+ (0, "", ""),
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 2
+ first_cmd_args_list = run_command.call_args_list[0]
+ second_cmd_args_list = run_command.call_args_list[1]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "OUTPUT",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "ACCEPT",
+ ]
+ assert second_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-I",
+ "OUTPUT",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "ACCEPT",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_append_rule_check_mode(mocker):
+ """Test append a redirection rule in check mode."""
+ set_module_args(
+ {
+ "chain": "PREROUTING",
+ "source": "1.2.3.4/32",
+ "destination": "7.8.9.10/42",
+ "jump": "REDIRECT",
+ "table": "nat",
+ "to_destination": "5.5.5.5/32",
+ "protocol": "udp",
+ "destination_port": "22",
+ "to_ports": "8600",
+ "_ansible_check_mode": True,
+ }
+ )
+
+ commands_results = [
+ (1, "", ""), # check_rule_present
+ (0, "", ""), # check_chain_present
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "nat",
+ "-C",
+ "PREROUTING",
+ "-p",
+ "udp",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "REDIRECT",
+ "--to-destination",
+ "5.5.5.5/32",
+ "--destination-port",
+ "22",
+ "--to-ports",
+ "8600",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_append_rule(mocker):
+ """Test append a redirection rule."""
+ set_module_args(
+ {
+ "chain": "PREROUTING",
+ "source": "1.2.3.4/32",
+ "destination": "7.8.9.10/42",
+ "jump": "REDIRECT",
+ "table": "nat",
+ "to_destination": "5.5.5.5/32",
+ "protocol": "udp",
+ "destination_port": "22",
+ "to_ports": "8600",
+ }
+ )
+
+ commands_results = [
+ (1, "", ""), # check_rule_present
+ (0, "", ""), # check_chain_present
+ (0, "", ""),
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 2
+ first_cmd_args_list = run_command.call_args_list[0]
+ second_cmd_args_list = run_command.call_args_list[1]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "nat",
+ "-C",
+ "PREROUTING",
+ "-p",
+ "udp",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "REDIRECT",
+ "--to-destination",
+ "5.5.5.5/32",
+ "--destination-port",
+ "22",
+ "--to-ports",
+ "8600",
+ ]
+ assert second_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "nat",
+ "-A",
+ "PREROUTING",
+ "-p",
+ "udp",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "REDIRECT",
+ "--to-destination",
+ "5.5.5.5/32",
+ "--destination-port",
+ "22",
+ "--to-ports",
+ "8600",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_remove_rule(mocker):
+ """Test flush without parameters."""
+ set_module_args(
+ {
+ "chain": "PREROUTING",
+ "source": "1.2.3.4/32",
+ "destination": "7.8.9.10/42",
+ "jump": "SNAT",
+ "table": "nat",
+ "to_source": "5.5.5.5/32",
+ "protocol": "udp",
+ "source_port": "22",
+ "to_ports": "8600",
+ "state": "absent",
+ "in_interface": "eth0",
+ "out_interface": "eth1",
+ "comment": "this is a comment",
+ }
+ )
+
+ commands_results = [
+ (0, "", ""),
+ (0, "", ""),
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 2
+ first_cmd_args_list = run_command.call_args_list[0]
+ second_cmd_args_list = run_command.call_args_list[1]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "nat",
+ "-C",
+ "PREROUTING",
+ "-p",
+ "udp",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "SNAT",
+ "--to-source",
+ "5.5.5.5/32",
+ "-i",
+ "eth0",
+ "-o",
+ "eth1",
+ "--source-port",
+ "22",
+ "--to-ports",
+ "8600",
+ "-m",
+ "comment",
+ "--comment",
+ "this is a comment",
+ ]
+ assert second_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "nat",
+ "-D",
+ "PREROUTING",
+ "-p",
+ "udp",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "SNAT",
+ "--to-source",
+ "5.5.5.5/32",
+ "-i",
+ "eth0",
+ "-o",
+ "eth1",
+ "--source-port",
+ "22",
+ "--to-ports",
+ "8600",
+ "-m",
+ "comment",
+ "--comment",
+ "this is a comment",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_remove_rule_check_mode(mocker):
+ """Test flush without parameters check mode."""
+ set_module_args(
+ {
+ "chain": "PREROUTING",
+ "source": "1.2.3.4/32",
+ "destination": "7.8.9.10/42",
+ "jump": "SNAT",
+ "table": "nat",
+ "to_source": "5.5.5.5/32",
+ "protocol": "udp",
+ "source_port": "22",
+ "to_ports": "8600",
+ "state": "absent",
+ "in_interface": "eth0",
+ "out_interface": "eth1",
+ "comment": "this is a comment",
+ "_ansible_check_mode": True,
+ }
+ )
+
+ commands_results = [
+ (0, "", ""),
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "nat",
+ "-C",
+ "PREROUTING",
+ "-p",
+ "udp",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "SNAT",
+ "--to-source",
+ "5.5.5.5/32",
+ "-i",
+ "eth0",
+ "-o",
+ "eth1",
+ "--source-port",
+ "22",
+ "--to-ports",
+ "8600",
+ "-m",
+ "comment",
+ "--comment",
+ "this is a comment",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+@pytest.mark.parametrize(
+ ("test_input", "expected"),
+ [
+ pytest.param(
{
- 'chain': 'OUTPUT',
- 'protocol': 'tcp',
- 'jump': 'DROP',
- 'tcp_flags': 'flags=ALL flags_set="ACK,RST,SYN,FIN"'
+ "chain": "INPUT",
+ "protocol": "tcp",
+ "reject_with": "tcp-reset",
+ "ip_version": "ipv4",
},
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-p",
+ "tcp",
+ "-j",
+ "REJECT",
+ "--reject-with",
+ "tcp-reset",
+ ],
+ id="insert-reject-with",
+ ),
+ pytest.param(
{
- 'chain': 'OUTPUT',
- 'protocol': 'tcp',
- 'jump': 'DROP',
- 'tcp_flags': {
- 'flags': 'ALL',
- 'flags_set': 'ACK,RST,SYN,FIN'
- }
+ "chain": "INPUT",
+ "protocol": "tcp",
+ "jump": "REJECT",
+ "reject_with": "tcp-reset",
+ "ip_version": "ipv4",
},
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-p",
+ "tcp",
+ "-j",
+ "REJECT",
+ "--reject-with",
+ "tcp-reset",
+ ],
+ id="update-reject-with",
+ ),
+ ]
+)
+def test_insert_with_reject(mocker, test_input, expected):
+ """Using reject_with with a previously defined jump: REJECT results in two Jump statements #18988."""
+ set_module_args(test_input)
+ commands_results = [
+ (0, "", ""),
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == expected
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_jump_tee_gateway_negative(mocker):
+ """Missing gateway when JUMP is set to TEE."""
+ set_module_args(
+ {
+ "table": "mangle",
+ "chain": "PREROUTING",
+ "in_interface": "eth0",
+ "protocol": "udp",
+ "match": "state",
+ "jump": "TEE",
+ "ctstate": ["NEW"],
+ "destination_port": "9521",
+ "destination": "127.0.0.1",
+ }
+ )
+ mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.fail_json",
+ side_effect=fail_json,
+ )
+ jump_err_msg = "jump is TEE but all of the following are missing: gateway"
+ with pytest.raises(AnsibleFailJson, match=jump_err_msg) as exc:
+ iptables.main()
+ assert exc.value.args[0]["failed"]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_jump_tee_gateway(mocker):
+ """Using gateway when JUMP is set to TEE."""
+ set_module_args(
+ {
+ "table": "mangle",
+ "chain": "PREROUTING",
+ "in_interface": "eth0",
+ "protocol": "udp",
+ "match": "state",
+ "jump": "TEE",
+ "ctstate": ["NEW"],
+ "destination_port": "9521",
+ "gateway": "192.168.10.1",
+ "destination": "127.0.0.1",
+ }
+ )
+ commands_results = [
+ (0, "", ""),
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "mangle",
+ "-C",
+ "PREROUTING",
+ "-p",
+ "udp",
+ "-d",
+ "127.0.0.1",
+ "-m",
+ "state",
+ "-j",
+ "TEE",
+ "--gateway",
+ "192.168.10.1",
+ "-i",
+ "eth0",
+ "--destination-port",
+ "9521",
+ "--state",
+ "NEW",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+@pytest.mark.parametrize(
+ ("test_input"),
+ [
+ pytest.param(
+ 'flags=ALL flags_set="ACK,RST,SYN,FIN"',
+ id="tcp-flags-str"
+ ),
+ pytest.param(
{
- 'chain': 'OUTPUT',
- 'protocol': 'tcp',
- 'jump': 'DROP',
- 'tcp_flags': {
- 'flags': ['ALL'],
- 'flags_set': ['ACK', 'RST', 'SYN', 'FIN']
- }
+ "flags": "ALL", "flags_set": "ACK,RST,SYN,FIN"
},
-
- ]
-
- for item in args:
- set_module_args(item)
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-p',
- 'tcp',
- '--tcp-flags',
- 'ALL',
- 'ACK,RST,SYN,FIN',
- '-j',
- 'DROP'
- ])
-
- def test_log_level(self):
- """ Test various ways of log level flag """
-
- log_levels = ['0', '1', '2', '3', '4', '5', '6', '7',
- 'emerg', 'alert', 'crit', 'error', 'warning', 'notice', 'info', 'debug']
-
- for log_lvl in log_levels:
- set_module_args({
- 'chain': 'INPUT',
- 'jump': 'LOG',
- 'log_level': log_lvl,
- 'source': '1.2.3.4/32',
- 'log_prefix': '** DROP-this_ip **'
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-C', 'INPUT',
- '-s', '1.2.3.4/32',
- '-j', 'LOG',
- '--log-prefix', '** DROP-this_ip **',
- '--log-level', log_lvl
- ])
-
- def test_iprange(self):
- """ Test iprange module with its flags src_range and dst_range """
- set_module_args({
- 'chain': 'INPUT',
- 'match': ['iprange'],
- 'src_range': '192.168.1.100-192.168.1.199',
- 'jump': 'ACCEPT'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-m',
- 'iprange',
- '-j',
- 'ACCEPT',
- '--src-range',
- '192.168.1.100-192.168.1.199',
- ])
-
- set_module_args({
- 'chain': 'INPUT',
- 'src_range': '192.168.1.100-192.168.1.199',
- 'dst_range': '10.0.0.50-10.0.0.100',
- 'jump': 'ACCEPT'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-j',
- 'ACCEPT',
- '-m',
- 'iprange',
- '--src-range',
- '192.168.1.100-192.168.1.199',
- '--dst-range',
- '10.0.0.50-10.0.0.100'
- ])
-
- set_module_args({
- 'chain': 'INPUT',
- 'dst_range': '10.0.0.50-10.0.0.100',
- 'jump': 'ACCEPT'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-j',
- 'ACCEPT',
- '-m',
- 'iprange',
- '--dst-range',
- '10.0.0.50-10.0.0.100'
- ])
-
- def test_insert_rule_with_wait(self):
- """Test flush without parameters"""
- set_module_args({
- 'chain': 'OUTPUT',
- 'source': '1.2.3.4/32',
- 'destination': '7.8.9.10/42',
- 'jump': 'ACCEPT',
- 'action': 'insert',
- 'wait': '10'
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'OUTPUT',
- '-w',
- '10',
- '-s',
- '1.2.3.4/32',
- '-d',
- '7.8.9.10/42',
- '-j',
- 'ACCEPT'
- ])
-
- def test_comment_position_at_end(self):
- """Test comment position to make sure it is at the end of command"""
- set_module_args({
- 'chain': 'INPUT',
- 'jump': 'ACCEPT',
- 'action': 'insert',
- 'ctstate': ['NEW'],
- 'comment': 'this is a comment',
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t',
- 'filter',
- '-C',
- 'INPUT',
- '-j',
- 'ACCEPT',
- '-m',
- 'conntrack',
- '--ctstate',
- 'NEW',
- '-m',
- 'comment',
- '--comment',
- 'this is a comment'
- ])
- self.assertEqual(run_command.call_args[0][0][14], 'this is a comment')
-
- def test_destination_ports(self):
- """ Test multiport module usage with multiple ports """
- set_module_args({
- 'chain': 'INPUT',
- 'protocol': 'tcp',
- 'in_interface': 'eth0',
- 'source': '192.168.0.1/32',
- 'destination_ports': ['80', '443', '8081:8085'],
- 'jump': 'ACCEPT',
- 'comment': 'this is a comment',
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-C', 'INPUT',
- '-p', 'tcp',
- '-s', '192.168.0.1/32',
- '-j', 'ACCEPT',
- '-m', 'multiport',
- '--dports', '80,443,8081:8085',
- '-i', 'eth0',
- '-m', 'comment',
- '--comment', 'this is a comment'
- ])
-
- def test_match_set(self):
- """ Test match_set together with match_set_flags """
- set_module_args({
- 'chain': 'INPUT',
- 'protocol': 'tcp',
- 'match_set': 'admin_hosts',
- 'match_set_flags': 'src',
- 'destination_port': '22',
- 'jump': 'ACCEPT',
- 'comment': 'this is a comment',
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-C', 'INPUT',
- '-p', 'tcp',
- '-j', 'ACCEPT',
- '--destination-port', '22',
- '-m', 'set',
- '--match-set', 'admin_hosts', 'src',
- '-m', 'comment',
- '--comment', 'this is a comment'
- ])
-
- set_module_args({
- 'chain': 'INPUT',
- 'protocol': 'udp',
- 'match_set': 'banned_hosts',
- 'match_set_flags': 'src,dst',
- 'jump': 'REJECT',
- })
- commands_results = [
- (0, '', ''),
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-C', 'INPUT',
- '-p', 'udp',
- '-j', 'REJECT',
- '-m', 'set',
- '--match-set', 'banned_hosts', 'src,dst'
- ])
-
- def test_chain_creation(self):
- """Test chain creation when absent"""
- set_module_args({
- 'chain': 'FOOBAR',
- 'state': 'present',
- 'chain_management': True,
- })
-
- commands_results = [
- (1, '', ''), # check_chain_present
- (0, '', ''), # create_chain
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
-
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-L', 'FOOBAR',
- ])
-
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-N', 'FOOBAR',
- ])
-
- commands_results = [
- (0, '', ''), # check_rule_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertFalse(result.exception.args[0]['changed'])
-
- def test_chain_creation_check_mode(self):
- """Test chain creation when absent"""
- set_module_args({
- 'chain': 'FOOBAR',
- 'state': 'present',
- 'chain_management': True,
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (1, '', ''), # check_rule_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
-
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-L', 'FOOBAR',
- ])
-
- commands_results = [
- (0, '', ''), # check_rule_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertFalse(result.exception.args[0]['changed'])
-
- def test_chain_deletion(self):
- """Test chain deletion when present"""
- set_module_args({
- 'chain': 'FOOBAR',
- 'state': 'absent',
- 'chain_management': True,
- })
-
- commands_results = [
- (0, '', ''), # check_chain_present
- (0, '', ''), # delete_chain
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 2)
-
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-L', 'FOOBAR',
- ])
-
- self.assertEqual(run_command.call_args_list[1][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-X', 'FOOBAR',
- ])
-
- commands_results = [
- (1, '', ''), # check_rule_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertFalse(result.exception.args[0]['changed'])
-
- def test_chain_deletion_check_mode(self):
- """Test chain deletion when present"""
- set_module_args({
- 'chain': 'FOOBAR',
- 'state': 'absent',
- 'chain_management': True,
- '_ansible_check_mode': True,
- })
-
- commands_results = [
- (0, '', ''), # check_chain_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertTrue(result.exception.args[0]['changed'])
-
- self.assertEqual(run_command.call_count, 1)
-
- self.assertEqual(run_command.call_args_list[0][0][0], [
- '/sbin/iptables',
- '-t', 'filter',
- '-L', 'FOOBAR',
- ])
-
- commands_results = [
- (1, '', ''), # check_rule_present
- ]
-
- with patch.object(basic.AnsibleModule, 'run_command') as run_command:
- run_command.side_effect = commands_results
- with self.assertRaises(AnsibleExitJson) as result:
- iptables.main()
- self.assertFalse(result.exception.args[0]['changed'])
+ id="tcp-flags-dict"
+ ),
+ pytest.param(
+ {
+ "flags": ["ALL"], "flags_set": ["ACK", "RST", "SYN", "FIN"]
+ },
+ id="tcp-flags-list"
+ ),
+ ],
+)
+def test_tcp_flags(mocker, test_input):
+ """Test various ways of inputting tcp_flags."""
+ rule_data = {
+ "chain": "OUTPUT",
+ "protocol": "tcp",
+ "jump": "DROP",
+ "tcp_flags": test_input,
+ }
+
+ set_module_args(rule_data)
+
+ commands_results = [
+ (0, "", ""),
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "OUTPUT",
+ "-p",
+ "tcp",
+ "--tcp-flags",
+ "ALL",
+ "ACK,RST,SYN,FIN",
+ "-j",
+ "DROP",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+@pytest.mark.parametrize(
+ "log_level",
+ [
+ "0",
+ "1",
+ "2",
+ "3",
+ "4",
+ "5",
+ "6",
+ "7",
+ "emerg",
+ "alert",
+ "crit",
+ "error",
+ "warning",
+ "notice",
+ "info",
+ "debug",
+ ],
+)
+def test_log_level(mocker, log_level):
+ """Test various ways of log level flag."""
+
+ set_module_args(
+ {
+ "chain": "INPUT",
+ "jump": "LOG",
+ "log_level": log_level,
+ "source": "1.2.3.4/32",
+ "log_prefix": "** DROP-this_ip **",
+ }
+ )
+ commands_results = [
+ (0, "", ""),
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-s",
+ "1.2.3.4/32",
+ "-j",
+ "LOG",
+ "--log-prefix",
+ "** DROP-this_ip **",
+ "--log-level",
+ log_level,
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+@pytest.mark.parametrize(
+ ("test_input", "expected"),
+ [
+ pytest.param(
+ {
+ "chain": "INPUT",
+ "match": ["iprange"],
+ "src_range": "192.168.1.100-192.168.1.199",
+ "jump": "ACCEPT",
+ },
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-m",
+ "iprange",
+ "-j",
+ "ACCEPT",
+ "--src-range",
+ "192.168.1.100-192.168.1.199",
+ ],
+ id="src-range",
+ ),
+ pytest.param(
+ {
+ "chain": "INPUT",
+ "src_range": "192.168.1.100-192.168.1.199",
+ "dst_range": "10.0.0.50-10.0.0.100",
+ "jump": "ACCEPT",
+ },
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-j",
+ "ACCEPT",
+ "-m",
+ "iprange",
+ "--src-range",
+ "192.168.1.100-192.168.1.199",
+ "--dst-range",
+ "10.0.0.50-10.0.0.100",
+ ],
+ id="src-range-dst-range",
+ ),
+ pytest.param(
+ {
+ "chain": "INPUT",
+ "dst_range": "10.0.0.50-10.0.0.100",
+ "jump": "ACCEPT"
+ },
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-j",
+ "ACCEPT",
+ "-m",
+ "iprange",
+ "--dst-range",
+ "10.0.0.50-10.0.0.100",
+ ],
+ id="dst-range"
+ ),
+ ],
+)
+def test_iprange(mocker, test_input, expected):
+ """Test iprange module with its flags src_range and dst_range."""
+ set_module_args(test_input)
+
+ commands_results = [
+ (0, "", ""),
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == expected
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_insert_rule_with_wait(mocker):
+ """Test flush without parameters."""
+ set_module_args(
+ {
+ "chain": "OUTPUT",
+ "source": "1.2.3.4/32",
+ "destination": "7.8.9.10/42",
+ "jump": "ACCEPT",
+ "action": "insert",
+ "wait": "10",
+ }
+ )
+
+ commands_results = [
+ (0, "", ""),
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "OUTPUT",
+ "-w",
+ "10",
+ "-s",
+ "1.2.3.4/32",
+ "-d",
+ "7.8.9.10/42",
+ "-j",
+ "ACCEPT",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_comment_position_at_end(mocker):
+ """Test comment position to make sure it is at the end of command."""
+ set_module_args(
+ {
+ "chain": "INPUT",
+ "jump": "ACCEPT",
+ "action": "insert",
+ "ctstate": ["NEW"],
+ "comment": "this is a comment",
+ "_ansible_check_mode": True,
+ }
+ )
+
+ commands_results = [
+ (0, "", ""),
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-j",
+ "ACCEPT",
+ "-m",
+ "conntrack",
+ "--ctstate",
+ "NEW",
+ "-m",
+ "comment",
+ "--comment",
+ "this is a comment",
+ ]
+ assert run_command.call_args[0][0][14] == "this is a comment"
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_destination_ports(mocker):
+ """Test multiport module usage with multiple ports."""
+ set_module_args(
+ {
+ "chain": "INPUT",
+ "protocol": "tcp",
+ "in_interface": "eth0",
+ "source": "192.168.0.1/32",
+ "destination_ports": ["80", "443", "8081:8085"],
+ "jump": "ACCEPT",
+ "comment": "this is a comment",
+ }
+ )
+ commands_results = [
+ (0, "", ""),
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-p",
+ "tcp",
+ "-s",
+ "192.168.0.1/32",
+ "-j",
+ "ACCEPT",
+ "-m",
+ "multiport",
+ "--dports",
+ "80,443,8081:8085",
+ "-i",
+ "eth0",
+ "-m",
+ "comment",
+ "--comment",
+ "this is a comment",
+ ]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+@pytest.mark.parametrize(
+ ("test_input", "expected"),
+ [
+ pytest.param(
+ {
+ "chain": "INPUT",
+ "protocol": "tcp",
+ "match_set": "admin_hosts",
+ "match_set_flags": "src",
+ "destination_port": "22",
+ "jump": "ACCEPT",
+ "comment": "this is a comment",
+ },
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-p",
+ "tcp",
+ "-j",
+ "ACCEPT",
+ "--destination-port",
+ "22",
+ "-m",
+ "set",
+ "--match-set",
+ "admin_hosts",
+ "src",
+ "-m",
+ "comment",
+ "--comment",
+ "this is a comment",
+ ],
+ id="match-set-src",
+ ),
+ pytest.param(
+ {
+ "chain": "INPUT",
+ "protocol": "udp",
+ "match_set": "banned_hosts",
+ "match_set_flags": "src,dst",
+ "jump": "REJECT",
+ },
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-p",
+ "udp",
+ "-j",
+ "REJECT",
+ "-m",
+ "set",
+ "--match-set",
+ "banned_hosts",
+ "src,dst",
+ ],
+ id="match-set-src-dst",
+ ),
+ pytest.param(
+ {
+ "chain": "INPUT",
+ "protocol": "udp",
+ "match_set": "banned_hosts_dst",
+ "match_set_flags": "dst,dst",
+ "jump": "REJECT",
+ },
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-p",
+ "udp",
+ "-j",
+ "REJECT",
+ "-m",
+ "set",
+ "--match-set",
+ "banned_hosts_dst",
+ "dst,dst",
+ ],
+ id="match-set-dst-dst",
+ ),
+ pytest.param(
+ {
+ "chain": "INPUT",
+ "protocol": "udp",
+ "match_set": "banned_hosts",
+ "match_set_flags": "src,src",
+ "jump": "REJECT",
+ },
+ [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-C",
+ "INPUT",
+ "-p",
+ "udp",
+ "-j",
+ "REJECT",
+ "-m",
+ "set",
+ "--match-set",
+ "banned_hosts",
+ "src,src",
+ ],
+ id="match-set-src-src",
+ ),
+ ],
+)
+def test_match_set(mocker, test_input, expected):
+ """Test match_set together with match_set_flags."""
+ set_module_args(test_input)
+ commands_results = [
+ (0, "", ""),
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(SystemExit):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == expected
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_chain_creation(mocker):
+ """Test chain creation when absent."""
+ set_module_args(
+ {
+ "chain": "FOOBAR",
+ "state": "present",
+ "chain_management": True,
+ }
+ )
+
+ commands_results = [
+ (1, "", ""), # check_chain_present
+ (0, "", ""), # create_chain
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.exit_json",
+ side_effect=exit_json,
+ )
+ with pytest.raises(AnsibleExitJson):
+ iptables.main()
+
+ assert run_command.call_count == 2
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-L",
+ "FOOBAR",
+ ]
+
+ second_cmd_args_list = run_command.call_args_list[1]
+ assert second_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-N",
+ "FOOBAR",
+ ]
+
+ commands_results = [
+ (0, "", ""), # check_rule_present
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ iptables.main()
+ assert not exc.value.args[0]["changed"]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_chain_creation_check_mode(mocker):
+ """Test chain creation when absent in check mode."""
+ set_module_args(
+ {
+ "chain": "FOOBAR",
+ "state": "present",
+ "chain_management": True,
+ "_ansible_check_mode": True,
+ }
+ )
+
+ commands_results = [
+ (1, "", ""), # check_rule_present
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.exit_json",
+ side_effect=exit_json,
+ )
+ with pytest.raises(AnsibleExitJson):
+ iptables.main()
+
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-L",
+ "FOOBAR",
+ ]
+
+ commands_results = [
+ (0, "", ""), # check_rule_present
+ ]
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+ with pytest.raises(AnsibleExitJson) as exc:
+ iptables.main()
+
+ assert not exc.value.args[0]["changed"]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_chain_deletion(mocker):
+ """Test chain deletion when present."""
+ set_module_args(
+ {
+ "chain": "FOOBAR",
+ "state": "absent",
+ "chain_management": True,
+ }
+ )
+
+ commands_results = [
+ (0, "", ""), # check_chain_present
+ (0, "", ""), # delete_chain
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.exit_json",
+ side_effect=exit_json,
+ )
+ with pytest.raises(AnsibleExitJson) as exc:
+ iptables.main()
+
+ assert exc.value.args[0]["changed"]
+ assert run_command.call_count == 2
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-L",
+ "FOOBAR",
+ ]
+ second_cmd_args_list = run_command.call_args_list[1]
+ assert second_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-X",
+ "FOOBAR",
+ ]
+
+ commands_results = [
+ (1, "", ""), # check_rule_present
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ iptables.main()
+
+ assert not exc.value.args[0]["changed"]
+
+
+@pytest.mark.usefixtures('_mock_basic_commands')
+def test_chain_deletion_check_mode(mocker):
+ """Test chain deletion when present in check mode."""
+ set_module_args(
+ {
+ "chain": "FOOBAR",
+ "state": "absent",
+ "chain_management": True,
+ "_ansible_check_mode": True,
+ }
+ )
+
+ commands_results = [
+ (0, "", ""), # check_chain_present
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.exit_json",
+ side_effect=exit_json,
+ )
+ with pytest.raises(AnsibleExitJson) as exc:
+ iptables.main()
+
+ assert exc.value.args[0]["changed"]
+ assert run_command.call_count == 1
+ first_cmd_args_list = run_command.call_args_list[0]
+ assert first_cmd_args_list[0][0] == [
+ IPTABLES_CMD,
+ "-t",
+ "filter",
+ "-L",
+ "FOOBAR",
+ ]
+
+ commands_results = [
+ (1, "", ""), # check_rule_present
+ ]
+
+ run_command = mocker.patch(
+ "ansible.module_utils.basic.AnsibleModule.run_command",
+ side_effect=commands_results,
+ )
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ iptables.main()
+
+ assert not exc.value.args[0]["changed"]
diff --git a/test/units/modules/test_known_hosts.py b/test/units/modules/test_known_hosts.py
index 667f3e5..f98c998 100644
--- a/test/units/modules/test_known_hosts.py
+++ b/test/units/modules/test_known_hosts.py
@@ -1,11 +1,10 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import os
import tempfile
from ansible.module_utils import basic
-from units.compat import unittest
+import unittest
from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.basic import AnsibleModule
diff --git a/test/units/modules/test_pip.py b/test/units/modules/test_pip.py
index 5640b80..7ddee22 100644
--- a/test/units/modules/test_pip.py
+++ b/test/units/modules/test_pip.py
@@ -1,7 +1,6 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import json
diff --git a/test/units/modules/test_service.py b/test/units/modules/test_service.py
index caabd74..684645e 100644
--- a/test/units/modules/test_service.py
+++ b/test/units/modules/test_service.py
@@ -2,9 +2,8 @@
# Copyright: (c) 2021, Abhijeet Kasurde <akasurde@redhat.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import absolute_import, division, print_function
+from __future__ import annotations
-__metaclass__ = type
import json
import platform
@@ -12,7 +11,6 @@ import platform
import pytest
from ansible.modules import service
from ansible.module_utils.basic import AnsibleModule
-from ansible.module_utils.six import PY2
from units.modules.utils import set_module_args
@@ -29,7 +27,7 @@ def mocker_sunos_service(mocker):
# Read a mocked /etc/release file
mocked_etc_release_data = mocker.mock_open(
read_data=" Oracle Solaris 12.0")
- builtin_open = "__builtin__.open" if PY2 else "builtins.open"
+ builtin_open = "builtins.open"
mocker.patch(builtin_open, mocked_etc_release_data)
service_status = mocker.patch.object(
diff --git a/test/units/modules/test_service_facts.py b/test/units/modules/test_service_facts.py
index 07f6827..6917b8b 100644
--- a/test/units/modules/test_service_facts.py
+++ b/test/units/modules/test_service_facts.py
@@ -2,11 +2,10 @@
# Copyright (c) 2020 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
+from __future__ import annotations
-from units.compat import unittest
-from units.compat.mock import patch
+import unittest
+from unittest.mock import patch
from ansible.module_utils import basic
from ansible.modules.service_facts import AIXScanService
diff --git a/test/units/modules/test_systemd.py b/test/units/modules/test_systemd.py
index 52c212a..1c5339c 100644
--- a/test/units/modules/test_systemd.py
+++ b/test/units/modules/test_systemd.py
@@ -1,7 +1,6 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
-from units.compat import unittest
+import unittest
from ansible.modules.systemd import parse_systemctl_show
diff --git a/test/units/modules/test_unarchive.py b/test/units/modules/test_unarchive.py
index 935231b..e66d0a1 100644
--- a/test/units/modules/test_unarchive.py
+++ b/test/units/modules/test_unarchive.py
@@ -1,6 +1,4 @@
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
+from __future__ import annotations
import pytest
diff --git a/test/units/modules/test_uri.py b/test/units/modules/test_uri.py
new file mode 100644
index 0000000..2aeb464
--- /dev/null
+++ b/test/units/modules/test_uri.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Copyright:
+# (c) 2023 Ansible Project
+# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
+from ansible.modules import uri
+
+
+class TestUri(ModuleTestCase):
+
+ def test_main_no_args(self):
+ """Module must fail if called with no args."""
+ with self.assertRaises(AnsibleFailJson):
+ set_module_args({})
+ uri.main()
+
+ def test_main_no_force(self):
+ """The "force" parameter to fetch_url() must be absent or false when the module is called without "force"."""
+ set_module_args({"url": "http://example.com/"})
+ resp = MagicMock()
+ resp.headers.get_content_type.return_value = "text/html"
+ info = {"url": "http://example.com/", "status": 200}
+ with patch.object(uri, "fetch_url", return_value=(resp, info)) as fetch_url:
+ with self.assertRaises(AnsibleExitJson):
+ uri.main()
+ fetch_url.assert_called_once()
+ assert not fetch_url.call_args[1].get("force")
+
+ def test_main_force(self):
+ """The "force" parameter to fetch_url() must be true when the module is called with "force"."""
+ set_module_args({"url": "http://example.com/", "force": True})
+ resp = MagicMock()
+ resp.headers.get_content_type.return_value = "text/html"
+ info = {"url": "http://example.com/", "status": 200}
+ with patch.object(uri, "fetch_url", return_value=(resp, info)) as fetch_url:
+ with self.assertRaises(AnsibleExitJson):
+ uri.main()
+ fetch_url.assert_called_once()
+ assert fetch_url.call_args[1].get("force")
diff --git a/test/units/modules/test_yum.py b/test/units/modules/test_yum.py
deleted file mode 100644
index 8052eff..0000000
--- a/test/units/modules/test_yum.py
+++ /dev/null
@@ -1,222 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-from units.compat import unittest
-
-from ansible.modules.yum import YumModule
-
-
-yum_plugin_load_error = """
-Plugin "product-id" can't be imported
-Plugin "search-disabled-repos" can't be imported
-Plugin "subscription-manager" can't be imported
-Plugin "product-id" can't be imported
-Plugin "search-disabled-repos" can't be imported
-Plugin "subscription-manager" can't be imported
-"""
-
-# from https://github.com/ansible/ansible/issues/20608#issuecomment-276106505
-wrapped_output_1 = """
-Загружены модули: fastestmirror
-Loading mirror speeds from cached hostfile
- * base: mirror.h1host.ru
- * extras: mirror.h1host.ru
- * updates: mirror.h1host.ru
-
-vms-agent.x86_64 0.0-9 dev
-"""
-
-# from https://github.com/ansible/ansible/issues/20608#issuecomment-276971275
-wrapped_output_2 = """
-Загружены модули: fastestmirror
-Loading mirror speeds from cached hostfile
- * base: mirror.corbina.net
- * extras: mirror.corbina.net
- * updates: mirror.corbina.net
-
-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty.x86_64
- 0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1-0
- addons
-libtiff.x86_64 4.0.3-27.el7_3 updates
-"""
-
-# From https://github.com/ansible/ansible/issues/20608#issuecomment-276698431
-wrapped_output_3 = """
-Loaded plugins: fastestmirror, langpacks
-Loading mirror speeds from cached hostfile
-
-ceph.x86_64 1:11.2.0-0.el7 ceph
-ceph-base.x86_64 1:11.2.0-0.el7 ceph
-ceph-common.x86_64 1:11.2.0-0.el7 ceph
-ceph-mds.x86_64 1:11.2.0-0.el7 ceph
-ceph-mon.x86_64 1:11.2.0-0.el7 ceph
-ceph-osd.x86_64 1:11.2.0-0.el7 ceph
-ceph-selinux.x86_64 1:11.2.0-0.el7 ceph
-libcephfs1.x86_64 1:11.0.2-0.el7 ceph
-librados2.x86_64 1:11.2.0-0.el7 ceph
-libradosstriper1.x86_64 1:11.2.0-0.el7 ceph
-librbd1.x86_64 1:11.2.0-0.el7 ceph
-librgw2.x86_64 1:11.2.0-0.el7 ceph
-python-cephfs.x86_64 1:11.2.0-0.el7 ceph
-python-rados.x86_64 1:11.2.0-0.el7 ceph
-python-rbd.x86_64 1:11.2.0-0.el7 ceph
-"""
-
-# from https://github.com/ansible/ansible-modules-core/issues/4318#issuecomment-251416661
-wrapped_output_4 = """
-ipxe-roms-qemu.noarch 20160127-1.git6366fa7a.el7
- rhelosp-9.0-director-puddle
-quota.x86_64 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z
-quota-nls.noarch 1:4.01-11.el7_2.1 rhelosp-rhel-7.2-z
-rdma.noarch 7.2_4.1_rc6-2.el7 rhelosp-rhel-7.2-z
-screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2
- rhelosp-rhel-7.2-z
-sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle
-sssd-client.x86_64 1.13.0-40.el7_2.12 rhelosp-rhel-7.2-z
-"""
-
-
-# A 'normal-ish' yum check-update output, without any wrapped lines
-unwrapped_output_rhel7 = """
-
-Loaded plugins: etckeeper, product-id, search-disabled-repos, subscription-
- : manager
-This system is not registered to Red Hat Subscription Management. You can use subscription-manager to register.
-
-NetworkManager-openvpn.x86_64 1:1.2.6-1.el7 epel
-NetworkManager-openvpn-gnome.x86_64 1:1.2.6-1.el7 epel
-cabal-install.x86_64 1.16.1.0-2.el7 epel
-cgit.x86_64 1.1-1.el7 epel
-python34-libs.x86_64 3.4.5-3.el7 epel
-python34-test.x86_64 3.4.5-3.el7 epel
-python34-tkinter.x86_64 3.4.5-3.el7 epel
-python34-tools.x86_64 3.4.5-3.el7 epel
-qgit.x86_64 2.6-4.el7 epel
-rdiff-backup.x86_64 1.2.8-12.el7 epel
-stoken-libs.x86_64 0.91-1.el7 epel
-xlockmore.x86_64 5.49-2.el7 epel
-"""
-
-# Some wrapped obsoletes for prepending to output for testing both
-wrapped_output_rhel7_obsoletes_postfix = """
-Obsoleting Packages
-ddashboard.x86_64 0.2.0.1-1.el7_3 mhlavink-developerdashboard
- developerdashboard.x86_64 0.1.12.2-1.el7_2 @mhlavink-developerdashboard
-python-bugzilla.noarch 1.2.2-3.el7_2.1 mhlavink-developerdashboard
- python-bugzilla-develdashboardfixes.noarch
- 1.2.2-3.el7 @mhlavink-developerdashboard
-python2-futures.noarch 3.0.5-1.el7 epel
- python-futures.noarch 3.0.3-1.el7 @epel
-python2-pip.noarch 8.1.2-5.el7 epel
- python-pip.noarch 7.1.0-1.el7 @epel
-python2-pyxdg.noarch 0.25-6.el7 epel
- pyxdg.noarch 0.25-5.el7 @epel
-python2-simplejson.x86_64 3.10.0-1.el7 epel
- python-simplejson.x86_64 3.3.3-1.el7 @epel
-Security: kernel-3.10.0-327.28.2.el7.x86_64 is an installed security update
-Security: kernel-3.10.0-327.22.2.el7.x86_64 is the currently running version
-"""
-
-wrapped_output_multiple_empty_lines = """
-Loaded plugins: langpacks, product-id, search-disabled-repos, subscription-manager
-
-This system is not registered with an entitlement server. You can use subscription-manager to register.
-
-
-screen.x86_64 4.1.0-0.23.20120314git3c2946.el7_2
- rhelosp-rhel-7.2-z
-sos.noarch 3.2-36.el7ost.2 rhelosp-9.0-puddle
-"""
-
-longname = """
-Loaded plugins: fastestmirror, priorities, rhnplugin
-This system is receiving updates from RHN Classic or Red Hat Satellite.
-Loading mirror speeds from cached hostfile
-
-xxxxxxxxxxxxxxxxxxxxxxxxxx.noarch
- 1.16-1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-glibc.x86_64 2.17-157.el7_3.1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"""
-
-
-unwrapped_output_rhel7_obsoletes = unwrapped_output_rhel7 + wrapped_output_rhel7_obsoletes_postfix
-unwrapped_output_rhel7_expected_new_obsoletes_pkgs = [
- "ddashboard", "python-bugzilla", "python2-futures", "python2-pip",
- "python2-pyxdg", "python2-simplejson"
-]
-unwrapped_output_rhel7_expected_old_obsoletes_pkgs = [
- "developerdashboard", "python-bugzilla-develdashboardfixes",
- "python-futures", "python-pip", "pyxdg", "python-simplejson"
-]
-unwrapped_output_rhel7_expected_updated_pkgs = [
- "NetworkManager-openvpn", "NetworkManager-openvpn-gnome", "cabal-install",
- "cgit", "python34-libs", "python34-test", "python34-tkinter",
- "python34-tools", "qgit", "rdiff-backup", "stoken-libs", "xlockmore"
-]
-
-
-class TestYumUpdateCheckParse(unittest.TestCase):
- def _assert_expected(self, expected_pkgs, result):
-
- for expected_pkg in expected_pkgs:
- self.assertIn(expected_pkg, result)
- self.assertEqual(len(result), len(expected_pkgs))
- self.assertIsInstance(result, dict)
-
- def test_empty_output(self):
- res, obs = YumModule.parse_check_update("")
- expected_pkgs = []
- self._assert_expected(expected_pkgs, res)
-
- def test_longname(self):
- res, obs = YumModule.parse_check_update(longname)
- expected_pkgs = ['xxxxxxxxxxxxxxxxxxxxxxxxxx', 'glibc']
- self._assert_expected(expected_pkgs, res)
-
- def test_plugin_load_error(self):
- res, obs = YumModule.parse_check_update(yum_plugin_load_error)
- expected_pkgs = []
- self._assert_expected(expected_pkgs, res)
-
- def test_wrapped_output_1(self):
- res, obs = YumModule.parse_check_update(wrapped_output_1)
- expected_pkgs = ["vms-agent"]
- self._assert_expected(expected_pkgs, res)
-
- def test_wrapped_output_2(self):
- res, obs = YumModule.parse_check_update(wrapped_output_2)
- expected_pkgs = ["empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty-empty",
- "libtiff"]
-
- self._assert_expected(expected_pkgs, res)
-
- def test_wrapped_output_3(self):
- res, obs = YumModule.parse_check_update(wrapped_output_3)
- expected_pkgs = ["ceph", "ceph-base", "ceph-common", "ceph-mds",
- "ceph-mon", "ceph-osd", "ceph-selinux", "libcephfs1",
- "librados2", "libradosstriper1", "librbd1", "librgw2",
- "python-cephfs", "python-rados", "python-rbd"]
- self._assert_expected(expected_pkgs, res)
-
- def test_wrapped_output_4(self):
- res, obs = YumModule.parse_check_update(wrapped_output_4)
-
- expected_pkgs = ["ipxe-roms-qemu", "quota", "quota-nls", "rdma", "screen",
- "sos", "sssd-client"]
- self._assert_expected(expected_pkgs, res)
-
- def test_wrapped_output_rhel7(self):
- res, obs = YumModule.parse_check_update(unwrapped_output_rhel7)
- self._assert_expected(unwrapped_output_rhel7_expected_updated_pkgs, res)
-
- def test_wrapped_output_rhel7_obsoletes(self):
- res, obs = YumModule.parse_check_update(unwrapped_output_rhel7_obsoletes)
- self._assert_expected(
- unwrapped_output_rhel7_expected_updated_pkgs + unwrapped_output_rhel7_expected_new_obsoletes_pkgs,
- res
- )
- self._assert_expected(unwrapped_output_rhel7_expected_old_obsoletes_pkgs, obs)
-
- def test_wrapped_output_multiple_empty_lines(self):
- res, obs = YumModule.parse_check_update(wrapped_output_multiple_empty_lines)
- self._assert_expected(['screen', 'sos'], res)
diff --git a/test/units/modules/utils.py b/test/units/modules/utils.py
index b56229e..4e83d1f 100644
--- a/test/units/modules/utils.py
+++ b/test/units/modules/utils.py
@@ -1,10 +1,9 @@
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import json
-from units.compat import unittest
-from units.compat.mock import patch
+import unittest
+from unittest.mock import patch
from ansible.module_utils import basic
from ansible.module_utils.common.text.converters import to_bytes