diff options
Diffstat (limited to 'collections-debian-merged/ansible_collections/community/hrobot/tests')
12 files changed, 2492 insertions, 0 deletions
diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/requirements.yml b/collections-debian-merged/ansible_collections/community/hrobot/tests/requirements.yml new file mode 100644 index 00000000..e2d8cdda --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/requirements.yml @@ -0,0 +1,2 @@ +unit_tests_dependencies: +- community.internal_test_tools diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py new file mode 100644 index 00000000..098be7e5 --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py @@ -0,0 +1,251 @@ +# Copyright (c), Felix Fontein <felix@fontein.de>, 2020 +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import json +import textwrap + +import pytest + +from mock import MagicMock + +from ansible import constants as C +from ansible.errors import AnsibleError +from ansible.inventory.data import InventoryData +from ansible.inventory.manager import InventoryManager + +from ansible_collections.community.internal_test_tools.tests.unit.mock.path import mock_unfrackpath_noop +from ansible_collections.community.internal_test_tools.tests.unit.mock.loader import DictDataLoader +from ansible_collections.community.internal_test_tools.tests.unit.utils.open_url_framework import ( + OpenUrlCall, + OpenUrlProxy, +) + +from ansible_collections.community.hrobot.plugins.inventory.robot import InventoryModule +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL + + +@pytest.fixture(scope="module") +def inventory(): + r = InventoryModule() + r.inventory = InventoryData() + return r + + +def get_option(option): + if option == 'filters': + return {} + if option == 'hetzner_user': + return 'test' + if option == 'hetzner_password': + return 'hunter2' + return False + + +def test_populate(inventory, mocker): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_json([ + { + 'server': { + 'server_ip': '1.2.3.4', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.5', + 'server_name': 'test-server', + }, + }, + ]) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + + inventory.get_option = mocker.MagicMock(side_effect=get_option) + inventory.populate(inventory.get_servers()) + + open_url.assert_is_done() + + host_1 = inventory.inventory.get_host('1.2.3.4') + host_2 = inventory.inventory.get_host('test-server') + + host_1_vars = host_1.get_vars() + host_2_vars = host_2.get_vars() + + assert host_1_vars['ansible_host'] == '1.2.3.4' + assert host_1_vars['hrobot_server_ip'] == '1.2.3.4' + assert 'hrobot_server_name' not in host_1_vars + assert host_2_vars['ansible_host'] == '1.2.3.5' + assert host_2_vars['hrobot_server_ip'] == '1.2.3.5' + assert host_2_vars['hrobot_server_name'] == 'test-server' + + +def test_inventory_file_simple(mocker): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_json([ + { + 'server': { + 'server_ip': '1.2.3.4', + 'dc': 'foo', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.5', + 'server_name': 'test-server', + 'dc': 'foo', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.6', + 'server_name': 'test-server-2', + 'dc': 'bar', + }, + }, + ]) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', lambda x: True) + mocker.patch('os.access', lambda x, y: True) + + inventory_filename = "test.robot.yaml" + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + filters: + dc: foo + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert im._inventory.hosts + assert '1.2.3.4' in im._inventory.hosts + assert 'test-server' in im._inventory.hosts + assert 'test-server-2' not in im._inventory.hosts + assert im._inventory.get_host('1.2.3.4') in im._inventory.groups['ungrouped'].hosts + assert im._inventory.get_host('test-server') in im._inventory.groups['ungrouped'].hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 2 + assert len(im._inventory.groups['all'].hosts) == 0 + + +@pytest.mark.parametrize("error_result", [ + None, + json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + ), sort_keys=True).encode('utf-8') +]) +def test_inventory_file_fail(mocker, error_result): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_error(error_result) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', lambda x: True) + mocker.patch('os.access', lambda x, y: True) + + inventory_filename = "test.robot.yml" + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + filters: + dc: foo + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert not im._inventory.hosts + assert '1.2.3.4' not in im._inventory.hosts + assert 'test-server' not in im._inventory.hosts + assert 'test-server-2' not in im._inventory.hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 0 + assert len(im._inventory.groups['all'].hosts) == 0 + + +def test_inventory_wrong_file(mocker): + open_url = OpenUrlProxy([]) + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', lambda x: True) + mocker.patch('os.access', lambda x, y: True) + + inventory_filename = "test.bobot.yml" + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert not im._inventory.hosts + assert '1.2.3.4' not in im._inventory.hosts + assert 'test-server' not in im._inventory.hosts + assert 'test-server-2' not in im._inventory.hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 0 + assert len(im._inventory.groups['all'].hosts) == 0 + + +def test_inventory_file_collision(mocker): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_json([ + { + 'server': { + 'server_ip': '1.2.3.4', + 'server_name': 'test-server', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.5', + 'server_name': 'test-server', + }, + }, + ]) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', lambda x: True) + mocker.patch('os.access', lambda x, y: True) + + inventory_filename = "test.robot.yaml" + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert im._inventory.hosts + assert 'test-server' in im._inventory.hosts + assert im._inventory.get_host('test-server').get_vars()['ansible_host'] == '1.2.3.4' + assert im._inventory.get_host('test-server') in im._inventory.groups['ungrouped'].hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 1 + assert len(im._inventory.groups['all'].hosts) == 0 + # TODO: check for warning diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py new file mode 100644 index 00000000..6a2b784e --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py @@ -0,0 +1,188 @@ +# Copyright: (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import copy +import json +import pytest + +from mock import MagicMock +from ansible_collections.community.hrobot.plugins.module_utils import robot +from ansible_collections.community.hrobot.plugins.module_utils import failover + + +class ModuleFailException(Exception): + def __init__(self, msg, **kwargs): + super(ModuleFailException, self).__init__(msg) + self.fail_msg = msg + self.fail_kwargs = kwargs + + +def get_module_mock(): + def f(msg, **kwargs): + raise ModuleFailException(msg, **kwargs) + + module = MagicMock() + module.fail_json = f + module.from_json = json.loads + return module + + +# ######################################################################################## + +GET_FAILOVER_SUCCESS = [ + ( + '1.2.3.4', + (None, dict( + body=json.dumps(dict( + failover=dict( + active_server_ip='1.1.1.1', + ip='1.2.3.4', + netmask='255.255.255.255', + ) + )).encode('utf-8'), + )), + '1.1.1.1', + dict( + active_server_ip='1.1.1.1', + ip='1.2.3.4', + netmask='255.255.255.255', + ) + ), +] + + +GET_FAILOVER_FAIL = [ + ( + '1.2.3.4', + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + 'Request failed: 400 foo (bar)' + ), + ( + '1.2.3.4', + (None, dict( + body='{"foo": "bar"}'.encode('utf-8'), + )), + 'Cannot interpret result: {"foo": "bar"}' + ), +] + + +@pytest.mark.parametrize("ip, return_value, result, record", GET_FAILOVER_SUCCESS) +def test_get_failover_record(monkeypatch, ip, return_value, result, record): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + assert failover.get_failover_record(module, ip) == record + + +@pytest.mark.parametrize("ip, return_value, result", GET_FAILOVER_FAIL) +def test_get_failover_record_fail(monkeypatch, ip, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + with pytest.raises(ModuleFailException) as exc: + failover.get_failover_record(module, ip) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() + + +@pytest.mark.parametrize("ip, return_value, result, record", GET_FAILOVER_SUCCESS) +def test_get_failover(monkeypatch, ip, return_value, result, record): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + assert failover.get_failover(module, ip) == result + + +@pytest.mark.parametrize("ip, return_value, result", GET_FAILOVER_FAIL) +def test_get_failover_fail(monkeypatch, ip, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + with pytest.raises(ModuleFailException) as exc: + failover.get_failover(module, ip) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() + + +# ######################################################################################## + +SET_FAILOVER_SUCCESS = [ + ( + '1.2.3.4', + '1.1.1.1', + (None, dict( + body=json.dumps(dict( + failover=dict( + active_server_ip='1.1.1.2', + ) + )).encode('utf-8'), + )), + ('1.1.1.2', True) + ), + ( + '1.2.3.4', + '1.1.1.1', + (None, dict( + body=json.dumps(dict( + error=dict( + code="FAILOVER_ALREADY_ROUTED", + status=400, + message="Failover already routed", + ), + )).encode('utf-8'), + )), + ('1.1.1.1', False) + ), +] + + +SET_FAILOVER_FAIL = [ + ( + '1.2.3.4', + '1.1.1.1', + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + 'Request failed: 400 foo (bar)' + ), +] + + +@pytest.mark.parametrize("ip, value, return_value, result", SET_FAILOVER_SUCCESS) +def test_set_failover(monkeypatch, ip, value, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + assert failover.set_failover(module, ip, value) == result + + +@pytest.mark.parametrize("ip, value, return_value, result", SET_FAILOVER_FAIL) +def test_set_failover_fail(monkeypatch, ip, value, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + with pytest.raises(ModuleFailException) as exc: + failover.set_failover(module, ip, value) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py new file mode 100644 index 00000000..3ef71c29 --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py @@ -0,0 +1,161 @@ +# Copyright: (c) 2017 Ansible Project +# Copyright (c), Felix Fontein <felix@fontein.de>, 2019-2020 +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import copy +import json +import pytest + +from mock import MagicMock +from ansible_collections.community.hrobot.plugins.module_utils import robot + + +class ModuleFailException(Exception): + def __init__(self, msg, **kwargs): + super(ModuleFailException, self).__init__(msg) + self.fail_msg = msg + self.fail_kwargs = kwargs + + +def get_module_mock(): + def f(msg, **kwargs): + raise ModuleFailException(msg, **kwargs) + + module = MagicMock() + module.fail_json = f + module.from_json = json.loads + return module + + +# ######################################################################################## + +FETCH_URL_JSON_SUCCESS = [ + ( + (None, dict( + body=json.dumps(dict( + a='b' + )).encode('utf-8'), + )), + None, + (dict( + a='b' + ), None) + ), + ( + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + a='b' + )).encode('utf-8'), + )), + ['foo'], + (dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + a='b' + ), 'foo') + ), +] + + +FETCH_URL_JSON_FAIL = [ + ( + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + None, + 'Request failed: 400 foo (bar)' + ), + ( + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + ['bar'], + 'Request failed: 400 foo (bar)' + ), + ( + (None, dict(body='{this is not json}'.encode('utf-8'))), + [], + 'Cannot decode content retrieved from https://foo/bar' + ), + ( + (None, dict()), + [], + 'Cannot retrieve content from https://foo/bar' + ), +] + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_SUCCESS) +def test_fetch_url_json(monkeypatch, return_value, accept_errors, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=return_value) + + assert robot.fetch_url_json(module, 'https://foo/bar', accept_errors=accept_errors) == result + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_FAIL) +def test_fetch_url_json_fail(monkeypatch, return_value, accept_errors, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=return_value) + + with pytest.raises(ModuleFailException) as exc: + robot.fetch_url_json(module, 'https://foo/bar', accept_errors=accept_errors) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_SUCCESS) +def test_plugin_open_url_json(monkeypatch, return_value, accept_errors, result): + response = MagicMock() + response.read = MagicMock(return_value=return_value[1]['body']) + robot.open_url = MagicMock(return_value=response) + plugin = MagicMock() + + assert robot.plugin_open_url_json(plugin, 'https://foo/bar', accept_errors=accept_errors) == result + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_FAIL) +def test_plugin_open_url_json_fail(monkeypatch, return_value, accept_errors, result): + response = MagicMock() + response.read = MagicMock(return_value=return_value[1].get('body', '')) + robot.open_url = MagicMock(side_effect=robot.HTTPError('https://foo/bar', 400, 'Error!', {}, response)) + plugin = MagicMock() + + with pytest.raises(robot.PluginException) as exc: + robot.plugin_open_url_json(plugin, 'https://foo/bar', accept_errors=accept_errors) + + assert exc.value.error_message == result + + +def test_plugin_open_url_json_fail_other(monkeypatch): + robot.open_url = MagicMock(side_effect=Exception('buh!')) + plugin = MagicMock() + + with pytest.raises(robot.PluginException) as exc: + robot.plugin_open_url_json(plugin, 'https://foo/bar') + + assert exc.value.error_message == 'Failed request to Hetzner Robot server endpoint https://foo/bar: buh!' diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py new file mode 100644 index 00000000..4c3d57e9 --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py @@ -0,0 +1,244 @@ +# (c) 2020 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import failover_ip + + +class TestHetznerFailoverIP(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.failover_ip.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state idempotence (routed and unrouted) + + def test_unrouted(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'unrouted', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] is None + assert result['state'] == 'unrouted' + + def test_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + # Tests for changing state (unrouted to routed, vice versa) + + def test_unrouted_to_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_form_value('active_server_ip', '4.3.2.1') + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + def test_unrouted_to_routed_check_mode(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + def test_routed_to_unrouted(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'unrouted', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] is None + assert result['state'] == 'unrouted' + + # Tests for re-routing + + def test_rerouting(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '5.4.3.2', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_form_value('active_server_ip', '4.3.2.1') + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + def test_rerouting_already_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '5.4.3.2', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 409) + .result_json({ + 'error': { + 'status': 409, + 'code': 'FAILOVER_ALREADY_ROUTED', + 'message': 'Failover already routed', + }, + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_form_value('active_server_ip', '4.3.2.1') + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py new file mode 100644 index 00000000..08e97f97 --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py @@ -0,0 +1,71 @@ +# (c) 2020 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import failover_ip_info + + +class TestHetznerFailoverIPInfo(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.failover_ip_info.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state (routed and unrouted) + + def test_unrouted(self, mocker): + result = self.run_module_success(mocker, failover_ip_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] is None + assert result['state'] == 'unrouted' + assert result['failover_ip'] == '1.2.3.4' + assert result['server_ip'] == '2.3.4.5' + assert result['server_number'] == 2345 + + def test_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + assert result['failover_ip'] == '1.2.3.4' + assert result['server_ip'] == '2.3.4.5' + assert result['server_number'] == 2345 diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py new file mode 100644 index 00000000..ec8e38fc --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py @@ -0,0 +1,1290 @@ +# (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import pytest + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import firewall + + +def create_params(parameter, *values): + assert len(values) > 1 + result = [] + for i in range(1, len(values)): + result.append((parameter, values[i - 1], values[i])) + return result + + +def flatten(list_of_lists): + result = [] + for l in list_of_lists: + result.extend(l) + return result + + +class TestHetznerFirewall(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.firewall.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state (absent and present) + + def test_absent_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_absent_idempotency_no_rules(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert 'rules' in result['firewall'] + assert 'input' in result['firewall']['rules'] + assert len(result['firewall']['rules']['input']) == 0 + + def test_absent_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'disabled'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_absent_changed_no_rules(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'disabled'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert result['diff']['after']['status'] == 'disabled' + assert len(result['diff']['after']['rules']['input']) == 0 + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert len(result['firewall']['rules']['input']) == 0 + + def test_present_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_present_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + # Tests for state (absent and present) with check mode + + def test_absent_idempotency_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_absent_changed_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_present_idempotency_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_present_changed_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + # Tests for port + + def test_port_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'port': 'main', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['port'] == 'main' + assert result['diff']['after']['port'] == 'main' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['port'] == 'main' + + def test_port_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'port': 'main', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'kvm', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('port', 'main'), + ]) + assert result['changed'] is True + assert result['diff']['before']['port'] == 'kvm' + assert result['diff']['after']['port'] == 'main' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['port'] == 'main' + + # Tests for whitelist_hos + + def test_whitelist_hos_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'whitelist_hos': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['whitelist_hos'] is True + assert result['diff']['after']['whitelist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['whitelist_hos'] is True + + def test_whitelist_hos_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'whitelist_hos': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('whitelist_hos', 'true'), + ]) + assert result['changed'] is True + assert result['diff']['before']['whitelist_hos'] is False + assert result['diff']['after']['whitelist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['whitelist_hos'] is True + + # Tests for wait_for_configured in getting status + + def test_wait_get(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_wait_get_timeout(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_failed(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Timeout while waiting for firewall to be configured.' + + def test_nowait_get(self, mocker): + result = self.run_module_failed(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Firewall configuration cannot be read as it is not configured.' + + # Tests for wait_for_configured in setting status + + def test_wait_update(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': True, + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_wait_update_timeout(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert 'Timeout while waiting for firewall to be configured.' in result['warnings'] + + def test_nowait_update(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + # Idempotency checks: different amount of input rules + + def test_input_rule_len_change_0_1(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [ + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][0][name]') + .expect_form_value('rules[input][0][ip_version]', 'ipv4') + .expect_form_value_absent('rules[input][0][dst_ip]') + .expect_form_value_absent('rules[input][0][dst_port]') + .expect_form_value_absent('rules[input][0][src_ip]') + .expect_form_value_absent('rules[input][0][src_port]') + .expect_form_value_absent('rules[input][0][protocol]') + .expect_form_value_absent('rules[input][0][tcp_flags]') + .expect_form_value('rules[input][0][action]', 'discard') + .expect_form_value_absent('rules[input][1][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['input']) == 1 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + + def test_input_rule_len_change_1_0(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 0 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 0 + + def test_input_rule_len_change_1_2(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [ + { + 'ip_version': 'ipv4', + 'dst_port': 80, + 'protocol': 'tcp', + 'action': 'accept', + }, + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': '80', + 'src_ip': None, + 'src_port': None, + 'protocol': 'tcp', + 'tcp_flags': None, + 'action': 'accept', + }, + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value('rules[input][0][action]', 'accept') + .expect_form_value('rules[input][1][action]', 'discard') + .expect_form_value_absent('rules[input][2][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 2 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 2 + + # Idempotency checks: change one value + + @pytest.mark.parametrize("parameter, before, after", flatten([ + create_params('name', None, '', 'Test', 'Test', 'foo', '', None), + create_params('ip_version', 'ipv4', 'ipv4', 'ipv6', 'ipv6'), + create_params('dst_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('dst_port', None, '80', '80-443', '80-443', None), + create_params('src_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('src_port', None, '80', '80-443', '80-443', None), + create_params('protocol', None, 'tcp', 'tcp', 'udp', 'udp', None), + create_params('tcp_flags', None, 'syn', 'syn|fin', 'syn|fin', 'syn&fin', '', None), + create_params('action', 'accept', 'accept', 'discard', 'discard'), + ])) + def test_input_rule_value_change(self, mocker, parameter, before, after): + input_call = { + 'ip_version': 'ipv4', + 'action': 'discard', + } + input_before = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + input_after = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + if after is not None: + input_call[parameter] = after + input_before[parameter] = before + input_after[parameter] = after + + calls = [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [input_before], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ] + + changed = (before != after) + if changed: + after_call = ( + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [input_after], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][1][action]') + ) + if parameter != 'ip_version': + after_call.expect_form_value('rules[input][0][ip_version]', 'ipv4') + if parameter != 'action': + after_call.expect_form_value('rules[input][0][action]', 'discard') + if after is not None: + after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after) + else: + after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter)) + calls.append(after_call) + + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [input_call], + }, + }, calls) + assert result['changed'] == changed + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 1 + assert result['diff']['before']['rules']['input'][0][parameter] == before + assert result['diff']['after']['rules']['input'][0][parameter] == after + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + assert result['firewall']['rules']['input'][0][parameter] == after + + # Idempotency checks: IP address normalization + + @pytest.mark.parametrize("ip_version, parameter, before_normalized, after_normalized, after", [ + ('ipv4', 'src_ip', '1.2.3.4/32', '1.2.3.4/32', '1.2.3.4'), + ('ipv6', 'src_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3::4'), + ('ipv6', 'dst_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3:0::4'), + ('ipv6', 'dst_ip', '::/0', '::/0', '0:0::0/0'), + ('ipv6', 'dst_ip', '::/0', '::1/0', '0:0::0:1/0'), + ('ipv6', 'dst_ip', '::/0', None, None), + ]) + def test_input_rule_ip_normalization(self, mocker, ip_version, parameter, before_normalized, after_normalized, after): + assert ip_version in ('ipv4', 'ipv6') + assert parameter in ('src_ip', 'dst_ip') + input_call = { + 'ip_version': ip_version, + 'action': 'discard', + } + input_before = { + 'name': None, + 'ip_version': ip_version, + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + input_after = { + 'name': None, + 'ip_version': ip_version, + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + if after is not None: + input_call[parameter] = after + input_before[parameter] = before_normalized + input_after[parameter] = after_normalized + + calls = [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [input_before], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ] + + changed = (before_normalized != after_normalized) + if changed: + after_call = ( + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [input_after], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][1][action]') + ) + after_call.expect_form_value('rules[input][0][ip_version]', ip_version) + after_call.expect_form_value('rules[input][0][action]', 'discard') + if after_normalized is None: + after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter)) + else: + after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after_normalized) + calls.append(after_call) + + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [input_call], + }, + }, calls) + assert result['changed'] == changed + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['input']) == 1 + assert result['diff']['before']['rules']['input'][0][parameter] == before_normalized + assert result['diff']['after']['rules']['input'][0][parameter] == after_normalized + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + assert result['firewall']['rules']['input'][0][parameter] == after_normalized + + # Missing requirements + + def test_fail_no_ipaddress(self, mocker): + try: + firewall.HAS_IPADDRESS = False + firewall.IPADDRESS_IMP_ERR = 'This is\na traceback' + result = self.run_module_failed(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, []) + assert result['msg'].startswith('Failed to import the required Python library (ipaddress) on') + assert result['exception'] == 'This is\na traceback' + finally: + firewall.HAS_IPADDRESS = True diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py new file mode 100644 index 00000000..cce6459f --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py @@ -0,0 +1,280 @@ +# (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import firewall_info + + +class TestHetznerFirewallInfo(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.firewall_info.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state (absent and present) + + def test_absent(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_absent_no_rules(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert 'rules' in result['firewall'] + assert 'input' in result['firewall']['rules'] + assert len(result['firewall']['rules']['input']) == 0 + + def test_present(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert len(result['firewall']['rules']['input']) == 0 + + def test_present_w_rules(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': 'Accept HTTPS traffic', + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': '443', + 'src_ip': None, + 'src_port': None, + 'protocol': 'tcp', + 'tcp_flags': None, + 'action': 'accept', + }, + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert len(result['firewall']['rules']['input']) == 2 + assert result['firewall']['rules']['input'][0]['name'] == 'Accept HTTPS traffic' + assert result['firewall']['rules']['input'][0]['dst_port'] == '443' + assert result['firewall']['rules']['input'][0]['action'] == 'accept' + assert result['firewall']['rules']['input'][1]['dst_port'] is None + assert result['firewall']['rules']['input'][1]['action'] == 'discard' + + # Tests for wait_for_configured in getting status + + def test_wait_get(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_wait_get_timeout(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_failed(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Timeout while waiting for firewall to be configured.' + + def test_nowait_get(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 diff --git a/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/requirements.txt b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/requirements.txt new file mode 100644 index 00000000..86e56874 --- /dev/null +++ b/collections-debian-merged/ansible_collections/community/hrobot/tests/unit/requirements.txt @@ -0,0 +1,5 @@ +unittest2 ; python_version < '2.7' +importlib ; python_version < '2.7' + +# firewall module +ipaddress ; python_version < '3.3' |