diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
commit | 975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch) | |
tree | 89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/community/hrobot/tests | |
parent | Initial commit. (diff) | |
download | ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip |
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/hrobot/tests')
45 files changed, 6708 insertions, 0 deletions
diff --git a/ansible_collections/community/hrobot/tests/config.yml b/ansible_collections/community/hrobot/tests/config.yml new file mode 100644 index 000000000..38590f2e4 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/config.yml @@ -0,0 +1,9 @@ +--- +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +# See template for more information: +# https://github.com/ansible/ansible/blob/devel/test/lib/ansible_test/config/config.yml +modules: + python_requires: default diff --git a/ansible_collections/community/hrobot/tests/ee/all.yml b/ansible_collections/community/hrobot/tests/ee/all.yml new file mode 100644 index 000000000..26f198b4f --- /dev/null +++ b/ansible_collections/community/hrobot/tests/ee/all.yml @@ -0,0 +1,18 @@ +--- +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +- hosts: localhost + tasks: + - name: Find all roles + find: + paths: + - "{{ (playbook_dir | default('.')) ~ '/roles' }}" + file_type: directory + depth: 1 + register: result + - name: Include all roles + include_role: + name: "{{ item }}" + loop: "{{ result.files | map(attribute='path') | map('regex_replace', '.*/', '') | sort }}" diff --git a/ansible_collections/community/hrobot/tests/ee/roles/smoke/library/smoke_ipaddress.py b/ansible_collections/community/hrobot/tests/ee/roles/smoke/library/smoke_ipaddress.py new file mode 100644 index 000000000..6c2156135 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/ee/roles/smoke/library/smoke_ipaddress.py @@ -0,0 +1,50 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright (c) 2022 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: smoke_ipaddress +short_description: Check whether ipaddress is present +author: + - Felix Fontein (@felixfontein) +description: + - Check whether C(ipaddress) is present. +options: {} +''' + +EXAMPLES = r''' # ''' + +RETURN = r''' # ''' + +import traceback + +from ansible.module_utils.basic import AnsibleModule, missing_required_lib + +try: + import ipaddress # noqa: F401, pylint: disable=unused-import + HAS_IPADDRESS = True + IPADDRESS_IMP_ERR = None +except ImportError as exc: + IPADDRESS_IMP_ERR = traceback.format_exc() + HAS_IPADDRESS = False + + +def main(): + module = AnsibleModule(argument_spec=dict(), supports_check_mode=True) + + if not HAS_IPADDRESS: + module.fail_json(msg=missing_required_lib('ipaddress'), exception=IPADDRESS_IMP_ERR) + + module.exit_json(msg='Everything is ok') + + +if __name__ == '__main__': # pragma: no cover + main() # pragma: no cover diff --git a/ansible_collections/community/hrobot/tests/ee/roles/smoke/tasks/main.yml b/ansible_collections/community/hrobot/tests/ee/roles/smoke/tasks/main.yml new file mode 100644 index 000000000..a83cfbb30 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/ee/roles/smoke/tasks/main.yml @@ -0,0 +1,29 @@ +--- +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +- name: Check whether ipaddress is present + smoke_ipaddress: + register: result + +- name: Validate result + assert: + that: + - result.msg == 'Everything is ok' + +- name: Check ssh_key module with invalid fingerprint + community.hrobot.ssh_key: + hetzner_user: foo + hetzner_password: bar + name: baz + state: absent + fingerprint: f0:0b + ignore_errors: true + register: result + +- name: Validate result + assert: + that: + - result is failed + - "result.msg == 'Fingerprint must consist of 16 8-bit hex numbers: got 2 8-bit hex numbers instead'" diff --git a/ansible_collections/community/hrobot/tests/requirements.yml b/ansible_collections/community/hrobot/tests/requirements.yml new file mode 100644 index 000000000..dde980c10 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/requirements.yml @@ -0,0 +1,7 @@ +--- +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +unit_tests_dependencies: +- community.internal_test_tools diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.json b/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.json new file mode 100644 index 000000000..c2e612e5f --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.json @@ -0,0 +1,10 @@ +{ + "include_symlinks": false, + "prefixes": [ + "docs/docsite/" + ], + "output": "path-line-column-message", + "requirements": [ + "antsibull-docs" + ] +} diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.json.license b/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.json.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.json.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.py b/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.py new file mode 100755 index 000000000..673104923 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/extra-docs.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later +"""Check extra collection docs with antsibull-docs.""" +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import sys +import subprocess + + +def main(): + """Main entry point.""" + if not os.path.isdir(os.path.join('docs', 'docsite')): + return + p = subprocess.run(['antsibull-docs', 'lint-collection-docs', '.'], check=False) + if p.returncode not in (0, 3): + print('{0}:0:0: unexpected return code {1}'.format(sys.argv[0], p.returncode)) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/licenses.json b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.json new file mode 100644 index 000000000..50e47ca88 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.json @@ -0,0 +1,4 @@ +{ + "include_symlinks": false, + "output": "path-message" +} diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/licenses.json.license b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.json.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.json.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/licenses.py b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.py new file mode 100755 index 000000000..80eb795ef --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# Copyright (c) 2022, Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later +"""Prevent files without a correct license identifier from being added to the source tree.""" +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import glob +import sys + + +def format_license_list(licenses): + if not licenses: + return '(empty)' + return ', '.join(['"%s"' % license for license in licenses]) + + +def find_licenses(filename, relax=False): + spdx_license_identifiers = [] + other_license_identifiers = [] + has_copyright = False + try: + with open(filename, 'r', encoding='utf-8') as f: + for line in f: + line = line.rstrip() + if 'Copyright ' in line: + has_copyright = True + if 'Copyright: ' in line: + print('%s: found copyright line with "Copyright:". Please remove the colon.' % (filename, )) + if 'SPDX-FileCopyrightText: ' in line: + has_copyright = True + idx = line.find('SPDX-License-Identifier: ') + if idx >= 0: + lic_id = line[idx + len('SPDX-License-Identifier: '):] + spdx_license_identifiers.extend(lic_id.split(' OR ')) + if 'GNU General Public License' in line: + if 'v3.0+' in line: + other_license_identifiers.append('GPL-3.0-or-later') + if 'version 3 or later' in line: + other_license_identifiers.append('GPL-3.0-or-later') + if 'Simplified BSD License' in line: + other_license_identifiers.append('BSD-2-Clause') + if 'Apache License 2.0' in line: + other_license_identifiers.append('Apache-2.0') + if 'PSF License' in line or 'Python-2.0' in line: + other_license_identifiers.append('PSF-2.0') + if 'MIT License' in line: + other_license_identifiers.append('MIT') + except Exception as exc: + print('%s: error while processing file: %s' % (filename, exc)) + if len(set(spdx_license_identifiers)) < len(spdx_license_identifiers): + print('%s: found identical SPDX-License-Identifier values' % (filename, )) + if other_license_identifiers and set(other_license_identifiers) != set(spdx_license_identifiers): + print('%s: SPDX-License-Identifier yielded the license list %s, while manual guessing yielded the license list %s' % ( + filename, format_license_list(spdx_license_identifiers), format_license_list(other_license_identifiers))) + if not has_copyright and not relax: + print('%s: found no copyright notice' % (filename, )) + return sorted(spdx_license_identifiers) + + +def main(): + """Main entry point.""" + paths = sys.argv[1:] or sys.stdin.read().splitlines() + + # The following paths are allowed to have no license identifier + no_comments_allowed = [ + 'changelogs/fragments/*.yml', + 'changelogs/fragments/*.yaml', + ] + + # These files are completely ignored + ignore_paths = [ + '.ansible-test-timeout.json', + '.reuse/dep5', + 'LICENSES/*.txt', + 'COPYING', + ] + + no_comments_allowed = [fn for pattern in no_comments_allowed for fn in glob.glob(pattern)] + ignore_paths = [fn for pattern in ignore_paths for fn in glob.glob(pattern)] + + valid_licenses = [license_file[len('LICENSES/'):-len('.txt')] for license_file in glob.glob('LICENSES/*.txt')] + + for path in paths: + if path.startswith('./'): + path = path[2:] + if path in ignore_paths or path.startswith('tests/output/'): + continue + if os.stat(path).st_size == 0: + continue + if not path.endswith('.license') and os.path.exists(path + '.license'): + path = path + '.license' + valid_licenses_for_path = valid_licenses + if path.startswith('plugins/') and not path.startswith(('plugins/modules/', 'plugins/module_utils/')): + valid_licenses_for_path = [license for license in valid_licenses if license == 'GPL-3.0-or-later'] + licenses = find_licenses(path, relax=path in no_comments_allowed) + if not licenses: + if path not in no_comments_allowed: + print('%s: must have at least one license' % (path, )) + else: + for license in licenses: + if license not in valid_licenses_for_path: + print('%s: found not allowed license "%s", must be one of %s' % ( + path, license, format_license_list(valid_licenses_for_path))) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/licenses.py.license b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.py.license new file mode 100644 index 000000000..6c4958feb --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/licenses.py.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: 2022, Felix Fontein <felix@fontein.de> diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.json b/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.json new file mode 100644 index 000000000..c789a7fd3 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.json @@ -0,0 +1,7 @@ +{ + "include_symlinks": true, + "prefixes": [ + "plugins/" + ], + "output": "path-message" +} diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.json.license b/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.json.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.json.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.py b/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.py new file mode 100755 index 000000000..51444ab75 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/extra/no-unwanted-files.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later +"""Prevent unwanted files from being added to the source tree.""" +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import sys + + +def main(): + """Main entry point.""" + paths = sys.argv[1:] or sys.stdin.read().splitlines() + + allowed_extensions = ( + '.cs', + '.ps1', + '.psm1', + '.py', + ) + + skip_paths = set([ + ]) + + skip_directories = ( + ) + + for path in paths: + if path in skip_paths: + continue + + if any(path.startswith(skip_directory) for skip_directory in skip_directories): + continue + + ext = os.path.splitext(path)[1] + + if ext not in allowed_extensions: + print('%s: extension must be one of: %s' % (path, ', '.join(allowed_extensions))) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt b/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt new file mode 100644 index 000000000..c368addc9 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt @@ -0,0 +1,8 @@ +plugins/modules/boot.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/firewall.py pylint:bad-option-value # a pylint test that is disabled was modified over time +plugins/modules/firewall.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/firewall_info.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/server.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/server_info.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/v_switch.py validate-modules:return-syntax-error # only allowed in 2.13+ +tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt.license b/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.10.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt b/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt new file mode 100644 index 000000000..cfbc8b045 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt @@ -0,0 +1,7 @@ +plugins/modules/boot.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/firewall.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/firewall_info.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/server.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/server_info.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/v_switch.py validate-modules:return-syntax-error # only allowed in 2.13+ +tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt.license b/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.11.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.12.txt b/ansible_collections/community/hrobot/tests/sanity/ignore-2.12.txt new file mode 100644 index 000000000..cfbc8b045 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.12.txt @@ -0,0 +1,7 @@ +plugins/modules/boot.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/firewall.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/firewall_info.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/server.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/server_info.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/v_switch.py validate-modules:return-syntax-error # only allowed in 2.13+ +tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.12.txt.license b/ansible_collections/community/hrobot/tests/sanity/ignore-2.12.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.12.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.13.txt b/ansible_collections/community/hrobot/tests/sanity/ignore-2.13.txt new file mode 100644 index 000000000..0d9329fad --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.13.txt @@ -0,0 +1 @@ +tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.13.txt.license b/ansible_collections/community/hrobot/tests/sanity/ignore-2.13.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.13.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.14.txt b/ansible_collections/community/hrobot/tests/sanity/ignore-2.14.txt new file mode 100644 index 000000000..0d9329fad --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.14.txt @@ -0,0 +1 @@ +tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.14.txt.license b/ansible_collections/community/hrobot/tests/sanity/ignore-2.14.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.14.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.15.txt b/ansible_collections/community/hrobot/tests/sanity/ignore-2.15.txt new file mode 100644 index 000000000..0d9329fad --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.15.txt @@ -0,0 +1 @@ +tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.15.txt.license b/ansible_collections/community/hrobot/tests/sanity/ignore-2.15.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.15.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt b/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt new file mode 100644 index 000000000..fd7a4e740 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt @@ -0,0 +1,3 @@ +plugins/modules/boot.py validate-modules:return-syntax-error # only allowed in 2.13+ +plugins/modules/firewall.py pylint:bad-option-value # a pylint test that is disabled was modified over time +tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt.license b/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/sanity/ignore-2.9.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py b/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py new file mode 100644 index 000000000..31d6adae0 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/inventory/test_robot.py @@ -0,0 +1,361 @@ +# Copyright (c), Felix Fontein <felix@fontein.de>, 2020 +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import json +import os +import textwrap + +import pytest + +from ansible import constants as C +from ansible.inventory.data import InventoryData +from ansible.inventory.manager import InventoryManager +from ansible.module_utils.common.text.converters import to_native +from ansible.template import Templar + +from ansible_collections.community.internal_test_tools.tests.unit.mock.path import mock_unfrackpath_noop +from ansible_collections.community.internal_test_tools.tests.unit.mock.loader import DictDataLoader +from ansible_collections.community.internal_test_tools.tests.unit.utils.open_url_framework import ( + OpenUrlCall, + OpenUrlProxy, +) + +from ansible_collections.community.hrobot.plugins.inventory.robot import InventoryModule +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL + + +original_exists = os.path.exists +original_access = os.access + + +def exists_mock(path, exists=True): + def exists(f): + if to_native(f) == path: + return exists + return original_exists(f) + + return exists + + +def access_mock(path, can_access=True): + def access(f, m, *args, **kwargs): + if to_native(f) == path: + return can_access + return original_access(f, m, *args, **kwargs) + + return access + + +@pytest.fixture(scope="module") +def inventory(): + r = InventoryModule() + r.inventory = InventoryData() + r.templar = Templar(loader=DictDataLoader({})) + return r + + +def get_option(option): + if option == 'filters': + return {} + if option == 'hetzner_user': + return 'test' + if option == 'hetzner_password': + return 'hunter2' + return False + + +def test_populate(inventory, mocker): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_json([ + { + 'server': { + 'server_ip': '1.2.3.4', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.5', + 'server_name': 'test-server', + }, + }, + { + 'server': { + 'server_number': 5, + }, + }, + ]) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + + inventory.get_option = mocker.MagicMock(side_effect=get_option) + inventory.populate(inventory.get_servers()) + + open_url.assert_is_done() + + host_1 = inventory.inventory.get_host('1.2.3.4') + host_2 = inventory.inventory.get_host('test-server') + host_3 = inventory.inventory.get_host('5') + + host_1_vars = host_1.get_vars() + host_2_vars = host_2.get_vars() + host_3_vars = host_3.get_vars() + + assert host_1_vars['ansible_host'] == '1.2.3.4' + assert host_1_vars['hrobot_server_ip'] == '1.2.3.4' + assert 'hrobot_server_name' not in host_1_vars + assert host_2_vars['ansible_host'] == '1.2.3.5' + assert host_2_vars['hrobot_server_ip'] == '1.2.3.5' + assert host_2_vars['hrobot_server_name'] == 'test-server' + assert 'ansible_host' not in host_3_vars + assert 'hrobot_server_ip' not in host_3_vars + assert 'hrobot_server_name' not in host_3_vars + + +def test_inventory_file_simple(mocker): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_json([ + { + 'server': { + 'server_ip': '1.2.3.4', + 'dc': 'foo', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.5', + 'server_name': 'test-server', + 'dc': 'foo', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.6', + 'server_name': 'test-server-2', + 'dc': 'bar', + }, + }, + ]) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + inventory_filename = "test.robot.yaml" + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', exists_mock(inventory_filename)) + mocker.patch('os.access', access_mock(inventory_filename)) + + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + filters: + dc: foo + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert im._inventory.hosts + assert '1.2.3.4' in im._inventory.hosts + assert 'test-server' in im._inventory.hosts + assert 'test-server-2' not in im._inventory.hosts + assert im._inventory.get_host('1.2.3.4') in im._inventory.groups['ungrouped'].hosts + assert im._inventory.get_host('test-server') in im._inventory.groups['ungrouped'].hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 2 + assert len(im._inventory.groups['all'].hosts) == 0 + + +def test_inventory_file_simple_2(mocker): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_json([ + { + 'server': { + 'server_ip': '1.2.3.4', + 'dc': 'foo', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.5', + 'server_name': 'test-server', + 'dc': 'foo', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.6', + 'server_name': 'test-server-2', + 'dc': 'bar', + }, + }, + ]) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + inventory_filename = "test.robot.yaml" + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', exists_mock(inventory_filename)) + mocker.patch('os.access', access_mock(inventory_filename)) + + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: '{{ "test" }}' + hetzner_password: '{{ "hunter2" }}' + filters: + dc: foo + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert im._inventory.hosts + assert '1.2.3.4' in im._inventory.hosts + assert 'test-server' in im._inventory.hosts + assert 'test-server-2' not in im._inventory.hosts + assert im._inventory.get_host('1.2.3.4') in im._inventory.groups['ungrouped'].hosts + assert im._inventory.get_host('test-server') in im._inventory.groups['ungrouped'].hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 2 + assert len(im._inventory.groups['all'].hosts) == 0 + + +@pytest.mark.parametrize("error_result", [ + None, + json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + ), sort_keys=True).encode('utf-8') +]) +def test_inventory_file_fail(mocker, error_result): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_error(error_result) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + inventory_filename = "test.robot.yml" + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', exists_mock(inventory_filename)) + mocker.patch('os.access', access_mock(inventory_filename)) + + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + filters: + dc: foo + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert not im._inventory.hosts + assert '1.2.3.4' not in im._inventory.hosts + assert 'test-server' not in im._inventory.hosts + assert 'test-server-2' not in im._inventory.hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 0 + assert len(im._inventory.groups['all'].hosts) == 0 + + +def test_inventory_wrong_file(mocker): + open_url = OpenUrlProxy([]) + inventory_filename = "test.bobot.yml" + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', exists_mock(inventory_filename)) + mocker.patch('os.access', access_mock(inventory_filename)) + + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert not im._inventory.hosts + assert '1.2.3.4' not in im._inventory.hosts + assert 'test-server' not in im._inventory.hosts + assert 'test-server-2' not in im._inventory.hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 0 + assert len(im._inventory.groups['all'].hosts) == 0 + + +def test_inventory_no_file(mocker): + open_url = OpenUrlProxy([]) + inventory_filename = "test.robot.yml" + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', exists_mock(inventory_filename, False)) + mocker.patch('os.access', access_mock(inventory_filename, False)) + + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + im = InventoryManager(loader=DictDataLoader({}), sources=inventory_filename) + open_url.assert_is_done() + + assert not im._inventory.hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 0 + assert len(im._inventory.groups['all'].hosts) == 0 + + +def test_inventory_file_collision(mocker): + open_url = OpenUrlProxy([ + OpenUrlCall('GET', 200) + .result_json([ + { + 'server': { + 'server_ip': '1.2.3.4', + 'server_name': 'test-server', + }, + }, + { + 'server': { + 'server_ip': '1.2.3.5', + 'server_name': 'test-server', + }, + }, + ]) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + inventory_filename = "test.robot.yaml" + mocker.patch('ansible_collections.community.hrobot.plugins.module_utils.robot.open_url', open_url) + mocker.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + mocker.patch('os.path.exists', exists_mock(inventory_filename)) + mocker.patch('os.access', access_mock(inventory_filename)) + + C.INVENTORY_ENABLED = ['community.hrobot.robot'] + inventory_file = {inventory_filename: textwrap.dedent("""\ + --- + plugin: community.hrobot.robot + hetzner_user: test + hetzner_password: hunter2 + """)} + im = InventoryManager(loader=DictDataLoader(inventory_file), sources=inventory_filename) + open_url.assert_is_done() + + assert im._inventory.hosts + assert 'test-server' in im._inventory.hosts + assert im._inventory.get_host('test-server').get_vars()['ansible_host'] == '1.2.3.4' + assert im._inventory.get_host('test-server') in im._inventory.groups['ungrouped'].hosts + assert len(im._inventory.groups['ungrouped'].hosts) == 1 + assert len(im._inventory.groups['all'].hosts) == 0 + # TODO: check for warning diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py b/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py new file mode 100644 index 000000000..56cd02944 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_failover.py @@ -0,0 +1,189 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import copy +import json +import pytest + +from mock import MagicMock +from ansible_collections.community.hrobot.plugins.module_utils import robot +from ansible_collections.community.hrobot.plugins.module_utils import failover + + +class ModuleFailException(Exception): + def __init__(self, msg, **kwargs): + super(ModuleFailException, self).__init__(msg) + self.fail_msg = msg + self.fail_kwargs = kwargs + + +def get_module_mock(): + def f(msg, **kwargs): + raise ModuleFailException(msg, **kwargs) + + module = MagicMock() + module.fail_json = f + module.from_json = json.loads + return module + + +# ######################################################################################## + +GET_FAILOVER_SUCCESS = [ + ( + '1.2.3.4', + (None, dict( + body=json.dumps(dict( + failover=dict( + active_server_ip='1.1.1.1', + ip='1.2.3.4', + netmask='255.255.255.255', + ) + )).encode('utf-8'), + )), + '1.1.1.1', + dict( + active_server_ip='1.1.1.1', + ip='1.2.3.4', + netmask='255.255.255.255', + ) + ), +] + + +GET_FAILOVER_FAIL = [ + ( + '1.2.3.4', + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + 'Request failed: 400 foo (bar)' + ), + ( + '1.2.3.4', + (None, dict( + body='{"foo": "bar"}'.encode('utf-8'), + )), + 'Cannot interpret result: {"foo": "bar"}' + ), +] + + +@pytest.mark.parametrize("ip, return_value, result, record", GET_FAILOVER_SUCCESS) +def test_get_failover_record(monkeypatch, ip, return_value, result, record): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + assert failover.get_failover_record(module, ip) == record + + +@pytest.mark.parametrize("ip, return_value, result", GET_FAILOVER_FAIL) +def test_get_failover_record_fail(monkeypatch, ip, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + with pytest.raises(ModuleFailException) as exc: + failover.get_failover_record(module, ip) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() + + +@pytest.mark.parametrize("ip, return_value, result, record", GET_FAILOVER_SUCCESS) +def test_get_failover(monkeypatch, ip, return_value, result, record): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + assert failover.get_failover(module, ip) == result + + +@pytest.mark.parametrize("ip, return_value, result", GET_FAILOVER_FAIL) +def test_get_failover_fail(monkeypatch, ip, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + with pytest.raises(ModuleFailException) as exc: + failover.get_failover(module, ip) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() + + +# ######################################################################################## + +SET_FAILOVER_SUCCESS = [ + ( + '1.2.3.4', + '1.1.1.1', + (None, dict( + body=json.dumps(dict( + failover=dict( + active_server_ip='1.1.1.2', + ) + )).encode('utf-8'), + )), + ('1.1.1.2', True) + ), + ( + '1.2.3.4', + '1.1.1.1', + (None, dict( + body=json.dumps(dict( + error=dict( + code="FAILOVER_ALREADY_ROUTED", + status=400, + message="Failover already routed", + ), + )).encode('utf-8'), + )), + ('1.1.1.1', False) + ), +] + + +SET_FAILOVER_FAIL = [ + ( + '1.2.3.4', + '1.1.1.1', + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + 'Request failed: 400 foo (bar)' + ), +] + + +@pytest.mark.parametrize("ip, value, return_value, result", SET_FAILOVER_SUCCESS) +def test_set_failover(monkeypatch, ip, value, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + assert failover.set_failover(module, ip, value) == result + + +@pytest.mark.parametrize("ip, value, return_value, result", SET_FAILOVER_FAIL) +def test_set_failover_fail(monkeypatch, ip, value, return_value, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=copy.deepcopy(return_value)) + + with pytest.raises(ModuleFailException) as exc: + failover.set_failover(module, ip, value) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py b/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py new file mode 100644 index 000000000..b53049e8b --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/module_utils/test_robot.py @@ -0,0 +1,207 @@ +# Copyright (c) 2017 Ansible Project +# Copyright (c), Felix Fontein <felix@fontein.de>, 2019-2020 +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json +import pytest + +from mock import MagicMock +from ansible_collections.community.hrobot.plugins.module_utils import robot + + +class ModuleFailException(Exception): + def __init__(self, msg, **kwargs): + super(ModuleFailException, self).__init__(msg) + self.fail_msg = msg + self.fail_kwargs = kwargs + + +def get_module_mock(): + def f(msg, **kwargs): + raise ModuleFailException(msg, **kwargs) + + module = MagicMock() + module.fail_json = f + module.from_json = json.loads + return module + + +# ######################################################################################## + +FETCH_URL_JSON_SUCCESS = [ + ( + (None, dict( + body=json.dumps(dict( + a='b' + )).encode('utf-8'), + )), + None, + (dict( + a='b' + ), None) + ), + ( + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + a='b' + )).encode('utf-8'), + )), + ['foo'], + (dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + a='b' + ), 'foo') + ), +] + + +FETCH_URL_JSON_FAIL = [ + ( + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + None, + 'Request failed: 400 foo (bar)' + ), + ( + (None, dict( + body=json.dumps(dict( + error=dict( + code="foo", + status=400, + message="bar", + ), + )).encode('utf-8'), + )), + ['bar'], + 'Request failed: 400 foo (bar)' + ), + ( + (None, dict(body='{this is not json}'.encode('utf-8'))), + [], + 'Cannot decode content retrieved from https://foo/bar' + ), + ( + (None, dict(status=400)), + [], + 'Cannot retrieve content from https://foo/bar, HTTP status code 400' + ), +] + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_SUCCESS) +def test_fetch_url_json(monkeypatch, return_value, accept_errors, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=return_value) + + assert robot.fetch_url_json(module, 'https://foo/bar', accept_errors=accept_errors) == result + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_FAIL) +def test_fetch_url_json_fail(monkeypatch, return_value, accept_errors, result): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=return_value) + + with pytest.raises(ModuleFailException) as exc: + robot.fetch_url_json(module, 'https://foo/bar', accept_errors=accept_errors) + + assert exc.value.fail_msg == result + assert exc.value.fail_kwargs == dict() + + +def test_fetch_url_json_empty(monkeypatch): + module = get_module_mock() + robot.fetch_url = MagicMock(return_value=(None, dict(status=204, body=''))) + + assert robot.fetch_url_json(module, 'https://foo/bar', allow_empty_result=True) == (None, None) + + robot.fetch_url = MagicMock(return_value=(None, dict(status=400, body=''))) + + with pytest.raises(ModuleFailException) as exc: + robot.fetch_url_json(module, 'https://foo/bar', allow_empty_result=True) + + assert exc.value.fail_msg == 'Cannot retrieve content from https://foo/bar, HTTP status code 400' + assert exc.value.fail_kwargs == dict() + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_SUCCESS) +def test_plugin_open_url_json(monkeypatch, return_value, accept_errors, result): + response = MagicMock() + response.read = MagicMock(return_value=return_value[1]['body']) + robot.open_url = MagicMock(return_value=response) + plugin = MagicMock() + + assert robot.plugin_open_url_json(plugin, 'https://foo/bar', accept_errors=accept_errors) == result + + +@pytest.mark.parametrize("return_value, accept_errors, result", FETCH_URL_JSON_FAIL) +def test_plugin_open_url_json_fail(monkeypatch, return_value, accept_errors, result): + response = MagicMock() + response.read = MagicMock(return_value=return_value[1].get('body', '')) + robot.open_url = MagicMock(side_effect=robot.HTTPError('https://foo/bar', 400, 'Error!', {}, response)) + plugin = MagicMock() + + with pytest.raises(robot.PluginException) as exc: + robot.plugin_open_url_json(plugin, 'https://foo/bar', accept_errors=accept_errors) + + assert exc.value.error_message == result + + +def test_plugin_open_url_json_fail_other(monkeypatch): + robot.open_url = MagicMock(side_effect=Exception('buh!')) + plugin = MagicMock() + + with pytest.raises(robot.PluginException) as exc: + robot.plugin_open_url_json(plugin, 'https://foo/bar') + + assert exc.value.error_message == 'Failed request to Hetzner Robot server endpoint https://foo/bar: buh!' + + +def test_plugin_open_url_json_fail_other_2(monkeypatch): + response = MagicMock() + response.read = MagicMock(side_effect=AttributeError('read')) + robot.open_url = MagicMock(side_effect=robot.HTTPError('https://foo/bar', 400, 'Error!', {}, response)) + plugin = MagicMock() + + with pytest.raises(robot.PluginException) as exc: + robot.plugin_open_url_json(plugin, 'https://foo/bar') + + assert exc.value.error_message == 'Cannot retrieve content from https://foo/bar, HTTP status code 400' + + +def test_plugin_open_url_json_empty_result(monkeypatch): + response = MagicMock() + response.read = MagicMock(return_value='') + response.code = 200 + robot.open_url = MagicMock(return_value=response) + plugin = MagicMock() + + assert robot.plugin_open_url_json(plugin, 'https://foo/bar', allow_empty_result=True) == (None, None) + + response = MagicMock() + response.read = MagicMock(side_effect=AttributeError('read')) + robot.open_url = MagicMock(side_effect=robot.HTTPError('https://foo/bar', 400, 'Error!', {}, response)) + + with pytest.raises(robot.PluginException) as exc: + robot.plugin_open_url_json(plugin, 'https://foo/bar') + + assert exc.value.error_message == 'Cannot retrieve content from https://foo/bar, HTTP status code 400' diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_boot.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_boot.py new file mode 100644 index 000000000..7117afb21 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_boot.py @@ -0,0 +1,541 @@ +# Copyright (c) 2021 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import boot + + +def _amend_server_data(data): + data.update({ + 'server_ip': '123.123.123.123', + 'server_ipv6_net': '2a01:4f8:111:4221::', + 'server_number': 23, + }) + return data + + +def create_rescue_inactive(): + return _amend_server_data({ + 'active': False, + 'arch': [64, 32], + 'authorized_key': [], + 'boot_time': None, + 'host_key': [], + 'os': ['linux', 'linuxold', 'freebsd', 'freebsdbeta', 'freebsdax', 'freebsdbetaax', 'vkvm', 'vkvmold'], + 'password': None, + }) + + +def create_rescue_active(os='linux', arch=64, authorized_key=None, host_key=None): + return _amend_server_data({ + 'active': True, + 'arch': arch, + 'authorized_key': authorized_key or [], + 'boot_time': None, + 'host_key': host_key or [], + 'os': os, + 'password': 'aBcDeFgHiJ1234', + }) + + +def create_linux_inactive(): + return { + 'dist': [ + 'Arch Linux latest minimal', + 'CentOS 7.9 minimal', + 'CentOS 8.4 minimal', + 'Debian 10.10 LAMP', + 'Debian 10.10 minimal', + 'Debian 11 base', + 'Ubuntu 18.04.5 LTS minimal', + 'Ubuntu 18.04.5 LTS Nextcloud', + 'Ubuntu 20.04.1 LTS minimal', + ], + 'arch': [64], + 'lang': ['en'], + 'active': False, + 'password': None, + 'authorized_key': [], + 'host_key': [], + } + + +def create_linux_active(dist='Arch Linux latest minimal', arch=64, lang='en', authorized_key=None, host_key=None): + return { + 'dist': dist, + 'arch': arch, + 'lang': lang, + 'active': True, + 'password': 'aBcDeFgHiJ1234', + 'authorized_key': authorized_key or [], + 'host_key': host_key or [], + } + + +def create_vnc_inactive(): + return { + 'dist': ['CentOS-7.9', 'CentOS-8.4', 'Fedora-33', 'openSUSE-15.2'], + 'arch': [64], + 'lang': ['de_DE', 'en_US', 'fr_FR', 'ru_RU'], + 'active': False, + 'password': None, + } + + +def _amend_boot(data=None): + if data is None: + data = {} + if 'rescue' not in data: + data['rescue'] = create_rescue_inactive() + if 'linux' not in data: + data['linux'] = create_linux_inactive() + if 'vnc' not in data: + data['vnc'] = create_vnc_inactive() + for section in ('windows', 'plesk', 'cpanel'): + if section not in data: + data[section] = None + return { + 'boot': data, + } + + +class TestHetznerBoot(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.boot.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + def test_idempotent_regular(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'server_number': 23, + 'regular_boot': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'rescue': create_linux_inactive(), + })) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['configuration_type'] == 'regular_boot' + assert result['password'] is None + + def test_rescue_idempotent(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'rescue': { + 'os': 'linux', + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'rescue': create_rescue_active(os='linux'), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['configuration_type'] == 'rescue' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_rescue_idempotent_2(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'rescue': { + 'os': 'linux', + 'arch': 32, + 'authorized_keys': [ + 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0', + 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99', + '0f:1e:2d:3c:4b:5a:69:78:87:96:a5:b4:c3:d2:e1:f0', + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'rescue': create_rescue_active(os='linux', arch=32, authorized_key=[ + { + 'key': { + 'fingerprint': 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0', + 'name': 'baz', + 'size': 4096, + 'type': 'RSA', + }, + }, + { + 'key': { + 'fingerprint': 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99', + 'name': 'foo bar', + 'size': 2048, + 'type': 'RSA', + }, + }, + { + 'key': { + 'fingerprint': '0f:1e:2d:3c:4b:5a:69:78:87:96:a5:b4:c3:d2:e1:f0', + 'name': 'test', + 'size': 3072, + 'type': 'RSA', + }, + }, + ]), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['configuration_type'] == 'rescue' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_rescue_deactivate(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'regular_boot': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'rescue': create_rescue_active(os='linux'), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_url('{0}/boot/23/rescue'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'regular_boot' + assert result['password'] is None + + def test_rescue_deactivate_check_mode(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'regular_boot': True, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'rescue': create_rescue_active(os='linux'), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'regular_boot' + assert result['password'] is None + + def test_rescue_activate(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'rescue': { + 'os': 'linux', + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot()) + .expect_url('{0}/boot/23'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_form_value('os', 'linux') + .expect_form_value_absent('arch') + .expect_form_value_absent('authorized_key') + .result_json({ + 'rescue': create_rescue_active(os='linux'), + }) + .expect_url('{0}/boot/23/rescue'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'rescue' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_rescue_activate_check_mode(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'rescue': { + 'os': 'linux', + }, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot()) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'rescue' + assert result['password'] is None + + def test_rescue_reactivate(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'rescue': { + 'os': 'linuxold', + 'arch': 32, + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'rescue': create_rescue_active(os='linux'), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_url('{0}/boot/23/rescue'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_form_value('os', 'linuxold') + .expect_form_value('arch', '32') + .expect_form_value_absent('authorized_key') + .result_json({ + 'rescue': create_rescue_active(os='linuxold', arch=32), + }) + .expect_url('{0}/boot/23/rescue'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'rescue' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_rescue_reactivate_check_mode(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'rescue': { + 'os': 'linuxold', + 'arch': 32, + }, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'rescue': create_rescue_active(os='linux'), + })) + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'rescue' + assert result['password'] is None + + def test_install_linux_idempotent(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'install_linux': { + 'dist': 'Arch Linux latest minimal', + 'lang': 'en', + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'linux': create_linux_active(dist='Arch Linux latest minimal', lang='en'), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['configuration_type'] == 'install_linux' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_install_linux_idempotent_2(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'install_linux': { + 'dist': 'Arch Linux latest minimal', + 'arch': 32, + 'lang': 'de', + 'authorized_keys': [ + 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0', + '0f:1e:2d:3c:4b:5a:69:78:87:96:a5:b4:c3:d2:e1:f0', + 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99', + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'linux': create_linux_active(dist='Arch Linux latest minimal', arch=32, lang='de', authorized_key=[ + { + 'key': { + 'fingerprint': 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0', + 'name': 'abc', + 'size': 4096, + 'type': 'RSA', + }, + }, + { + 'key': { + 'fingerprint': 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99', + 'name': 'buzz', + 'size': 2048, + 'type': 'RSA', + }, + }, + { + 'key': { + 'fingerprint': '0f:1e:2d:3c:4b:5a:69:78:87:96:a5:b4:c3:d2:e1:f0', + 'name': 'afz', + 'size': 2048, + 'type': 'RSA', + }, + }, + ]), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['configuration_type'] == 'install_linux' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_install_linux_deactivate(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'regular_boot': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'linux': create_linux_active(dist='Arch Linux latest minimal'), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_url('{0}/boot/23/linux'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'regular_boot' + assert result['password'] is None + + def test_install_linux_activate(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'install_linux': { + 'dist': 'Arch Linux latest minimal', + 'lang': 'en', + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot()) + .expect_url('{0}/boot/23'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_form_value('dist', 'Arch Linux latest minimal') + .expect_form_value_absent('arch') + .expect_form_value_absent('authorized_key') + .result_json({ + 'linux': create_linux_active(dist='Arch Linux latest minimal', lang='en'), + }) + .expect_url('{0}/boot/23/linux'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'install_linux' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_install_linux_reactivate(self, mocker): + result = self.run_module_success(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'install_linux': { + 'dist': 'Debian 11 base', + 'arch': 32, + 'lang': 'fr', + 'authorized_keys': [ + 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0', + 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99', + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json(_amend_boot({ + 'linux': create_linux_active(dist='Arch Linux latest minimal', lang='en'), + })) + .expect_url('{0}/boot/23'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_url('{0}/boot/23/linux'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_form_value('dist', 'Debian 11 base') + .expect_form_value('arch', '32') + .expect_form_value('lang', 'fr') + .expect_form_present('authorized_key') + # .expect_form_value('authorized_key', 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0') + # .expect_form_value('authorized_key', 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99') + .result_json({ + 'linux': create_linux_active(dist='Debian 11 base', lang='fr', arch=32, authorized_key=[ + { + 'key': { + 'fingerprint': 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99', + 'name': 'foo bar', + 'size': 4096, + 'type': 'RSA', + }, + }, + { + 'key': { + 'fingerprint': 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0', + 'name': 'bar', + 'size': 2048, + 'type': 'RSA', + }, + }, + ]), + }) + .expect_url('{0}/boot/23/linux'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['configuration_type'] == 'install_linux' + assert result['password'] == 'aBcDeFgHiJ1234' + + def test_server_not_found(self, mocker): + result = self.run_module_failed(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'regular_boot': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'Server not found', + }, + }) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'This server does not exist, or you do not have access rights for it' + + def test_invalid_input(self, mocker): + result = self.run_module_failed(mocker, boot, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'regular_boot': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'BOOT_NOT_AVAILABLE', + 'message': 'No boot configuration available for this server', + }, + }) + .expect_url('{0}/boot/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'There is no boot configuration available for this server' diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py new file mode 100644 index 000000000..dcf27d249 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip.py @@ -0,0 +1,247 @@ +# Copyright (c) 2020 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import failover_ip + + +class TestHetznerFailoverIP(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.failover_ip.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state idempotence (routed and unrouted) + + def test_unrouted(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'failover_ip': '1.2.3.4', + 'state': 'unrouted', + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] is None + assert result['state'] == 'unrouted' + + def test_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + # Tests for changing state (unrouted to routed, vice versa) + + def test_unrouted_to_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_form_value('active_server_ip', '4.3.2.1') + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + def test_unrouted_to_routed_check_mode(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + def test_routed_to_unrouted(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'unrouted', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] is None + assert result['state'] == 'unrouted' + + # Tests for re-routing + + def test_rerouting(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '5.4.3.2', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_form_value('active_server_ip', '4.3.2.1') + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + + def test_rerouting_already_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + 'state': 'routed', + 'value': '4.3.2.1', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '5.4.3.2', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 409) + .result_json({ + 'error': { + 'status': 409, + 'code': 'FAILOVER_ALREADY_ROUTED', + 'message': 'Failover already routed', + }, + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_form_value('active_server_ip', '4.3.2.1') + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py new file mode 100644 index 000000000..fb6cbfcd3 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_failover_ip_info.py @@ -0,0 +1,74 @@ +# Copyright (c) 2020 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import failover_ip_info + + +class TestHetznerFailoverIPInfo(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.failover_ip_info.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state (routed and unrouted) + + def test_unrouted(self, mocker): + result = self.run_module_success(mocker, failover_ip_info, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'failover_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': None, + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] is None + assert result['state'] == 'unrouted' + assert result['failover_ip'] == '1.2.3.4' + assert result['server_ip'] == '2.3.4.5' + assert result['server_number'] == 2345 + + def test_routed(self, mocker): + result = self.run_module_success(mocker, failover_ip_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'failover_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'failover': { + 'ip': '1.2.3.4', + 'netmask': '255.255.255.255', + 'server_ip': '2.3.4.5', + 'server_number': 2345, + 'active_server_ip': '4.3.2.1', + }, + }) + .expect_url('{0}/failover/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['value'] == '4.3.2.1' + assert result['state'] == 'routed' + assert result['failover_ip'] == '1.2.3.4' + assert result['server_ip'] == '2.3.4.5' + assert result['server_number'] == 2345 diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py new file mode 100644 index 000000000..8908c5abb --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall.py @@ -0,0 +1,1865 @@ +# Copyright (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import pytest + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import firewall + + +def create_params(parameter, *values): + assert len(values) > 1 + result = [] + for i in range(1, len(values)): + result.append((parameter, values[i - 1], values[i])) + return result + + +def flatten(list_of_lists): + result = [] + for l in list_of_lists: + result.extend(l) + return result + + +class TestHetznerFirewall(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.firewall.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state (absent and present) + + def test_absent_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_absent_idempotency_no_rules(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert 'rules' in result['firewall'] + assert 'input' in result['firewall']['rules'] + assert len(result['firewall']['rules']['input']) == 0 + assert 'output' in result['firewall']['rules'] + assert len(result['firewall']['rules']['output']) == 0 + + def test_absent_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 4321, + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 4321, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/4321'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 4321, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/4321'.format(BASE_URL)) + .expect_form_value('status', 'disabled'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 4321 + + def test_absent_changed_no_rules(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'disabled'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['before']['rules']['output']) == 0 + assert result['diff']['after']['status'] == 'disabled' + assert len(result['diff']['after']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['output']) == 0 + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert len(result['firewall']['rules']['input']) == 0 + assert len(result['firewall']['rules']['output']) == 0 + + def test_present_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_present_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + # Tests for state (absent and present) with check mode + + def test_absent_idempotency_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_absent_changed_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'absent', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'disabled' + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_present_idempotency_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_present_changed_check(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + # Tests for port + + def test_port_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'port': 'main', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['port'] == 'main' + assert result['diff']['after']['port'] == 'main' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['port'] == 'main' + + def test_port_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'port': 'main', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': True, + 'port': 'kvm', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('port', 'main'), + ]) + assert result['changed'] is True + assert result['diff']['before']['port'] == 'kvm' + assert result['diff']['after']['port'] == 'main' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['port'] == 'main' + + # Tests for allowlist_hos + + def test_allowlist_hos_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'allowlist_hos': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['allowlist_hos'] is True + assert result['diff']['before']['whitelist_hos'] is True + assert result['diff']['after']['allowlist_hos'] is True + assert result['diff']['after']['whitelist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['allowlist_hos'] is True + assert result['firewall']['whitelist_hos'] is True + + def test_allowlist_hos_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'allowlist_hos': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('whitelist_hos', 'true'), + ]) + assert result['changed'] is True + assert result['diff']['before']['allowlist_hos'] is False + assert result['diff']['before']['whitelist_hos'] is False + assert result['diff']['after']['allowlist_hos'] is True + assert result['diff']['after']['whitelist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['allowlist_hos'] is True + assert result['firewall']['whitelist_hos'] is True + + # Tests for filter_ipv6 + + def test_filter_ipv6_idempotency(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'filter_ipv6': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['filter_ipv6'] is True + assert result['diff']['after']['filter_ipv6'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['filter_ipv6'] is True + + def test_filter_ipv6_changed(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'filter_ipv6': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('filter_ipv6', 'true'), + ]) + assert result['changed'] is True + assert result['diff']['before']['filter_ipv6'] is False + assert result['diff']['after']['filter_ipv6'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert result['firewall']['filter_ipv6'] is True + + # Tests for wait_for_configured in getting status + + def test_wait_get(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_wait_get_timeout(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_failed(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Timeout while waiting for firewall to be configured.' + + def test_nowait_get(self, mocker): + result = self.run_module_failed(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Firewall configuration cannot be read as it is not configured.' + + # Tests for wait_for_configured in setting status + + def test_wait_update(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': True, + 'state': 'present', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_wait_update_timeout(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert 'Timeout while waiting for firewall to be configured.' in result['warnings'] + + def test_nowait_update(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'disabled' + assert result['diff']['after']['status'] == 'active' + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + # Idempotency checks: different amount of input/output rules + + def test_input_rule_len_change_0_1(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [ + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][0][name]') + .expect_form_value('rules[input][0][ip_version]', 'ipv4') + .expect_form_value_absent('rules[input][0][dst_ip]') + .expect_form_value_absent('rules[input][0][dst_port]') + .expect_form_value_absent('rules[input][0][src_ip]') + .expect_form_value_absent('rules[input][0][src_port]') + .expect_form_value_absent('rules[input][0][protocol]') + .expect_form_value_absent('rules[input][0][tcp_flags]') + .expect_form_value('rules[input][0][action]', 'discard') + .expect_form_value_absent('rules[input][1][action]') + .expect_form_value_absent('rules[output][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['before']['rules']['output']) == 0 + assert len(result['diff']['after']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['output']) == 0 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + assert len(result['firewall']['rules']['output']) == 0 + + def test_output_rule_len_change_0_1(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'output': [ + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'output': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[output][0][name]') + .expect_form_value('rules[output][0][ip_version]', 'ipv4') + .expect_form_value_absent('rules[output][0][dst_ip]') + .expect_form_value_absent('rules[output][0][dst_port]') + .expect_form_value_absent('rules[output][0][src_ip]') + .expect_form_value_absent('rules[output][0][src_port]') + .expect_form_value_absent('rules[output][0][protocol]') + .expect_form_value_absent('rules[output][0][tcp_flags]') + .expect_form_value('rules[output][0][action]', 'discard') + .expect_form_value_absent('rules[output][1][action]') + .expect_form_value_absent('rules[input][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['before']['rules']['output']) == 0 + assert len(result['diff']['after']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['output']) == 1 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 0 + assert len(result['firewall']['rules']['output']) == 1 + + def test_input_rule_len_change_1_0(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][0][action]') + .expect_form_value_absent('rules[output][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['before']['rules']['output']) == 0 + assert len(result['diff']['after']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['output']) == 0 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 0 + assert len(result['firewall']['rules']['output']) == 0 + + def test_output_rule_len_change_1_0(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][0][action]') + .expect_form_value_absent('rules[output][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['before']['rules']['output']) == 1 + assert len(result['diff']['after']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['output']) == 0 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 0 + assert len(result['firewall']['rules']['output']) == 0 + + def test_input_rule_len_change_1_2(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [ + { + 'ip_version': 'ipv4', + 'dst_port': 80, + 'protocol': 'tcp', + 'action': 'accept', + }, + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': '80', + 'src_ip': None, + 'src_port': None, + 'protocol': 'tcp', + 'tcp_flags': None, + 'action': 'accept', + }, + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value('rules[input][0][action]', 'accept') + .expect_form_value('rules[input][1][action]', 'discard') + .expect_form_value_absent('rules[input][2][action]') + .expect_form_value_absent('rules[output][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['before']['rules']['output']) == 0 + assert len(result['diff']['after']['rules']['input']) == 2 + assert len(result['diff']['after']['rules']['output']) == 0 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 2 + assert len(result['firewall']['rules']['output']) == 0 + + def test_output_rule_len_change_1_2(self, mocker): + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [], + 'output': [ + { + 'ip_version': 'ipv4', + 'dst_port': 80, + 'protocol': 'tcp', + 'action': 'accept', + }, + { + 'ip_version': 'ipv4', + 'action': 'discard', + }, + ], + }, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [ + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': '80', + 'src_ip': None, + 'src_port': None, + 'protocol': 'tcp', + 'tcp_flags': None, + 'action': 'accept', + }, + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + }, + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value('rules[output][0][action]', 'accept') + .expect_form_value('rules[output][1][action]', 'discard') + .expect_form_value_absent('rules[output][2][action]') + .expect_form_value_absent('rules[input][0][action]'), + ]) + assert result['changed'] is True + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['before']['rules']['output']) == 1 + assert len(result['diff']['after']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['output']) == 2 + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 0 + assert len(result['firewall']['rules']['output']) == 2 + + # Idempotency checks: change one value + + @pytest.mark.parametrize("parameter, before, after", flatten([ + create_params('name', None, '', 'Test', 'Test', 'foo', '', None), + create_params('ip_version', 'ipv4', 'ipv4', 'ipv6', 'ipv6'), + create_params('dst_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('dst_port', None, '80', '80-443', '80-443', None), + create_params('src_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('src_port', None, '80', '80-443', '80-443', None), + create_params('protocol', None, 'tcp', 'tcp', 'udp', 'udp', None), + create_params('tcp_flags', None, 'syn', 'syn|fin', 'syn|fin', 'syn&fin', '', None), + create_params('action', 'accept', 'accept', 'discard', 'discard'), + ])) + def test_input_rule_value_change(self, mocker, parameter, before, after): + input_call = { + 'ip_version': 'ipv4', + 'action': 'discard', + } + input_before = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + input_after = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + if after is not None: + input_call[parameter] = after + input_before[parameter] = before + input_after[parameter] = after + + calls = [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [input_before], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ] + + changed = (before != after) + if changed: + after_call = ( + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [input_after], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][1][action]') + ) + if parameter != 'ip_version': + after_call.expect_form_value('rules[input][0][ip_version]', 'ipv4') + if parameter != 'action': + after_call.expect_form_value('rules[input][0][action]', 'discard') + if after is not None: + after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after) + else: + after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter)) + calls.append(after_call) + + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [input_call], + }, + }, calls) + assert result['changed'] == changed + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['before']['rules']['output']) == 0 + assert len(result['diff']['after']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['output']) == 0 + assert result['diff']['before']['rules']['input'][0][parameter] == before + assert result['diff']['after']['rules']['input'][0][parameter] == after + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + assert result['firewall']['rules']['input'][0][parameter] == after + assert len(result['firewall']['rules']['output']) == 0 + + @pytest.mark.parametrize("parameter, before, after", flatten([ + create_params('name', None, '', 'Test', 'Test', 'foo', '', None), + create_params('ip_version', 'ipv4', 'ipv4', 'ipv6', 'ipv6'), + create_params('dst_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('dst_port', None, '80', '80-443', '80-443', None), + create_params('src_ip', None, '1.2.3.4/24', '1.2.3.4/32', '1.2.3.4/32', None), + create_params('src_port', None, '80', '80-443', '80-443', None), + create_params('protocol', None, 'tcp', 'tcp', 'udp', 'udp', None), + create_params('tcp_flags', None, 'syn', 'syn|fin', 'syn|fin', 'syn&fin', '', None), + create_params('action', 'accept', 'accept', 'discard', 'discard'), + ])) + def test_output_rule_value_change(self, mocker, parameter, before, after): + output_call = { + 'ip_version': 'ipv4', + 'action': 'discard', + } + output_before = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + output_after = { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + if after is not None: + output_call[parameter] = after + output_before[parameter] = before + output_after[parameter] = after + + calls = [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'output': [output_before], + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ] + + changed = (before != after) + if changed: + after_call = ( + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'output': [output_after], + 'input': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[output][1][action]') + .expect_form_value_absent('rules[input][0][action]') + ) + if parameter != 'ip_version': + after_call.expect_form_value('rules[output][0][ip_version]', 'ipv4') + if parameter != 'action': + after_call.expect_form_value('rules[output][0][action]', 'discard') + if after is not None: + after_call.expect_form_value('rules[output][0][{0}]'.format(parameter), after) + else: + after_call.expect_form_value_absent('rules[output][0][{0}]'.format(parameter)) + calls.append(after_call) + + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [], + 'output': [output_call], + }, + }, calls) + assert result['changed'] == changed + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 0 + assert len(result['diff']['before']['rules']['output']) == 1 + assert len(result['diff']['after']['rules']['input']) == 0 + assert len(result['diff']['after']['rules']['output']) == 1 + assert result['diff']['before']['rules']['output'][0][parameter] == before + assert result['diff']['after']['rules']['output'][0][parameter] == after + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 0 + assert len(result['firewall']['rules']['output']) == 1 + assert result['firewall']['rules']['output'][0][parameter] == after + + # Idempotency checks: IP address normalization + + @pytest.mark.parametrize("ip_version, parameter, before_normalized, after_normalized, after", [ + ('ipv4', 'src_ip', '1.2.3.4/32', '1.2.3.4/32', '1.2.3.4'), + ('ipv6', 'src_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3::4'), + ('ipv6', 'dst_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3:0::4'), + ('ipv6', 'dst_ip', '::/0', '::/0', '0:0::0/0'), + ('ipv6', 'dst_ip', '::/0', '::1/0', '0:0::0:1/0'), + ('ipv6', 'dst_ip', '::/0', None, None), + ]) + def test_input_rule_ip_normalization(self, mocker, ip_version, parameter, before_normalized, after_normalized, after): + assert ip_version in ('ipv4', 'ipv6') + assert parameter in ('src_ip', 'dst_ip') + input_call = { + 'ip_version': ip_version, + 'action': 'discard', + } + input_before = { + 'name': None, + 'ip_version': ip_version, + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + input_after = { + 'name': None, + 'ip_version': ip_version, + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + if after is not None: + input_call[parameter] = after + input_before[parameter] = before_normalized + input_after[parameter] = after_normalized + + calls = [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [input_before], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ] + + changed = (before_normalized != after_normalized) + if changed: + after_call = ( + FetchUrlCall('POST', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [input_after], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)) + .expect_form_value('status', 'active') + .expect_form_value_absent('rules[input][1][action]') + ) + after_call.expect_form_value('rules[input][0][ip_version]', ip_version) + after_call.expect_form_value('rules[input][0][action]', 'discard') + if after_normalized is None: + after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter)) + else: + after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after_normalized) + calls.append(after_call) + + result = self.run_module_success(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'rules': { + 'input': [input_call], + }, + }, calls) + assert result['changed'] == changed + assert result['diff']['before']['status'] == 'active' + assert result['diff']['after']['status'] == 'active' + assert len(result['diff']['before']['rules']['input']) == 1 + assert len(result['diff']['before']['rules']['output']) == 0 + assert len(result['diff']['after']['rules']['input']) == 1 + assert len(result['diff']['after']['rules']['output']) == 0 + assert result['diff']['before']['rules']['input'][0][parameter] == before_normalized + assert result['diff']['after']['rules']['input'][0][parameter] == after_normalized + assert result['firewall']['status'] == 'active' + assert len(result['firewall']['rules']['input']) == 1 + assert len(result['firewall']['rules']['output']) == 0 + assert result['firewall']['rules']['input'][0][parameter] == after_normalized + + # Missing requirements + + def test_fail_no_ipaddress(self, mocker): + try: + firewall.HAS_IPADDRESS = False + firewall.IPADDRESS_IMP_ERR = 'This is\na traceback' + result = self.run_module_failed(mocker, firewall, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'state': 'present', + 'wait_for_configured': True, + 'timeout': 0, + }, []) + assert result['msg'].startswith('Failed to import the required Python library (ipaddress) on') + assert result['exception'] == 'This is\na traceback' + finally: + firewall.HAS_IPADDRESS = True diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py new file mode 100644 index 000000000..4e0bbdb3c --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_firewall_info.py @@ -0,0 +1,328 @@ +# Copyright (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import firewall_info + + +class TestHetznerFirewallInfo(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.firewall_info.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Tests for state (absent and present) + + def test_absent(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'server_number': 1, + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['filter_ipv6'] is False + assert result['firewall']['allowlist_hos'] is False + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + + def test_absent_no_rules(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'disabled', + 'whitelist_hos': False, + 'port': 'main', + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['filter_ipv6'] is False + assert result['firewall']['status'] == 'disabled' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert 'rules' in result['firewall'] + assert 'input' in result['firewall']['rules'] + assert len(result['firewall']['rules']['input']) == 0 + + def test_present(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['filter_ipv6'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert len(result['firewall']['rules']['input']) == 0 + assert len(result['firewall']['rules']['output']) == 0 + + def test_present_w_rules(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [ + { + 'name': 'Accept HTTPS traffic', + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': '443', + 'src_ip': None, + 'src_port': None, + 'protocol': 'tcp', + 'tcp_flags': None, + 'action': 'accept', + }, + { + 'name': None, + 'ip_version': 'ipv4', + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'discard', + } + ], + 'output': [ + { + 'name': None, + 'ip_version': None, + 'dst_ip': None, + 'dst_port': None, + 'src_ip': None, + 'src_port': None, + 'protocol': None, + 'tcp_flags': None, + 'action': 'accept', + } + ], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['filter_ipv6'] is True + assert result['firewall']['allowlist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 + assert len(result['firewall']['rules']['input']) == 2 + assert result['firewall']['rules']['input'][0]['name'] == 'Accept HTTPS traffic' + assert result['firewall']['rules']['input'][0]['dst_port'] == '443' + assert result['firewall']['rules']['input'][0]['action'] == 'accept' + assert result['firewall']['rules']['input'][1]['dst_port'] is None + assert result['firewall']['rules']['input'][1]['action'] == 'discard' + assert len(result['firewall']['rules']['output']) == 1 + assert result['firewall']['rules']['output'][0]['name'] is None + assert result['firewall']['rules']['output'][0]['ip_version'] is None + assert result['firewall']['rules']['output'][0]['action'] == 'accept' + + # Tests for wait_for_configured in getting status + + def test_wait_get(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 123, + 'wait_for_configured': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 123, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/123'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 123, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/123'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': False, + 'server_ip': '1.2.3.4', + 'server_number': 123, + 'status': 'active', + 'whitelist_hos': True, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/123'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['filter_ipv6'] is False + assert result['firewall']['whitelist_hos'] is True + assert result['firewall']['allowlist_hos'] is True + assert result['firewall']['status'] == 'active' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 123 + + def test_wait_get_timeout(self, mocker): + mocker.patch('time.sleep', lambda duration: None) + result = self.run_module_failed(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': True, + 'timeout': 0, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'Timeout while waiting for firewall to be configured.' + + def test_nowait_get(self, mocker): + result = self.run_module_success(mocker, firewall_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_ip': '1.2.3.4', + 'wait_for_configured': False, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'firewall': { + 'filter_ipv6': True, + 'server_ip': '1.2.3.4', + 'server_number': 1, + 'status': 'in process', + 'whitelist_hos': False, + 'port': 'main', + 'rules': { + 'input': [], + 'output': [], + }, + }, + }) + .expect_url('{0}/firewall/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['firewall']['status'] == 'in process' + assert result['firewall']['server_ip'] == '1.2.3.4' + assert result['firewall']['server_number'] == 1 diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_reset.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_reset.py new file mode 100644 index 000000000..55bfdbbb4 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_reset.py @@ -0,0 +1,237 @@ +# Copyright (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import reset + + +class TestHetznerReset(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.reset.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + def test_check_valid(self, mocker): + result = self.run_module_success(mocker, reset, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'server_number': 23, + 'reset_type': 'software', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json({ + 'reset': { + 'server_ip': '123.123.123.123', + 'server_ipv6_net': '2a01:4f8:111:4221::', + 'server_number': 23, + 'type': [ + 'sw', + 'hw', + 'man' + ], + 'operating_status': 'not supported', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['changed'] is True + + def test_valid(self, mocker): + result = self.run_module_success(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'manual', + }, [ + FetchUrlCall('POST', 200) + .expect_form_value('type', 'man') + .result_json({ + 'reset': { + 'server_ip': '123.123.123.123', + 'server_ipv6_net': '2a01:4f8:111:4221::', + 'server_number': 23, + 'type': 'man', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['changed'] is True + + # Errors + + def test_invalid(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + }, [ + FetchUrlCall('POST', 400) + .expect_form_value('type', 'power') + .result_json({ + 'error': { + 'status': 400, + 'code': 'INVALID_INPUT', + 'message': 'Invalid input parameters', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'The chosen reset method is not supported for this server' + + def test_check_invalid(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'reset': { + 'server_ip': '123.123.123.123', + 'server_ipv6_net': '2a01:4f8:111:4221::', + 'server_number': 23, + 'type': [ + 'sw', + 'hw', + 'man' + ], + 'operating_status': 'not supported', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'The chosen reset method is not supported for this server' + + def test_server_not_found(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + }, [ + FetchUrlCall('POST', 404) + .expect_form_value('type', 'power') + .result_json({ + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'Server not found', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'This server does not exist, or you do not have access rights for it' + + def test_check_server_not_found(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'Server not found', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'This server does not exist, or you do not have access rights for it' + + def test_reset_not_available(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + }, [ + FetchUrlCall('POST', 404) + .expect_form_value('type', 'power') + .result_json({ + 'error': { + 'status': 404, + 'code': 'RESET_NOT_AVAILABLE', + 'message': 'The server has no reset option', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'The server has no reset option available' + + def test_check_reset_not_available(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'RESET_NOT_AVAILABLE', + 'message': 'The server has no reset option', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'The server has no reset option available' + + def test_reset_manual_active(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + }, [ + FetchUrlCall('POST', 409) + .expect_form_value('type', 'power') + .result_json({ + 'error': { + 'status': 409, + 'code': 'RESET_MANUAL_ACTIVE', + 'message': 'There is already a running manual reset', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'A manual reset is already running' + + def test_reset_failed(self, mocker): + result = self.run_module_failed(mocker, reset, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'reset_type': 'power', + }, [ + FetchUrlCall('POST', 500) + .expect_form_value('type', 'power') + .result_json({ + 'error': { + 'status': 500, + 'code': 'RESET_FAILED', + 'message': 'Resetting failed due to an internal error', + }, + }) + .expect_url('{0}/reset/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'The reset failed due to an internal error at Hetzner' diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_reverse_dns.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_reverse_dns.py new file mode 100644 index 000000000..44a3944a7 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_reverse_dns.py @@ -0,0 +1,165 @@ +# Copyright (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import reverse_dns + + +class TestHetznerReverseDNS(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.reverse_dns.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + def test_idempotent_present(self, mocker): + result = self.run_module_success(mocker, reverse_dns, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'ip': '1.2.3.4', + 'value': 'foo.example.com', + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json({ + 'rdns': { + 'ip': '1.2.3.4', + 'ptr': 'foo.example.com', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + + def test_idempotent_absent(self, mocker): + result = self.run_module_success(mocker, reverse_dns, { + 'hetzner_user': '', + 'hetzner_password': '', + 'ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'RDNS_NOT_FOUND', + 'message': 'The IP address 1.2.3.4 has no reverse DNS entry yet', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is False + + def test_set_check_mode(self, mocker): + result = self.run_module_success(mocker, reverse_dns, { + 'hetzner_user': '', + 'hetzner_password': '', + 'ip': '1.2.3.4', + 'value': 'foo.example.com', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'RDNS_NOT_FOUND', + 'message': 'The IP address 1.2.3.4 has no reverse DNS entry yet', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + + def test_set(self, mocker): + result = self.run_module_success(mocker, reverse_dns, { + 'hetzner_user': '', + 'hetzner_password': '', + 'ip': '1.2.3.4', + 'value': 'foo.example.com', + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'RDNS_NOT_FOUND', + 'message': 'The IP address 1.2.3.4 has no reverse DNS entry yet', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_form_value('ptr', 'foo.example.com') + .result_json({ + 'rdns': { + 'ip': '1.2.3.4', + 'ptr': 'foo.example.com', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + + def test_remove_check_mode(self, mocker): + result = self.run_module_success(mocker, reverse_dns, { + 'hetzner_user': '', + 'hetzner_password': '', + 'ip': '1.2.3.4', + 'state': 'absent', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'rdns': { + 'ip': '1.2.3.4', + 'ptr': 'foo.example.com', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + + def test_remove(self, mocker): + result = self.run_module_success(mocker, reverse_dns, { + 'hetzner_user': '', + 'hetzner_password': '', + 'ip': '1.2.3.4', + 'state': 'absent', + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'rdns': { + 'ip': '1.2.3.4', + 'ptr': 'foo.example.com', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + ]) + assert result['changed'] is True + + def test_bad_ip(self, mocker): + result = self.run_module_failed(mocker, reverse_dns, { + 'hetzner_user': '', + 'hetzner_password': '', + 'ip': '1.2.3.4', + 'value': 'foo.example.com', + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'IP_NOT_FOUND', + 'message': 'The IP address 1.2.3.4 was not found', + }, + }) + .expect_url('{0}/rdns/1.2.3.4'.format(BASE_URL)), + ]) + assert result['msg'] == 'The IP address was not found' diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_server.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_server.py new file mode 100644 index 000000000..b69d5d178 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_server.py @@ -0,0 +1,160 @@ +# Copyright (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import server + + +def create_server_data(server_name): + return { + 'server': { + 'cancelled': False, + 'cpanel': False, + 'dc': 'NBG1-DC1', + 'hot_swap': True, + 'ip': [ + '1.2.3.4', + ], + 'linked_storagebox': None, + 'paid_until': '2021-12-31', + 'plesk': False, + 'product': 'EX41', + 'rescue': True, + 'reset': True, + 'server_ip': '1.2.3.4', + 'server_ipv6_net': '2a01:1:2:3::', + 'server_name': server_name, + 'server_number': 23, + 'status': 'ready', + 'subnet': [ + { + 'ip': '2a01:1:2:3::', + 'mask': '64', + }, + ], + 'traffic': 'unlimited', + 'vnc': True, + 'windows': False, + 'wol': True, + }, + } + + +class TestHetznerServer(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.server.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + def test_idempotent_not_specified(self, mocker): + result = self.run_module_success(mocker, server, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'server_number': 23, + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_server_data('foo')) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['server'] == create_server_data('foo')['server'] + + def test_idempotent(self, mocker): + result = self.run_module_success(mocker, server, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'server_name': 'foo', + }, [ + FetchUrlCall('GET', 200) + .result_json(create_server_data('foo')) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['server'] == create_server_data('foo')['server'] + + def test_change_check_mode(self, mocker): + result = self.run_module_success(mocker, server, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'server_name': 'bar', + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(create_server_data('foo')) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['server'] == create_server_data('bar')['server'] + + def test_change(self, mocker): + result = self.run_module_success(mocker, server, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'server_name': 'bar', + }, [ + FetchUrlCall('GET', 200) + .result_json(create_server_data('foo')) + .expect_url('{0}/server/23'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_form_value('server_name', 'bar') + .result_json(create_server_data('bar')) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['server'] == create_server_data('bar')['server'] + + # Errors + + def test_server_not_found(self, mocker): + result = self.run_module_failed(mocker, server, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'Server not found', + }, + }) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'This server does not exist, or you do not have access rights for it' + + def test_invalid_input(self, mocker): + result = self.run_module_failed(mocker, server, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'server_name': 'bar', + }, [ + FetchUrlCall('GET', 200) + .result_json(create_server_data('foo')) + .expect_url('{0}/server/23'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_form_value('server_name', 'bar') + .result_json({ + 'error': { + 'status': 400, + 'code': 'INVALID_INPUT', + 'message': 'Invalid input parameters', + }, + }) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['msg'] == 'The values to update were invalid ({"server_name": "bar"})' diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_server_info.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_server_info.py new file mode 100644 index 000000000..9a9c715d0 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_server_info.py @@ -0,0 +1,297 @@ +# Copyright (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import server_info + + +SERVER_MINIMUM_DATA = [ + { + 'server': { + 'cancelled': False, + 'dc': 'NBG1-DC1', + 'ip': [ + '1.2.3.4', + ], + 'linked_storagebox': None, + 'paid_until': '2021-12-31', + 'product': 'EX41', + 'server_ip': '1.2.3.4', + 'server_ipv6_net': '2a01:1:2:3::', + 'server_name': 'foo', + 'server_number': 23, + 'status': 'ready', + 'subnet': [ + { + 'ip': '2a01:1:2:3::', + 'mask': '64', + }, + ], + 'traffic': 'unlimited', + }, + }, + { + 'server': { + 'cancelled': True, + 'dc': 'NBG1-DC2', + 'ip': [ + '1.2.3.5', + ], + 'linked_storagebox': 12345, + 'paid_until': '2021-11-30', + 'product': 'EX41', + 'server_ip': '1.2.3.5', + 'server_ipv6_net': '2a01:1:5:3::', + 'server_name': 'bar', + 'server_number': 42, + 'status': 'in process', + 'subnet': [ + { + 'ip': '2a01:1:5:3::', + 'mask': '64', + }, + ], + 'traffic': '1 TB', + }, + }, +] + + +SERVER_DETAIL_DATA = { + 23: { + 'server': { + 'cancelled': False, + 'cpanel': False, + 'dc': 'NBG1-DC1', + 'hot_swap': True, + 'ip': [ + '1.2.3.4', + ], + 'linked_storagebox': None, + 'paid_until': '2021-12-31', + 'plesk': False, + 'product': 'EX41', + 'rescue': True, + 'reset': True, + 'server_ip': '1.2.3.4', + 'server_ipv6_net': '2a01:1:2:3::', + 'server_name': 'foo', + 'server_number': 23, + 'status': 'ready', + 'subnet': [ + { + 'ip': '2a01:1:2:3::', + 'mask': '64', + }, + ], + 'traffic': 'unlimited', + 'vnc': True, + 'windows': False, + 'wol': True, + }, + }, + 42: { + 'server': { + 'cancelled': True, + 'cpanel': False, + 'dc': 'NBG1-DC2', + 'hot_swap': True, + 'ip': [ + '1.2.3.5', + ], + 'linked_storagebox': 12345, + 'paid_until': '2021-11-30', + 'plesk': False, + 'product': 'EX41', + 'rescue': False, + 'reset': False, + 'server_ip': '1.2.3.5', + 'server_ipv6_net': '2a01:1:5:3::', + 'server_name': 'bar', + 'server_number': 42, + 'status': 'in process', + 'subnet': [ + { + 'ip': '2a01:1:5:3::', + 'mask': '64', + }, + ], + 'traffic': '1 TB', + 'vnc': False, + 'windows': True, + 'wol': False, + }, + }, +} + + +class TestHetznerServerInfo(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.server_info.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + def test_server_number(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'server_number': 23, + }, [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(SERVER_DETAIL_DATA[23]) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 1 + assert result['servers'][0] == SERVER_DETAIL_DATA[23]['server'] + + def test_server_number_name_match(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'server_name': 'foo', + }, [ + FetchUrlCall('GET', 200) + .result_json(SERVER_DETAIL_DATA[23]) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 1 + assert result['servers'][0] == SERVER_DETAIL_DATA[23]['server'] + + def test_server_number_name_mismatch(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 23, + 'server_name': 'bar', + }, [ + FetchUrlCall('GET', 200) + .result_json(SERVER_DETAIL_DATA[23]) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 0 + + def test_server_number_unknown(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_number': 1, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'server not found', + }, + }) + .expect_url('{0}/server/1'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 0 + + def test_server_all(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + }, [ + FetchUrlCall('GET', 200) + .result_json(SERVER_MINIMUM_DATA) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 2 + assert result['servers'][0] == SERVER_MINIMUM_DATA[0]['server'] + assert result['servers'][1] == SERVER_MINIMUM_DATA[1]['server'] + + def test_server_name(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_name': 'foo', + }, [ + FetchUrlCall('GET', 200) + .result_json(SERVER_MINIMUM_DATA) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 1 + assert result['servers'][0] == SERVER_MINIMUM_DATA[0]['server'] + + def test_server_name_full_info(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_name': 'foo', + 'full_info': True, + }, [ + FetchUrlCall('GET', 200) + .result_json(SERVER_MINIMUM_DATA) + .expect_url('{0}/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .result_json(SERVER_DETAIL_DATA[23]) + .expect_url('{0}/server/23'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 1 + assert result['servers'][0] == SERVER_DETAIL_DATA[23]['server'] + + def test_server_name_unknown(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_name': 'baz', + }, [ + FetchUrlCall('GET', 200) + .result_json(SERVER_MINIMUM_DATA) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 0 + + def test_server_name_none(self, mocker): + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_name': 'foo', + }, [ + FetchUrlCall('GET', 200) + .result_json([]) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 0 + + def test_server_name_none_error(self, mocker): + # According to the API docs, when no server is found this API can return 404. + # I haven't seen that in RL though... + result = self.run_module_success(mocker, server_info, { + 'hetzner_user': '', + 'hetzner_password': '', + 'server_name': 'foo', + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'server not found', + }, + }) + .expect_url('{0}/server'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert len(result['servers']) == 0 diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_ssh_key.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_ssh_key.py new file mode 100644 index 000000000..670323db5 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_ssh_key.py @@ -0,0 +1,452 @@ +# Copyright (c) 2021 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import pytest + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import ssh_key + + +# Key generated with `ssh-keygen -t rsa -b 4096 -f test`, fingerprint with `ssh-keygen -lf test.pub -E md5`` +PUBLIC_KEY_1 = ( + 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC7g+C+gXspRfsNRFXHSeEuQLrUEb+pSV9OUi3zz0DvdxzaXyP4I1vUErnwll5P5' + '8KFdkWp65haqiGteM53zuGJa251c+J41Y69jLEI0jX4mGj4BskB0Cud23lnVzYTktzjkwGz2tGlRjaSYzYdm9lR3Nf6rlWBP1iz6C' + 'QasBHVLGWUBuJF+DQ16ztHV9EWtifDprVoMHK5EaGW19W5OCW73sPJfvbdDjolTZC6QZ7lKOGcZjdFBM7nnIyfIHYfjnXPZh9eMnY' + '6KWEAKuhQpPO1SB82PrLvBPlYzNewO1BiOQWoJyJfJBr1vRBfhLzY9VAoNr5fDSUxtn3UmZ2OmcNCx+qb8iUrn+E3K3i4sRn5iYVA' + 'dO4pmsjx5SENXlfpj/Mmz6wu3bQGN5k1jYtq+sKxGuIRiX+9sxEQ1KBXIqMfM1zSzitxGQSGUrqEgWpxJKVmDscGnlZBGGTPvPRwX' + 'i3VLeiTH+AkGOnWrlVenKpBh/0IWPI8fN/d7GolWHT53Cyi0HQbb3nKMUlfXWFKukbdSb9mvJ0v1Pv8qlWb6+fDZCBi0hz/fmE+hx' + '/+uwnY9Vk8H5CzTDQOmXKx6Gj3Lff9RSWD/WePW8LyukWz0l18GOGWzv/HqNIVtljdfJMa5v2kckhZAFPxQvZBMUIX0wkRTmGJOcQ' + '+A8ZKOVaScMnXXQ==' +) +FINGERPRINT_1 = 'e4:47:42:71:81:62:bf:06:1c:23:fa:f3:8f:7b:6f:d0' +TYPE_1 = 'RSA' +SIZE_1 = 4096 + + +class TestHetznerSSHKey(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.ssh_key.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + # Absent with fingerprint + + def test_absent_fp(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'state': 'absent', + 'fingerprint': FINGERPRINT_1, + }, [ + FetchUrlCall('DELETE', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + def test_absent_fp_idempotent(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'absent', + 'fingerprint': FINGERPRINT_1, + }, [ + FetchUrlCall('DELETE', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'NOT_FOUND', + 'message': 'Key not found', + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is False + assert result['fingerprint'] == FINGERPRINT_1 + + def test_absent_fp_check_mode(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'absent', + 'fingerprint': FINGERPRINT_1, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'key': { + 'name': 'My Test Key', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + def test_absent_fp_idempotent_check_mode(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'absent', + 'fingerprint': FINGERPRINT_1, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'NOT_FOUND', + 'message': 'Key not found', + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is False + assert result['fingerprint'] == FINGERPRINT_1 + + # Absent with public key + + def test_absent_key(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'absent', + 'public_key': PUBLIC_KEY_1, + }, [ + FetchUrlCall('DELETE', 200) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + def test_absent_key_idempotent(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'absent', + 'public_key': PUBLIC_KEY_1, + }, [ + FetchUrlCall('DELETE', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'NOT_FOUND', + 'message': 'Key not found', + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is False + assert result['fingerprint'] == FINGERPRINT_1 + + def test_absent_key_check_mode(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'absent', + 'public_key': PUBLIC_KEY_1, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'key': { + 'name': 'My Test Key', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + def test_absent_key_idempotent_check_mode(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'absent', + 'public_key': PUBLIC_KEY_1, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'NOT_FOUND', + 'message': 'Key not found', + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is False + assert result['fingerprint'] == FINGERPRINT_1 + + # Present + + def test_present_create_check_mode(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'present', + 'name': 'foo', + 'public_key': PUBLIC_KEY_1, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'NOT_FOUND', + 'message': 'Key not found', + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + def test_present_create(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'present', + 'name': 'foo', + 'public_key': PUBLIC_KEY_1, + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'NOT_FOUND', + 'message': 'Key not found', + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + FetchUrlCall('POST', 200) + .expect_form_value('name', 'foo') + .expect_form_value('data', PUBLIC_KEY_1) + .result_json({ + 'key': { + 'name': 'foo', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key'.format(BASE_URL)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + def test_present_idempotent_check_mode(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'present', + 'name': 'foo', + 'public_key': PUBLIC_KEY_1, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'key': { + 'name': 'foo', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is False + assert result['fingerprint'] == FINGERPRINT_1 + + def test_present_idempotent(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'present', + 'name': 'foo', + 'public_key': PUBLIC_KEY_1, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'key': { + 'name': 'foo', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is False + assert result['fingerprint'] == FINGERPRINT_1 + + def test_present_change_check_mode(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'present', + 'name': 'bar', + 'public_key': PUBLIC_KEY_1, + '_ansible_check_mode': True, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'key': { + 'name': 'foo', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + def test_present_change(self, mocker): + result = self.run_module_success(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'present', + 'name': 'bar', + 'public_key': PUBLIC_KEY_1, + }, [ + FetchUrlCall('GET', 200) + .result_json({ + 'key': { + 'name': 'foo', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + FetchUrlCall('POST', 200) + .expect_form_value('name', 'bar') + .expect_form_value_absent('data') + .result_json({ + 'key': { + 'name': 'bar', + 'fingerprint': FINGERPRINT_1, + 'type': TYPE_1, + 'size': SIZE_1, + 'data': PUBLIC_KEY_1, + }, + }) + .expect_url('{0}/key/{1}'.format(BASE_URL, FINGERPRINT_1)), + ]) + assert result['changed'] is True + assert result['fingerprint'] == FINGERPRINT_1 + + # Error + + def test_invalid_public_key(self, mocker): + result = self.run_module_failed(mocker, ssh_key, { + 'hetzner_user': '', + 'hetzner_password': '', + 'state': 'present', + 'name': 'bar', + 'public_key': 'asdf', + }, []) + assert result['msg'] == 'Error while extracting fingerprint from public key data: cannot split public key into at least two parts' + + +def test_normalize_fingerprint(): + assert ssh_key.normalize_fingerprint(FINGERPRINT_1) == FINGERPRINT_1 + assert ssh_key.normalize_fingerprint('F5:7e:4f:d8:ab:20:b8:5B:8b:2f:7a:4:47:fd:96:73') == ( + 'f5:7e:4f:d8:ab:20:b8:5b:8b:2f:7a:04:47:fd:96:73' + ) + assert ssh_key.normalize_fingerprint('F57e4fd8ab20b85B8b2f7a0447fd9673') == ( + 'f5:7e:4f:d8:ab:20:b8:5b:8b:2f:7a:04:47:fd:96:73' + ) + assert ssh_key.normalize_fingerprint('Fe:F', size=2) == 'fe:0f' + + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.normalize_fingerprint('') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Fingerprint must consist of 16 8-bit hex numbers: got 0 8-bit hex numbers instead' + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.normalize_fingerprint('1:2:3') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Fingerprint must consist of 16 8-bit hex numbers: got 3 8-bit hex numbers instead' + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.normalize_fingerprint('01023') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Fingerprint must consist of 16 8-bit hex numbers: got 3 8-bit hex numbers instead' + + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.normalize_fingerprint('A:B:C:D:E:F:G:H:I:J:K:L:M:N:O:P') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Fingerprint must consist of 16 8-bit hex numbers: number 7 is invalid: "G"' + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.normalize_fingerprint('fee:B:C:D:E:F:G:H:I:J:K:L:M:N:O:P') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Fingerprint must consist of 16 8-bit hex numbers: number 1 is invalid: "fee"' + + +def test_extract_fingerprint(): + assert ssh_key.extract_fingerprint(PUBLIC_KEY_1) == FINGERPRINT_1 + assert ssh_key.extract_fingerprint(' %s foo@ bar ' % PUBLIC_KEY_1.replace(' ', ' ')) == FINGERPRINT_1 + + key = 'ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIGGdztn98LzAZkwHzSNa2HpTERPzBZdrdMt9u++0qQ+U' + assert ssh_key.extract_fingerprint(key) == 'f5:7e:4f:d8:ab:20:b8:5b:8b:2f:7a:04:47:fd:96:73' + print(ssh_key.extract_fingerprint(key, alg='sha256', size=32)) + assert ssh_key.extract_fingerprint(key, alg='sha256', size=32) == ( + '64:94:70:47:7a:bd:79:99:95:9f:3b:d3:37:8c:2c:fa:33:a7:d1:93:95:56:1b:f7:f6:52:31:34:0b:4a:fc:67' + ) + + key = ( + 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBDEVarUR' + 'tu+DmCvn0OkHC+gCOQ6Bxkolfh9NvWr4f8SPfQJ/yOUO6RZ+m3RhvnDEWAvA1BG/lCNqui6/kuZiyVk=' + ) + assert ssh_key.extract_fingerprint(key) == 'f4:b7:43:14:fe:8b:43:4b:cc:b3:63:dc:cf:23:bb:cb' + print(ssh_key.extract_fingerprint(key, alg='sha256', size=32)) + assert ssh_key.extract_fingerprint(key, alg='sha256', size=32) == ( + '88:c2:a3:0f:2a:cf:60:73:7c:52:e0:41:40:25:c3:d4:5d:32:37:a9:46:48:3e:37:34:f1:aa:0d:4d:69:15:d7' + ) + + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.extract_fingerprint(' adsf ') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Error while extracting fingerprint from public key data: cannot split public key into at least two parts' + + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.extract_fingerprint('a b') + print(exc.value.args[0]) + assert exc.value.args[0] in ( + 'Error while extracting fingerprint from public key data: Invalid base64-encoded string:' + ' number of data characters (1) cannot be 1 more than a multiple of 4', + 'Error while extracting fingerprint from public key data: Incorrect padding', + ) + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.extract_fingerprint('a ab=f') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Error while extracting fingerprint from public key data: Incorrect padding' + with pytest.raises(ssh_key.FingerprintError) as exc: + ssh_key.extract_fingerprint('a ab==', alg='foo bar') + print(exc.value.args[0]) + assert exc.value.args[0] == 'Hash algorithm FOO BAR is not available. Possibly running in FIPS mode.' diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_ssh_key_info.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_ssh_key_info.py new file mode 100644 index 000000000..bc755502b --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_ssh_key_info.py @@ -0,0 +1,80 @@ +# Copyright (c) 2021 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import ssh_key_info + + +class TestHetznerSSHKeyInfo(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.ssh_key_info.AnsibleModule' + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + + def test_no_keys(self, mocker): + result = self.run_module_success(mocker, ssh_key_info, { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + }, [ + FetchUrlCall('GET', 200) + .result_json([]) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_url('{0}/key'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['ssh_keys'] == [] + + def test_no_keys_404(self, mocker): + result = self.run_module_success(mocker, ssh_key_info, { + 'hetzner_user': '', + 'hetzner_password': '', + }, [ + FetchUrlCall('GET', 404) + .result_json({ + 'error': { + 'status': 404, + 'code': 'NOT_FOUND', + 'message': 'No keys found', + }, + }) + .expect_url('{0}/key'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['ssh_keys'] == [] + + def test_single_key(self, mocker): + result = self.run_module_success(mocker, ssh_key_info, { + 'hetzner_user': '', + 'hetzner_password': '', + }, [ + FetchUrlCall('GET', 200) + .result_json([ + { + 'key': { + 'name': 'key1', + 'fingerprint': '56:29:99:a4:5d:ed:ac:95:c1:f5:88:82:90:5d:dd:10', + 'type': 'ECDSA', + 'size': 521, + 'data': 'ecdsa-sha2-nistp521 AAAAE2VjZHNh ...' + }, + }, + ]) + .expect_url('{0}/key'.format(BASE_URL)), + ]) + assert result['changed'] is False + assert result['ssh_keys'] == [{ + 'name': 'key1', + 'fingerprint': '56:29:99:a4:5d:ed:ac:95:c1:f5:88:82:90:5d:dd:10', + 'type': 'ECDSA', + 'size': 521, + 'data': 'ecdsa-sha2-nistp521 AAAAE2VjZHNh ...' + }] diff --git a/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_v_switch.py b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_v_switch.py new file mode 100644 index 000000000..5cc3c0c4e --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/plugins/modules/test_v_switch.py @@ -0,0 +1,1123 @@ +# Copyright (c) 2019 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +from datetime import datetime + +from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import ( + FetchUrlCall, + BaseTestModule, +) + +from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL +from ansible_collections.community.hrobot.plugins.modules import v_switch + + +# pylint: disable=dangerous-default-value +# we are not mutating this value +def create_v_switch_data(vlan, name, server=[]): + return { + 'id': 4321, + 'name': name, + 'vlan': vlan, + 'cancelled': False, + 'server': server, + 'subnet': [], + 'cloud_network': [], + } + + +def create_v_switches_data(vlan, name): + return [ + { + 'id': 4321, + 'name': name, + 'vlan': vlan, + 'cancelled': False, + } + ] + + +def create_server_data(ip, id_, status='ready'): + return { + 'server_ip': ip, + 'server_ipv6_net': '2a01:4f8:111:4221::', + 'server_number': id_, + 'status': status, + } + + +class TestHetznerVSwitch(BaseTestModule): + MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = ( + 'ansible_collections.community.hrobot.plugins.modules.v_switch.AnsibleModule' + ) + MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = ( + 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' + ) + + def test_idempotent(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + assert result['changed'] is False + assert result['v_switch'] == create_v_switch_data(4010, 'foo') + + def test_create(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json([]) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + ], + ) + assert result['changed'] is True + assert result['v_switch'] == create_v_switch_data(4010, 'foo') + + def test_v_switch_different_name(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'not_matching_name', + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'not_matching_name')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + ], + ) + assert result['changed'] is True + assert result['v_switch'] == create_v_switch_data(4010, 'not_matching_name') + + def test_v_switch_unauthorized_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': '', + 'hetzner_password': '', + 'vlan': 4010, + 'name': 'foo', + }, + [ + FetchUrlCall('GET', 401) + .result_json( + { + 'error': { + 'status': 401, + 'code': 'UNAUTHORIZED', + 'message': 'Unauthorized', + }, + } + ) + .expect_url('{0}/vswitch'.format(BASE_URL)), + ], + ) + assert result['msg'] == 'Please check your current user and password configuration' + + def test_v_switch_limit_reached_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4000, + 'name': 'new vswitch', + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('POST', 409) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + { + 'error': { + 'status': 409, + 'code': 'VSWITCH_LIMIT_REACHED', + 'message': 'The maximum count of vSwitches is reached', + }, + } + ) + .expect_url('{0}/vswitch'.format(BASE_URL)), + ], + ) + assert result['msg'] == 'The maximum count of vSwitches is reached' + + def test_v_switch_invalid_input_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 40100, + 'name': 'foo', + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('POST', 400) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + { + 'error': { + 'status': 400, + 'code': 'INVALID_INPUT', + 'message': 'invalid input', + 'missing': None, + 'invalid': ['vlan'], + }, + } + ) + .expect_url('{0}/vswitch'.format(BASE_URL)), + ], + ) + assert result['msg'] == "vSwitch invalid parameter (['vlan'])" + + def test_delete(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'state': 'absent', + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('cancellation_date', datetime.now().strftime('%y-%m-%d')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['changed'] is True + + def test_create_with_server(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json([]) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('POST', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[create_server_data('123.123.123.123', 321)], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['v_switch'] == create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + assert result['changed'] is True + + def test_is_all_servers_ready(self): + result = v_switch.is_all_servers_ready( + create_v_switch_data( + 4010, + 'foo', + server=[], + ), + None, + ) + assert result is True + + result = v_switch.is_all_servers_ready( + create_v_switch_data( + 4010, + 'foo', + server=[ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ], + ), + None, + ) + assert result is True + + result = v_switch.is_all_servers_ready( + create_v_switch_data( + 4010, + 'foo', + server=[ + create_server_data('123.123.123.123', 321, status='in process'), + create_server_data('123.123.123.124', 322), + ], + ), + None, + ) + assert result is False + + def test_get_servers_to_delete(self): + current_servers = [ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ] + desired_servers = ['321'] + result = v_switch.get_servers_to_delete(current_servers, desired_servers) + assert result == ['123.123.123.124'] + + current_servers = [ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ] + desired_servers = [] + result = v_switch.get_servers_to_delete(current_servers, desired_servers) + assert result == ['123.123.123.123', '123.123.123.124'] + + current_servers = [ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ] + desired_servers = ['123.123.123.123'] + result = v_switch.get_servers_to_delete(current_servers, desired_servers) + assert result == ['123.123.123.124'] + + current_servers = [ + create_server_data('check_default_ipv6', 321), + ] + desired_servers = ['2a01:4f8:111:4221::'] + result = v_switch.get_servers_to_delete(current_servers, desired_servers) + assert result == [] + + current_servers = [] + desired_servers = ['123.123.123.123'] + result = v_switch.get_servers_to_delete(current_servers, desired_servers) + assert result == [] + + def test_get_servers_to_add(self): + current_servers = [ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ] + desired_servers = ['321'] + result = v_switch.get_servers_to_add(current_servers, desired_servers) + assert result == [] + + current_servers = [ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ] + desired_servers = [] + result = v_switch.get_servers_to_add(current_servers, desired_servers) + assert result == [] + + current_servers = [ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ] + desired_servers = ['123.123.123.123'] + result = v_switch.get_servers_to_add(current_servers, desired_servers) + assert result == [] + + current_servers = [ + create_server_data('check_default_ipv6', 321), + ] + desired_servers = ['2a01:4f8:111:4221::'] + result = v_switch.get_servers_to_add(current_servers, desired_servers) + assert result == [] + + current_servers = [] + desired_servers = ['123.123.123.123'] + result = v_switch.get_servers_to_add(current_servers, desired_servers) + assert result == ['123.123.123.123'] + + current_servers = [create_server_data('123.123.123.123', 321)] + desired_servers = ['321', '322'] + result = v_switch.get_servers_to_add(current_servers, desired_servers) + assert result == ['322'] + + def test_add_server(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[create_server_data('123.123.123.123', 321)], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['v_switch'] == create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + assert result['changed'] is True + + def test_add_server_no_wait(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + 'wait': False, + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[create_server_data('123.123.123.123', 321, status='in process')], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['v_switch'] == create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321, status='in process')] + ) + assert result['changed'] is True + + def test_add_multiple_servers(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123', '123.123.123.124'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[0]', '123.123.123.123') + .expect_form_value('server[1]', '123.123.123.124') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['v_switch'] == create_v_switch_data( + 4010, + 'foo', + server=[ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ], + ) + assert result['changed'] is True + + def test_add_server_timeout_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + 'timeout': 0, + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[create_server_data('123.123.123.123', 321, status='in process')], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[create_server_data('123.123.123.123', 321, status='in process')], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['msg'] == "Timeout waiting vSwitch operation to finish" + + def test_add_server_idempotent(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + assert result['v_switch'] == create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + assert result['changed'] is False + + def test_add_server_server_not_found_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .result_json( + { + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'server "123.123.123.123" not found', + }, + } + ) + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + ], + ) + assert result['msg'] == 'server "123.123.123.123" not found' + + def test_add_server_vlan_not_unique_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .result_json( + { + 'error': { + 'status': 409, + 'code': 'VSWITCH_VLAN_NOT_UNIQUE', + 'message': 'vlan of vswitch is already in use at server EX62-NVMe (123.123.123.123) example.com, please change vlan', + }, + } + ) + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + ], + ) + assert ( + result['msg'] + == "vlan of vswitch is already in use at server EX62-NVMe (123.123.123.123) example.com, please change vlan" + ) + + def test_add_server_vswitch_in_process_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .result_json( + { + 'error': { + 'status': 409, + 'code': 'VSWITCH_IN_PROCESS', + 'message': 'There is a update running, therefore the vswitch can not be updated', + }, + } + ) + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + ], + ) + assert ( + result['msg'] == "There is a update running, therefore the vswitch can not be updated" + ) + + def test_add_server_server_limit_reached_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['123.123.123.123'], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switch_data(4010, 'foo')) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('POST', 201) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .result_json( + { + 'error': { + 'status': 409, + 'code': 'VSWITCH_SERVER_LIMIT_REACHED', + 'message': 'The maximum number of servers is reached for this vSwitch', + }, + } + ) + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + ], + ) + assert result['msg'] == "The maximum number of servers is reached for this vSwitch" + + def test_not_delete_if_servers_not_defined(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + assert result['v_switch'] == create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + assert result['changed'] is False + + def test_delete_server(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': [], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['v_switch'] == create_v_switch_data(4010, 'foo') + assert result['changed'] is True + + def test_delete_server_wait(self, mocker): + result = self.run_module_success( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': ['321'], + 'timeout': 0, + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322), + ], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.124') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[ + create_server_data('123.123.123.123', 321), + create_server_data('123.123.123.124', 322, status='in process'), + ], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + + assert result['v_switch'] == create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + assert result['changed'] is True + + def test_delete_server_timeout_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': [], + 'timeout': 0, + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[create_server_data('123.123.123.123', 321, status='in process')], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, + 'foo', + server=[create_server_data('123.123.123.123', 321, status='in process')], + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + ], + ) + assert result['msg'] == "Timeout waiting vSwitch operation to finish" + + def test_delete_server_server_not_found(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': [], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .result_json( + { + 'error': { + 'status': 404, + 'code': 'SERVER_NOT_FOUND', + 'message': 'server "123.123.123.123" not found', + }, + } + ) + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + ], + ) + assert result['msg'] == 'server "123.123.123.123" not found' + + def test_delete_server_in_process_error(self, mocker): + result = self.run_module_failed( + mocker, + v_switch, + { + 'hetzner_user': 'test', + 'hetzner_password': 'hunter2', + 'vlan': 4010, + 'name': 'foo', + 'servers': [], + }, + [ + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json(create_v_switches_data(4010, 'foo')) + .expect_url('{0}/vswitch'.format(BASE_URL)), + FetchUrlCall('GET', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .result_json( + create_v_switch_data( + 4010, 'foo', server=[create_server_data('123.123.123.123', 321)] + ) + ) + .expect_url('{0}/vswitch/4321'.format(BASE_URL)), + FetchUrlCall('DELETE', 200) + .expect_basic_auth('test', 'hunter2') + .expect_force_basic_auth(True) + .expect_form_value('server[]', '123.123.123.123') + .result_json( + { + 'error': { + 'status': 409, + 'code': 'VSWITCH_IN_PROCESS', + 'message': 'There is a update running, therefore the vswitch can not be updated', + }, + } + ) + .expect_url('{0}/vswitch/4321/server'.format(BASE_URL)), + ], + ) + assert ( + result['msg'] == "There is a update running, therefore the vswitch can not be updated" + ) diff --git a/ansible_collections/community/hrobot/tests/unit/requirements.txt b/ansible_collections/community/hrobot/tests/unit/requirements.txt new file mode 100644 index 000000000..34f5b5023 --- /dev/null +++ b/ansible_collections/community/hrobot/tests/unit/requirements.txt @@ -0,0 +1,9 @@ +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +unittest2 ; python_version < '2.7' +importlib ; python_version < '2.7' + +# firewall module +ipaddress ; python_version < '3.3' |