summaryrefslogtreecommitdiffstats
path: root/collections-debian-merged/ansible_collections/community/hrobot/tests
diff options
context:
space:
mode:
Diffstat (limited to 'collections-debian-merged/ansible_collections/community/hrobot/tests')
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/requirements.yml2
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt0
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt0
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt0
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py251
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py188
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py161
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py244
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py71
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py1290
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py280
-rw-r--r--collections-debian-merged/ansible_collections/community/hrobot/tests/unit/requirements.txt5
12 files changed, 2492 insertions, 0 deletions
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/requirements.yml b/collections-debian-merged/ansible_collections/community/hrobot/tests/requirements.yml
new file mode 100644
index 00000000..e2d8cdda
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/requirements.yml
@@ -0,0 +1,2 @@
+unit_tests_dependencies:
+- community.internal_test_tools
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py
new file mode 100644
index 00000000..098be7e5
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py
@@ -0,0 +1,251 @@
+# Copyright (c), Felix Fontein <felix@fontein.de>, 2020
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+import json
+import textwrap
+
+import pytest
+
+from mock import MagicMock
+
+from ansible import constants as C
+from ansible.errors import AnsibleError
+from ansible.inventory.data import InventoryData
+from ansible.inventory.manager import InventoryManager
+
+from ansible_collections.community.internal_test_tools.tests.unit.mock.path import mock_unfrackpath_noop
+from ansible_collections.community.internal_test_tools.tests.unit.mock.loader import DictDataLoader
+from ansible_collections.community.internal_test_tools.tests.unit.utils.open_url_framework import (
+ OpenUrlCall,
+ OpenUrlProxy,
+)
+
+from ansible_collections.community.hrobot.plugins.inventory.robot import InventoryModule
+from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL
+
+
+@pytest.fixture(scope="module")
+def inventory():
+ r = InventoryModule()
+ r.inventory = InventoryData()
+ return r
+
+
+def get_option(option):
+ if option == 'filters':
+ return {}
+ if option == 'hetzner_user':
+ return 'test'
+ if option == 'hetzner_password':
+ return 'hunter2'
+ return False
+
+
+def test_populate(inventory, mocker):
+ open_url = OpenUrlProxy([
+ OpenUrlCall('GET', 200)
+ .result_json([
+ {
+ 'server': {
+ 'server_ip': '1.2.3.4',
+ },
+ },
+ {
+ 'server': {
+ 'server_ip': '1.2.3.5',
+ 'server_name': 'test-server',
+ },
+ },
+ ])
+ .expect_url('{0}/server'.format(BASE_URL)),
+ ])
+ mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url)
+
+ inventory.get_option = mocker.MagicMock(side_effect=get_option)
+ inventory.populate(inventory.get_servers())
+
+ open_url.assert_is_done()
+
+ host_1 = inventory.inventory.get_host('1.2.3.4')
+ host_2 = inventory.inventory.get_host('test-server')
+
+ host_1_vars = host_1.get_vars()
+ host_2_vars = host_2.get_vars()
+
+ assert host_1_vars['ansible_host'] == '1.2.3.4'
+ assert host_1_vars['hrobot_server_ip'] == '1.2.3.4'
+ assert 'hrobot_server_name' not in host_1_vars
+ assert host_2_vars['ansible_host'] == '1.2.3.5'
+ assert host_2_vars['hrobot_server_ip'] == '1.2.3.5'
+ assert host_2_vars['hrobot_server_name'] == 'test-server'
+
+
+def test_inventory_file_simple(mocker):
+ open_url = OpenUrlProxy([
+ OpenUrlCall('GET', 200)
+ .result_json([
+ {
+ 'server': {
+ 'server_ip': '1.2.3.4',
+ 'dc': 'foo',
+ },
+ },
+ {
+ 'server': {
+ 'server_ip': '1.2.3.5',
+ 'server_name': 'test-server',
+ 'dc': 'foo',
+ },
+ },
+ {
+ 'server': {
+ 'server_ip': '1.2.3.6',
+ 'server_name': 'test-server-2',
+ 'dc': 'bar',
+ },
+ },
+ ])
+ .expect_url('{0}/server'.format(BASE_URL)),
+ ])
+ mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url)
+ mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop)
+ mocker.patch('os.path.exists', lambda x: True)
+ mocker.patch('os.access', lambda x, y: True)
+
+ inventory_filename = "test.robot.yaml"
+ C.INVENTORY_ENABLED = ['community.hrobot.robot']
+ inventory_file = {inventory_filename: textwrap.dedent("""\
+ ---
+ plugin: community.hrobot.robot
+ hetzner_user: test
+ hetzner_password: hunter2
+ filters:
+ dc: foo
+ """)}
+ im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename)
+ open_url.assert_is_done()
+
+ assert im._inventory.hosts
+ assert '1.2.3.4' in im._inventory.hosts
+ assert 'test-server' in im._inventory.hosts
+ assert 'test-server-2' not in im._inventory.hosts
+ assert im._inventory.get_host('1.2.3.4') in im._inventory.groups['ungrouped'].hosts
+ assert im._inventory.get_host('test-server') in im._inventory.groups['ungrouped'].hosts
+ assert len(im._inventory.groups['ungrouped'].hosts) == 2
+ assert len(im._inventory.groups['all'].hosts) == 0
+
+
+@pytest.mark.parametrize("error_result", [
+ None,
+ json.dumps(dict(
+ error=dict(
+ code="foo",
+ status=400,
+ message="bar",
+ ),
+ ), sort_keys=True).encode('utf-8')
+])
+def test_inventory_file_fail(mocker, error_result):
+ open_url = OpenUrlProxy([
+ OpenUrlCall('GET', 200)
+ .result_error(error_result)
+ .expect_url('{0}/server'.format(BASE_URL)),
+ ])
+ mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url)
+ mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop)
+ mocker.patch('os.path.exists', lambda x: True)
+ mocker.patch('os.access', lambda x, y: True)
+
+ inventory_filename = "test.robot.yml"
+ C.INVENTORY_ENABLED = ['community.hrobot.robot']
+ inventory_file = {inventory_filename: textwrap.dedent("""\
+ ---
+ plugin: community.hrobot.robot
+ hetzner_user: test
+ hetzner_password: hunter2
+ filters:
+ dc: foo
+ """)}
+ im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename)
+ open_url.assert_is_done()
+
+ assert not im._inventory.hosts
+ assert '1.2.3.4' not in im._inventory.hosts
+ assert 'test-server' not in im._inventory.hosts
+ assert 'test-server-2' not in im._inventory.hosts
+ assert len(im._inventory.groups['ungrouped'].hosts) == 0
+ assert len(im._inventory.groups['all'].hosts) == 0
+
+
+def test_inventory_wrong_file(mocker):
+ open_url = OpenUrlProxy([])
+ mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url)
+ mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop)
+ mocker.patch('os.path.exists', lambda x: True)
+ mocker.patch('os.access', lambda x, y: True)
+
+ inventory_filename = "test.bobot.yml"
+ C.INVENTORY_ENABLED = ['community.hrobot.robot']
+ inventory_file = {inventory_filename: textwrap.dedent("""\
+ ---
+ plugin: community.hrobot.robot
+ hetzner_user: test
+ hetzner_password: hunter2
+ """)}
+ im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename)
+ open_url.assert_is_done()
+
+ assert not im._inventory.hosts
+ assert '1.2.3.4' not in im._inventory.hosts
+ assert 'test-server' not in im._inventory.hosts
+ assert 'test-server-2' not in im._inventory.hosts
+ assert len(im._inventory.groups['ungrouped'].hosts) == 0
+ assert len(im._inventory.groups['all'].hosts) == 0
+
+
+def test_inventory_file_collision(mocker):
+ open_url = OpenUrlProxy([
+ OpenUrlCall('GET', 200)
+ .result_json([
+ {
+ 'server': {
+ 'server_ip': '1.2.3.4',
+ 'server_name': 'test-server',
+ },
+ },
+ {
+ 'server': {
+ 'server_ip': '1.2.3.5',
+ 'server_name': 'test-server',
+ },
+ },
+ ])
+ .expect_url('{0}/server'.format(BASE_URL)),
+ ])
+ mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url)
+ mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop)
+ mocker.patch('os.path.exists', lambda x: True)
+ mocker.patch('os.access', lambda x, y: True)
+
+ inventory_filename = "test.robot.yaml"
+ C.INVENTORY_ENABLED = ['community.hrobot.robot']
+ inventory_file = {inventory_filename: textwrap.dedent("""\
+ ---
+ plugin: community.hrobot.robot
+ hetzner_user: test
+ hetzner_password: hunter2
+ """)}
+ im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename)
+ open_url.assert_is_done()
+
+ assert im._inventory.hosts
+ assert 'test-server' in im._inventory.hosts
+ assert im._inventory.get_host('test-server').get_vars()['ansible_host'] == '1.2.3.4'
+ assert im._inventory.get_host('test-server') in im._inventory.groups['ungrouped'].hosts
+ assert len(im._inventory.groups['ungrouped'].hosts) == 1
+ assert len(im._inventory.groups['all'].hosts) == 0
+ # TODO: check for warning
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py
new file mode 100644
index 00000000..6a2b784e
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py
@@ -0,0 +1,188 @@
+# Copyright: (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import copy
+import json
+import pytest
+
+from mock import MagicMock
+from ansible_collections.community.hrobot.plugins.module_utils import robot
+from ansible_collections.community.hrobot.plugins.module_utils import failover
+
+
+class ModuleFailException(Exception):
+ def __init__(self, msg, **kwargs):
+ super(ModuleFailException, self).__init__(msg)
+ self.fail_msg = msg
+ self.fail_kwargs = kwargs
+
+
+def get_module_mock():
+ def f(msg, **kwargs):
+ raise ModuleFailException(msg, **kwargs)
+
+ module = MagicMock()
+ module.fail_json = f
+ module.from_json = json.loads
+ return module
+
+
+# ########################################################################################
+
+GET_FAILOVER_SUCCESS = [
+ (
+ '1.2.3.4',
+ (None, dict(
+ body=json.dumps(dict(
+ failover=dict(
+ active_server_ip='1.1.1.1',
+ ip='1.2.3.4',
+ netmask='255.255.255.255',
+ )
+ )).encode('utf-8'),
+ )),
+ '1.1.1.1',
+ dict(
+ active_server_ip='1.1.1.1',
+ ip='1.2.3.4',
+ netmask='255.255.255.255',
+ )
+ ),
+]
+
+
+GET_FAILOVER_FAIL = [
+ (
+ '1.2.3.4',
+ (None, dict(
+ body=json.dumps(dict(
+ error=dict(
+ code="foo",
+ status=400,
+ message="bar",
+ ),
+ )).encode('utf-8'),
+ )),
+ 'Request failed: 400 foo (bar)'
+ ),
+ (
+ '1.2.3.4',
+ (None, dict(
+ body='{"foo": "bar"}'.encode('utf-8'),
+ )),
+ 'Cannot interpret result: {"foo": "bar"}'
+ ),
+]
+
+
+@pytest.mark.parametrize("ip, return_value, result, record", GET_FAILOVER_SUCCESS)
+def test_get_failover_record(monkeypatch, ip, return_value, result, record):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value))
+
+ assert failover.get_failover_record(module, ip) == record
+
+
+@pytest.mark.parametrize("ip, return_value, result", GET_FAILOVER_FAIL)
+def test_get_failover_record_fail(monkeypatch, ip, return_value, result):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value))
+
+ with pytest.raises(ModuleFailException) as exc:
+ failover.get_failover_record(module, ip)
+
+ assert exc.value.fail_msg == result
+ assert exc.value.fail_kwargs == dict()
+
+
+@pytest.mark.parametrize("ip, return_value, result, record", GET_FAILOVER_SUCCESS)
+def test_get_failover(monkeypatch, ip, return_value, result, record):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value))
+
+ assert failover.get_failover(module, ip) == result
+
+
+@pytest.mark.parametrize("ip, return_value, result", GET_FAILOVER_FAIL)
+def test_get_failover_fail(monkeypatch, ip, return_value, result):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value))
+
+ with pytest.raises(ModuleFailException) as exc:
+ failover.get_failover(module, ip)
+
+ assert exc.value.fail_msg == result
+ assert exc.value.fail_kwargs == dict()
+
+
+# ########################################################################################
+
+SET_FAILOVER_SUCCESS = [
+ (
+ '1.2.3.4',
+ '1.1.1.1',
+ (None, dict(
+ body=json.dumps(dict(
+ failover=dict(
+ active_server_ip='1.1.1.2',
+ )
+ )).encode('utf-8'),
+ )),
+ ('1.1.1.2', True)
+ ),
+ (
+ '1.2.3.4',
+ '1.1.1.1',
+ (None, dict(
+ body=json.dumps(dict(
+ error=dict(
+ code="FAILOVER_ALREADY_ROUTED",
+ status=400,
+ message="Failover already routed",
+ ),
+ )).encode('utf-8'),
+ )),
+ ('1.1.1.1', False)
+ ),
+]
+
+
+SET_FAILOVER_FAIL = [
+ (
+ '1.2.3.4',
+ '1.1.1.1',
+ (None, dict(
+ body=json.dumps(dict(
+ error=dict(
+ code="foo",
+ status=400,
+ message="bar",
+ ),
+ )).encode('utf-8'),
+ )),
+ 'Request failed: 400 foo (bar)'
+ ),
+]
+
+
+@pytest.mark.parametrize("ip, value, return_value, result", SET_FAILOVER_SUCCESS)
+def test_set_failover(monkeypatch, ip, value, return_value, result):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value))
+
+ assert failover.set_failover(module, ip, value) == result
+
+
+@pytest.mark.parametrize("ip, value, return_value, result", SET_FAILOVER_FAIL)
+def test_set_failover_fail(monkeypatch, ip, value, return_value, result):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value))
+
+ with pytest.raises(ModuleFailException) as exc:
+ failover.set_failover(module, ip, value)
+
+ assert exc.value.fail_msg == result
+ assert exc.value.fail_kwargs == dict()
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py
new file mode 100644
index 00000000..3ef71c29
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py
@@ -0,0 +1,161 @@
+# Copyright: (c) 2017 Ansible Project
+# Copyright (c), Felix Fontein <felix@fontein.de>, 2019-2020
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import copy
+import json
+import pytest
+
+from mock import MagicMock
+from ansible_collections.community.hrobot.plugins.module_utils import robot
+
+
+class ModuleFailException(Exception):
+ def __init__(self, msg, **kwargs):
+ super(ModuleFailException, self).__init__(msg)
+ self.fail_msg = msg
+ self.fail_kwargs = kwargs
+
+
+def get_module_mock():
+ def f(msg, **kwargs):
+ raise ModuleFailException(msg, **kwargs)
+
+ module = MagicMock()
+ module.fail_json = f
+ module.from_json = json.loads
+ return module
+
+
+# ########################################################################################
+
+FETCH_URL_JSON_SUCCESS = [
+ (
+ (None, dict(
+ body=json.dumps(dict(
+ a='b'
+ )).encode('utf-8'),
+ )),
+ None,
+ (dict(
+ a='b'
+ ), None)
+ ),
+ (
+ (None, dict(
+ body=json.dumps(dict(
+ error=dict(
+ code="foo",
+ status=400,
+ message="bar",
+ ),
+ a='b'
+ )).encode('utf-8'),
+ )),
+ ['foo'],
+ (dict(
+ error=dict(
+ code="foo",
+ status=400,
+ message="bar",
+ ),
+ a='b'
+ ), 'foo')
+ ),
+]
+
+
+FETCH_URL_JSON_FAIL = [
+ (
+ (None, dict(
+ body=json.dumps(dict(
+ error=dict(
+ code="foo",
+ status=400,
+ message="bar",
+ ),
+ )).encode('utf-8'),
+ )),
+ None,
+ 'Request failed: 400 foo (bar)'
+ ),
+ (
+ (None, dict(
+ body=json.dumps(dict(
+ error=dict(
+ code="foo",
+ status=400,
+ message="bar",
+ ),
+ )).encode('utf-8'),
+ )),
+ ['bar'],
+ 'Request failed: 400 foo (bar)'
+ ),
+ (
+ (None, dict(body='{this is not json}'.encode('utf-8'))),
+ [],
+ 'Cannot decode content retrieved from https://foo/bar'
+ ),
+ (
+ (None, dict()),
+ [],
+ 'Cannot retrieve content from https://foo/bar'
+ ),
+]
+
+
+@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_SUCCESS)
+def test_fetch_url_json(monkeypatch, return_value, accept_errors, result):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=return_value)
+
+ assert robot.fetch_url_json(module, 'https://foo/bar', accept_errors=accept_errors) == result
+
+
+@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_FAIL)
+def test_fetch_url_json_fail(monkeypatch, return_value, accept_errors, result):
+ module = get_module_mock()
+ robot.fetch_url = MagicMock(return_value=return_value)
+
+ with pytest.raises(ModuleFailException) as exc:
+ robot.fetch_url_json(module, 'https://foo/bar', accept_errors=accept_errors)
+
+ assert exc.value.fail_msg == result
+ assert exc.value.fail_kwargs == dict()
+
+
+@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_SUCCESS)
+def test_plugin_open_url_json(monkeypatch, return_value, accept_errors, result):
+ response = MagicMock()
+ response.read = MagicMock(return_value=return_value[1]['body'])
+ robot.open_url = MagicMock(return_value=response)
+ plugin = MagicMock()
+
+ assert robot.plugin_open_url_json(plugin, 'https://foo/bar', accept_errors=accept_errors) == result
+
+
+@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_FAIL)
+def test_plugin_open_url_json_fail(monkeypatch, return_value, accept_errors, result):
+ response = MagicMock()
+ response.read = MagicMock(return_value=return_value[1].get('body', ''))
+ robot.open_url = MagicMock(side_effect=robot.HTTPError('https://foo/bar', 400, 'Error!', {}, response))
+ plugin = MagicMock()
+
+ with pytest.raises(robot.PluginException) as exc:
+ robot.plugin_open_url_json(plugin, 'https://foo/bar', accept_errors=accept_errors)
+
+ assert exc.value.error_message == result
+
+
+def test_plugin_open_url_json_fail_other(monkeypatch):
+ robot.open_url = MagicMock(side_effect=Exception('buh!'))
+ plugin = MagicMock()
+
+ with pytest.raises(robot.PluginException) as exc:
+ robot.plugin_open_url_json(plugin, 'https://foo/bar')
+
+ assert exc.value.error_message == 'Failed request to Hetzner Robot server endpoint https://foo/bar: buh!'
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py
new file mode 100644
index 00000000..4c3d57e9
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py
@@ -0,0 +1,244 @@
+# (c) 2020 Felix Fontein <felix@fontein.de>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import (
+ FetchUrlCall,
+ BaseTestModule,
+)
+
+from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL
+from ansible_collections.community.hrobot.plugins.modules import failover_ip
+
+
+class TestHetznerFailoverIP(BaseTestModule):
+ MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.failover_ip.AnsibleModule'
+ MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url'
+
+ # Tests for state idempotence (routed and unrouted)
+
+ def test_unrouted(self, mocker):
+ result = self.run_module_success(mocker, failover_ip, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ 'state': 'unrouted',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': None,
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['value'] is None
+ assert result['state'] == 'unrouted'
+
+ def test_routed(self, mocker):
+ result = self.run_module_success(mocker, failover_ip, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ 'state': 'routed',
+ 'value': '4.3.2.1',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '4.3.2.1',
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['value'] == '4.3.2.1'
+ assert result['state'] == 'routed'
+
+ # Tests for changing state (unrouted to routed, vice versa)
+
+ def test_unrouted_to_routed(self, mocker):
+ result = self.run_module_success(mocker, failover_ip, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ 'state': 'routed',
+ 'value': '4.3.2.1',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': None,
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '4.3.2.1',
+ },
+ })
+ .expect_form_value('active_server_ip', '4.3.2.1')
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['value'] == '4.3.2.1'
+ assert result['state'] == 'routed'
+
+ def test_unrouted_to_routed_check_mode(self, mocker):
+ result = self.run_module_success(mocker, failover_ip, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ 'state': 'routed',
+ 'value': '4.3.2.1',
+ '_ansible_check_mode': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': None,
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['value'] == '4.3.2.1'
+ assert result['state'] == 'routed'
+
+ def test_routed_to_unrouted(self, mocker):
+ result = self.run_module_success(mocker, failover_ip, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ 'state': 'unrouted',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '4.3.2.1',
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('DELETE', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': None,
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['value'] is None
+ assert result['state'] == 'unrouted'
+
+ # Tests for re-routing
+
+ def test_rerouting(self, mocker):
+ result = self.run_module_success(mocker, failover_ip, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ 'state': 'routed',
+ 'value': '4.3.2.1',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '5.4.3.2',
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '4.3.2.1',
+ },
+ })
+ .expect_form_value('active_server_ip', '4.3.2.1')
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['value'] == '4.3.2.1'
+ assert result['state'] == 'routed'
+
+ def test_rerouting_already_routed(self, mocker):
+ result = self.run_module_success(mocker, failover_ip, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ 'state': 'routed',
+ 'value': '4.3.2.1',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '5.4.3.2',
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 409)
+ .result_json({
+ 'error': {
+ 'status': 409,
+ 'code': 'FAILOVER_ALREADY_ROUTED',
+ 'message': 'Failover already routed',
+ },
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '4.3.2.1',
+ },
+ })
+ .expect_form_value('active_server_ip', '4.3.2.1')
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['value'] == '4.3.2.1'
+ assert result['state'] == 'routed'
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py
new file mode 100644
index 00000000..08e97f97
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py
@@ -0,0 +1,71 @@
+# (c) 2020 Felix Fontein <felix@fontein.de>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import (
+ FetchUrlCall,
+ BaseTestModule,
+)
+
+from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL
+from ansible_collections.community.hrobot.plugins.modules import failover_ip_info
+
+
+class TestHetznerFailoverIPInfo(BaseTestModule):
+ MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.failover_ip_info.AnsibleModule'
+ MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url'
+
+ # Tests for state (routed and unrouted)
+
+ def test_unrouted(self, mocker):
+ result = self.run_module_success(mocker, failover_ip_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': None,
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['value'] is None
+ assert result['state'] == 'unrouted'
+ assert result['failover_ip'] == '1.2.3.4'
+ assert result['server_ip'] == '2.3.4.5'
+ assert result['server_number'] == 2345
+
+ def test_routed(self, mocker):
+ result = self.run_module_success(mocker, failover_ip_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'failover_ip': '1.2.3.4',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'failover': {
+ 'ip': '1.2.3.4',
+ 'netmask': '255.255.255.255',
+ 'server_ip': '2.3.4.5',
+ 'server_number': 2345,
+ 'active_server_ip': '4.3.2.1',
+ },
+ })
+ .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['value'] == '4.3.2.1'
+ assert result['state'] == 'routed'
+ assert result['failover_ip'] == '1.2.3.4'
+ assert result['server_ip'] == '2.3.4.5'
+ assert result['server_number'] == 2345
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py
new file mode 100644
index 00000000..ec8e38fc
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py
@@ -0,0 +1,1290 @@
+# (c) 2019 Felix Fontein <felix@fontein.de>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+import pytest
+
+from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import (
+ FetchUrlCall,
+ BaseTestModule,
+)
+
+from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL
+from ansible_collections.community.hrobot.plugins.modules import firewall
+
+
+def create_params(parameter, *values):
+ assert len(values) > 1
+ result = []
+ for i in range(1, len(values)):
+ result.append((parameter, values[i - 1], values[i]))
+ return result
+
+
+def flatten(list_of_lists):
+ result = []
+ for l in list_of_lists:
+ result.extend(l)
+ return result
+
+
+class TestHetznerFirewall(BaseTestModule):
+ MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.firewall.AnsibleModule'
+ MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url'
+
+ # Tests for state (absent and present)
+
+ def test_absent_idempotency(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'absent',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'disabled'
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_absent_idempotency_no_rules(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'absent',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'disabled'
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert 'rules' in result['firewall']
+ assert 'input' in result['firewall']['rules']
+ assert len(result['firewall']['rules']['input']) == 0
+
+ def test_absent_changed(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'absent',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'disabled'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'disabled'
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_absent_changed_no_rules(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'absent',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'disabled'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'active'
+ assert len(result['diff']['before']['rules']['input']) == 0
+ assert result['diff']['after']['status'] == 'disabled'
+ assert len(result['diff']['after']['rules']['input']) == 0
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert len(result['firewall']['rules']['input']) == 0
+
+ def test_present_idempotency(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_present_changed(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'active'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ # Tests for state (absent and present) with check mode
+
+ def test_absent_idempotency_check(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'absent',
+ '_ansible_check_mode': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'disabled'
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_absent_changed_check(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'absent',
+ '_ansible_check_mode': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'disabled'
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_present_idempotency_check(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ '_ansible_check_mode': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_present_changed_check(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ '_ansible_check_mode': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ # Tests for port
+
+ def test_port_idempotency(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'port': 'main',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['port'] == 'main'
+ assert result['diff']['after']['port'] == 'main'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert result['firewall']['port'] == 'main'
+
+ def test_port_changed(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'port': 'main',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': True,
+ 'port': 'kvm',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('port', 'main'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['port'] == 'kvm'
+ assert result['diff']['after']['port'] == 'main'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert result['firewall']['port'] == 'main'
+
+ # Tests for whitelist_hos
+
+ def test_whitelist_hos_idempotency(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'whitelist_hos': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['whitelist_hos'] is True
+ assert result['diff']['after']['whitelist_hos'] is True
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert result['firewall']['whitelist_hos'] is True
+
+ def test_whitelist_hos_changed(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'whitelist_hos': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('whitelist_hos', 'true'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['whitelist_hos'] is False
+ assert result['diff']['after']['whitelist_hos'] is True
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert result['firewall']['whitelist_hos'] is True
+
+ # Tests for wait_for_configured in getting status
+
+ def test_wait_get(self, mocker):
+ mocker.patch('time.sleep', lambda duration: None)
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'wait_for_configured': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_wait_get_timeout(self, mocker):
+ mocker.patch('time.sleep', lambda duration: None)
+ result = self.run_module_failed(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'wait_for_configured': True,
+ 'timeout': 0,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['msg'] == 'Timeout while waiting for firewall to be configured.'
+
+ def test_nowait_get(self, mocker):
+ result = self.run_module_failed(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'wait_for_configured': False,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['msg'] == 'Firewall configuration cannot be read as it is not configured.'
+
+ # Tests for wait_for_configured in setting status
+
+ def test_wait_update(self, mocker):
+ mocker.patch('time.sleep', lambda duration: None)
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'wait_for_configured': True,
+ 'state': 'present',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_wait_update_timeout(self, mocker):
+ mocker.patch('time.sleep', lambda duration: None)
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'wait_for_configured': True,
+ 'timeout': 0,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'in process'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert 'Timeout while waiting for firewall to be configured.' in result['warnings']
+
+ def test_nowait_update(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'wait_for_configured': False,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'disabled'
+ assert result['diff']['after']['status'] == 'active'
+ assert result['firewall']['status'] == 'in process'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ # Idempotency checks: different amount of input rules
+
+ def test_input_rule_len_change_0_1(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'rules': {
+ 'input': [
+ {
+ 'ip_version': 'ipv4',
+ 'action': 'discard',
+ },
+ ],
+ },
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [
+ {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ },
+ ],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'active')
+ .expect_form_value_absent('rules[input][0][name]')
+ .expect_form_value('rules[input][0][ip_version]', 'ipv4')
+ .expect_form_value_absent('rules[input][0][dst_ip]')
+ .expect_form_value_absent('rules[input][0][dst_port]')
+ .expect_form_value_absent('rules[input][0][src_ip]')
+ .expect_form_value_absent('rules[input][0][src_port]')
+ .expect_form_value_absent('rules[input][0][protocol]')
+ .expect_form_value_absent('rules[input][0][tcp_flags]')
+ .expect_form_value('rules[input][0][action]', 'discard')
+ .expect_form_value_absent('rules[input][1][action]'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert len(result['diff']['before']['rules']['input']) == 0
+ assert len(result['diff']['after']['rules']['input']) == 1
+ assert result['firewall']['status'] == 'active'
+ assert len(result['firewall']['rules']['input']) == 1
+
+ def test_input_rule_len_change_1_0(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'rules': {
+ },
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [
+ {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ },
+ ],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'active')
+ .expect_form_value_absent('rules[input][0][action]'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert len(result['diff']['before']['rules']['input']) == 1
+ assert len(result['diff']['after']['rules']['input']) == 0
+ assert result['firewall']['status'] == 'active'
+ assert len(result['firewall']['rules']['input']) == 0
+
+ def test_input_rule_len_change_1_2(self, mocker):
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'rules': {
+ 'input': [
+ {
+ 'ip_version': 'ipv4',
+ 'dst_port': 80,
+ 'protocol': 'tcp',
+ 'action': 'accept',
+ },
+ {
+ 'ip_version': 'ipv4',
+ 'action': 'discard',
+ },
+ ],
+ },
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [
+ {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ },
+ ],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [
+ {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': '80',
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': 'tcp',
+ 'tcp_flags': None,
+ 'action': 'accept',
+ },
+ {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ },
+ ],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'active')
+ .expect_form_value('rules[input][0][action]', 'accept')
+ .expect_form_value('rules[input][1][action]', 'discard')
+ .expect_form_value_absent('rules[input][2][action]'),
+ ])
+ assert result['changed'] is True
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert len(result['diff']['before']['rules']['input']) == 1
+ assert len(result['diff']['after']['rules']['input']) == 2
+ assert result['firewall']['status'] == 'active'
+ assert len(result['firewall']['rules']['input']) == 2
+
+ # Idempotency checks: change one value
+
+ @pytest.mark.parametrize("parameter, before, after", flatten([
+ create_params('name', None, '', 'Test', 'Test', 'foo', '', None),
+ create_params('ip_version', 'ipv4', 'ipv4', 'ipv6', 'ipv6'),
+ create_params('dst_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None),
+ create_params('dst_port', None, '80', '80-443', '80-443', None),
+ create_params('src_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None),
+ create_params('src_port', None, '80', '80-443', '80-443', None),
+ create_params('protocol', None, 'tcp', 'tcp', 'udp', 'udp', None),
+ create_params('tcp_flags', None, 'syn', 'syn|fin', 'syn|fin', 'syn&fin', '', None),
+ create_params('action', 'accept', 'accept', 'discard', 'discard'),
+ ]))
+ def test_input_rule_value_change(self, mocker, parameter, before, after):
+ input_call = {
+ 'ip_version': 'ipv4',
+ 'action': 'discard',
+ }
+ input_before = {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ }
+ input_after = {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ }
+ if after is not None:
+ input_call[parameter] = after
+ input_before[parameter] = before
+ input_after[parameter] = after
+
+ calls = [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [input_before],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ]
+
+ changed = (before != after)
+ if changed:
+ after_call = (
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [input_after],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'active')
+ .expect_form_value_absent('rules[input][1][action]')
+ )
+ if parameter != 'ip_version':
+ after_call.expect_form_value('rules[input][0][ip_version]', 'ipv4')
+ if parameter != 'action':
+ after_call.expect_form_value('rules[input][0][action]', 'discard')
+ if after is not None:
+ after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after)
+ else:
+ after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter))
+ calls.append(after_call)
+
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'rules': {
+ 'input': [input_call],
+ },
+ }, calls)
+ assert result['changed'] == changed
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert len(result['diff']['before']['rules']['input']) == 1
+ assert len(result['diff']['after']['rules']['input']) == 1
+ assert result['diff']['before']['rules']['input'][0][parameter] == before
+ assert result['diff']['after']['rules']['input'][0][parameter] == after
+ assert result['firewall']['status'] == 'active'
+ assert len(result['firewall']['rules']['input']) == 1
+ assert result['firewall']['rules']['input'][0][parameter] == after
+
+ # Idempotency checks: IP address normalization
+
+ @pytest.mark.parametrize("ip_version, parameter, before_normalized, after_normalized, after", [
+ ('ipv4', 'src_ip', '1.2.3.4/32', '1.2.3.4/32', '1.2.3.4'),
+ ('ipv6', 'src_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3::4'),
+ ('ipv6', 'dst_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3:0::4'),
+ ('ipv6', 'dst_ip', '::/0', '::/0', '0:0::0/0'),
+ ('ipv6', 'dst_ip', '::/0', '::1/0', '0:0::0:1/0'),
+ ('ipv6', 'dst_ip', '::/0', None, None),
+ ])
+ def test_input_rule_ip_normalization(self, mocker, ip_version, parameter, before_normalized, after_normalized, after):
+ assert ip_version in ('ipv4', 'ipv6')
+ assert parameter in ('src_ip', 'dst_ip')
+ input_call = {
+ 'ip_version': ip_version,
+ 'action': 'discard',
+ }
+ input_before = {
+ 'name': None,
+ 'ip_version': ip_version,
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ }
+ input_after = {
+ 'name': None,
+ 'ip_version': ip_version,
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ }
+ if after is not None:
+ input_call[parameter] = after
+ input_before[parameter] = before_normalized
+ input_after[parameter] = after_normalized
+
+ calls = [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': True,
+ 'port': 'main',
+ 'rules': {
+ 'input': [input_before],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ]
+
+ changed = (before_normalized != after_normalized)
+ if changed:
+ after_call = (
+ FetchUrlCall('POST', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [input_after],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL))
+ .expect_form_value('status', 'active')
+ .expect_form_value_absent('rules[input][1][action]')
+ )
+ after_call.expect_form_value('rules[input][0][ip_version]', ip_version)
+ after_call.expect_form_value('rules[input][0][action]', 'discard')
+ if after_normalized is None:
+ after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter))
+ else:
+ after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after_normalized)
+ calls.append(after_call)
+
+ result = self.run_module_success(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'rules': {
+ 'input': [input_call],
+ },
+ }, calls)
+ assert result['changed'] == changed
+ assert result['diff']['before']['status'] == 'active'
+ assert result['diff']['after']['status'] == 'active'
+ assert len(result['diff']['before']['rules']['input']) == 1
+ assert len(result['diff']['after']['rules']['input']) == 1
+ assert result['diff']['before']['rules']['input'][0][parameter] == before_normalized
+ assert result['diff']['after']['rules']['input'][0][parameter] == after_normalized
+ assert result['firewall']['status'] == 'active'
+ assert len(result['firewall']['rules']['input']) == 1
+ assert result['firewall']['rules']['input'][0][parameter] == after_normalized
+
+ # Missing requirements
+
+ def test_fail_no_ipaddress(self, mocker):
+ try:
+ firewall.HAS_IPADDRESS = False
+ firewall.IPADDRESS_IMP_ERR = 'This is\na traceback'
+ result = self.run_module_failed(mocker, firewall, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'state': 'present',
+ 'wait_for_configured': True,
+ 'timeout': 0,
+ }, [])
+ assert result['msg'].startswith('Failed to import the required Python library (ipaddress) on')
+ assert result['exception'] == 'This is\na traceback'
+ finally:
+ firewall.HAS_IPADDRESS = True
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py
new file mode 100644
index 00000000..cce6459f
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py
@@ -0,0 +1,280 @@
+# (c) 2019 Felix Fontein <felix@fontein.de>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import (
+ FetchUrlCall,
+ BaseTestModule,
+)
+
+from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL
+from ansible_collections.community.hrobot.plugins.modules import firewall_info
+
+
+class TestHetznerFirewallInfo(BaseTestModule):
+ MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.firewall_info.AnsibleModule'
+ MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url'
+
+ # Tests for state (absent and present)
+
+ def test_absent(self, mocker):
+ result = self.run_module_success(mocker, firewall_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_absent_no_rules(self, mocker):
+ result = self.run_module_success(mocker, firewall_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'disabled',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['firewall']['status'] == 'disabled'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert 'rules' in result['firewall']
+ assert 'input' in result['firewall']['rules']
+ assert len(result['firewall']['rules']['input']) == 0
+
+ def test_present(self, mocker):
+ result = self.run_module_success(mocker, firewall_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert len(result['firewall']['rules']['input']) == 0
+
+ def test_present_w_rules(self, mocker):
+ result = self.run_module_success(mocker, firewall_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [
+ {
+ 'name': 'Accept HTTPS traffic',
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': '443',
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': 'tcp',
+ 'tcp_flags': None,
+ 'action': 'accept',
+ },
+ {
+ 'name': None,
+ 'ip_version': 'ipv4',
+ 'dst_ip': None,
+ 'dst_port': None,
+ 'src_ip': None,
+ 'src_port': None,
+ 'protocol': None,
+ 'tcp_flags': None,
+ 'action': 'discard',
+ }
+ ],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+ assert len(result['firewall']['rules']['input']) == 2
+ assert result['firewall']['rules']['input'][0]['name'] == 'Accept HTTPS traffic'
+ assert result['firewall']['rules']['input'][0]['dst_port'] == '443'
+ assert result['firewall']['rules']['input'][0]['action'] == 'accept'
+ assert result['firewall']['rules']['input'][1]['dst_port'] is None
+ assert result['firewall']['rules']['input'][1]['action'] == 'discard'
+
+ # Tests for wait_for_configured in getting status
+
+ def test_wait_get(self, mocker):
+ mocker.patch('time.sleep', lambda duration: None)
+ result = self.run_module_success(mocker, firewall_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'wait_for_configured': True,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'active',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['firewall']['status'] == 'active'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
+
+ def test_wait_get_timeout(self, mocker):
+ mocker.patch('time.sleep', lambda duration: None)
+ result = self.run_module_failed(mocker, firewall_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'wait_for_configured': True,
+ 'timeout': 0,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['msg'] == 'Timeout while waiting for firewall to be configured.'
+
+ def test_nowait_get(self, mocker):
+ result = self.run_module_success(mocker, firewall_info, {
+ 'hetzner_user': '',
+ 'hetzner_password': '',
+ 'server_ip': '1.2.3.4',
+ 'wait_for_configured': False,
+ }, [
+ FetchUrlCall('GET', 200)
+ .result_json({
+ 'firewall': {
+ 'server_ip': '1.2.3.4',
+ 'server_number': 1,
+ 'status': 'in process',
+ 'whitelist_hos': False,
+ 'port': 'main',
+ 'rules': {
+ 'input': [],
+ },
+ },
+ })
+ .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)),
+ ])
+ assert result['changed'] is False
+ assert result['firewall']['status'] == 'in process'
+ assert result['firewall']['server_ip'] == '1.2.3.4'
+ assert result['firewall']['server_number'] == 1
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/requirements.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/requirements.txt
new file mode 100644
index 00000000..86e56874
--- /dev/null
+++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/requirements.txt
@@ -0,0 +1,5 @@
+unittest2 ; python_version < '2.7'
+importlib ; python_version < '2.7'
+
+# firewall module
+ipaddress ; python_version < '3.3'