summaryrefslogtreecommitdiffstats
path: root/ansible_collections/hetzner/hcloud/plugins/modules/firewall.py
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/hetzner/hcloud/plugins/modules/firewall.py')
-rw-r--r--ansible_collections/hetzner/hcloud/plugins/modules/firewall.py438
1 files changed, 438 insertions, 0 deletions
diff --git a/ansible_collections/hetzner/hcloud/plugins/modules/firewall.py b/ansible_collections/hetzner/hcloud/plugins/modules/firewall.py
new file mode 100644
index 000000000..3c51b5c0a
--- /dev/null
+++ b/ansible_collections/hetzner/hcloud/plugins/modules/firewall.py
@@ -0,0 +1,438 @@
+#!/usr/bin/python
+
+# Copyright: (c) 2020, Hetzner Cloud GmbH <info@hetzner-cloud.de>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+
+from __future__ import annotations
+
+DOCUMENTATION = """
+---
+module: firewall
+short_description: Create and manage firewalls on the Hetzner Cloud.
+
+description:
+ - Create, update and manage firewalls on the Hetzner Cloud.
+
+author:
+ - Lukas Kaemmerling (@lkaemmerling)
+
+options:
+ id:
+ description:
+ - The ID of the Hetzner Cloud Firewall to manage.
+ - Only required if no firewall O(name) is given.
+ type: int
+ name:
+ description:
+ - The Name of the Hetzner Cloud Firewall to manage.
+ - Only required if no firewall O(id) is given, or the firewall does not exist.
+ type: str
+ labels:
+ description:
+ - User-defined labels (key-value pairs).
+ type: dict
+ rules:
+ description:
+ - List of rules the firewall contain.
+ type: list
+ elements: dict
+ suboptions:
+ description:
+ description:
+ - User defined description of this rule.
+ type: str
+ direction:
+ description:
+ - The direction of the firewall rule.
+ type: str
+ choices: [in, out]
+ protocol:
+ description:
+ - The protocol of the firewall rule.
+ type: str
+ choices: [icmp, tcp, udp, esp, gre]
+ port:
+ description:
+ - The port or port range allowed by this rule.
+ - A port range can be specified by separating two ports with a dash, e.g 1024-5000.
+ - Only used if O(rules[].protocol=tcp) or O(rules[].protocol=udp).
+ type: str
+ source_ips:
+ description:
+ - List of CIDRs that are allowed within this rule.
+ - Use 0.0.0.0/0 to allow all IPv4 addresses and ::/0 to allow all IPv6 addresses.
+ - Only used if O(rules[].direction=in).
+ type: list
+ elements: str
+ default: []
+ destination_ips:
+ description:
+ - List of CIDRs that are allowed within this rule.
+ - Use 0.0.0.0/0 to allow all IPv4 addresses and ::/0 to allow all IPv6 addresses.
+ - Only used if O(rules[].direction=out).
+ type: list
+ elements: str
+ default: []
+ force:
+ description:
+ - Force the deletion of the Firewall when still in use.
+ type: bool
+ default: false
+ state:
+ description:
+ - State of the firewall.
+ default: present
+ choices: [absent, present]
+ type: str
+
+extends_documentation_fragment:
+ - hetzner.hcloud.hcloud
+"""
+
+EXAMPLES = """
+- name: Create a basic firewall
+ hetzner.hcloud.firewall:
+ name: my-firewall
+ state: present
+
+- name: Create a firewall with rules
+ hetzner.hcloud.firewall:
+ name: my-firewall
+ rules:
+ - description: allow icmp from everywhere
+ direction: in
+ protocol: icmp
+ source_ips:
+ - 0.0.0.0/0
+ - ::/0
+ state: present
+
+- name: Create a firewall with labels
+ hetzner.hcloud.firewall:
+ name: my-firewall
+ labels:
+ key: value
+ mylabel: 123
+ state: present
+
+- name: Ensure the firewall is absent (remove if needed)
+ hetzner.hcloud.firewall:
+ name: my-firewall
+ state: absent
+"""
+
+RETURN = """
+hcloud_firewall:
+ description: The firewall instance.
+ returned: always
+ type: dict
+ contains:
+ id:
+ description: Numeric identifier of the firewall.
+ returned: always
+ type: int
+ sample: 1937415
+ name:
+ description: Name of the firewall.
+ returned: always
+ type: str
+ sample: my-firewall
+ labels:
+ description: User-defined labels (key-value pairs).
+ returned: always
+ type: dict
+ rules:
+ description: List of rules the firewall contain.
+ returned: always
+ type: list
+ elements: dict
+ contains:
+ description:
+ description: User defined description of this rule.
+ type: str
+ returned: always
+ sample: allow http from anywhere
+ direction:
+ description: The direction of the firewall rule.
+ type: str
+ returned: always
+ sample: in
+ protocol:
+ description: The protocol of the firewall rule.
+ type: str
+ returned: always
+ sample: tcp
+ port:
+ description: The port or port range allowed by this rule.
+ type: str
+ returned: if RV(hcloud_firewall.rules[].protocol=tcp) or RV(hcloud_firewall.rules[].protocol=udp)
+ sample: "80"
+ source_ips:
+ description: List of source CIDRs that are allowed within this rule.
+ type: list
+ elements: str
+ returned: always
+ sample: ["0.0.0.0/0", "::/0"]
+ destination_ips:
+ description: List of destination CIDRs that are allowed within this rule.
+ type: list
+ elements: str
+ returned: always
+ sample: []
+ applied_to:
+ description: List of Resources the Firewall is applied to.
+ returned: always
+ type: list
+ elements: dict
+ contains:
+ type:
+ description: Type of the resource.
+ type: str
+ choices: [server, label_selector]
+ sample: label_selector
+ server:
+ description: ID of the server.
+ type: int
+ sample: 12345
+ label_selector:
+ description: Label selector value.
+ type: str
+ sample: env=prod
+ applied_to_resources:
+ description: List of Resources the Firewall label selector is applied to.
+ returned: if RV(hcloud_firewall.applied_to[].type=label_selector)
+ type: list
+ elements: dict
+ contains:
+ type:
+ description: Type of resource referenced.
+ type: str
+ choices: [server]
+ sample: server
+ server:
+ description: ID of the Server.
+ type: int
+ sample: 12345
+"""
+
+import time
+
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.common.text.converters import to_native
+
+from ..module_utils.hcloud import AnsibleHCloud
+from ..module_utils.vendor.hcloud import APIException, HCloudException
+from ..module_utils.vendor.hcloud.firewalls import (
+ BoundFirewall,
+ FirewallResource,
+ FirewallRule,
+)
+
+
+class AnsibleHCloudFirewall(AnsibleHCloud):
+ represent = "hcloud_firewall"
+
+ hcloud_firewall: BoundFirewall | None = None
+
+ def _prepare_result(self):
+ return {
+ "id": to_native(self.hcloud_firewall.id),
+ "name": to_native(self.hcloud_firewall.name),
+ "rules": [self._prepare_result_rule(rule) for rule in self.hcloud_firewall.rules],
+ "labels": self.hcloud_firewall.labels,
+ "applied_to": [self._prepare_result_applied_to(resource) for resource in self.hcloud_firewall.applied_to],
+ }
+
+ def _prepare_result_rule(self, rule: FirewallRule):
+ return {
+ "direction": to_native(rule.direction),
+ "protocol": to_native(rule.protocol),
+ "port": to_native(rule.port) if rule.port is not None else None,
+ "source_ips": [to_native(cidr) for cidr in rule.source_ips],
+ "destination_ips": [to_native(cidr) for cidr in rule.destination_ips],
+ "description": to_native(rule.description) if rule.description is not None else None,
+ }
+
+ def _prepare_result_applied_to(self, resource: FirewallResource):
+ result = {
+ "type": to_native(resource.type),
+ "server": to_native(resource.server.id) if resource.server is not None else None,
+ "label_selector": (
+ to_native(resource.label_selector.selector) if resource.label_selector is not None else None
+ ),
+ }
+ if resource.applied_to_resources is not None:
+ result["applied_to_resources"] = [
+ {
+ "type": to_native(item.type),
+ "server": to_native(item.server.id) if item.server is not None else None,
+ }
+ for item in resource.applied_to_resources
+ ]
+ return result
+
+ def _get_firewall(self):
+ try:
+ if self.module.params.get("id") is not None:
+ self.hcloud_firewall = self.client.firewalls.get_by_id(self.module.params.get("id"))
+ elif self.module.params.get("name") is not None:
+ self.hcloud_firewall = self.client.firewalls.get_by_name(self.module.params.get("name"))
+
+ except HCloudException as exception:
+ self.fail_json_hcloud(exception)
+
+ def _create_firewall(self):
+ self.module.fail_on_missing_params(required_params=["name"])
+ params = {
+ "name": self.module.params.get("name"),
+ "labels": self.module.params.get("labels"),
+ }
+ rules = self.module.params.get("rules")
+ if rules is not None:
+ params["rules"] = [
+ FirewallRule(
+ direction=rule["direction"],
+ protocol=rule["protocol"],
+ source_ips=rule["source_ips"] if rule["source_ips"] is not None else [],
+ destination_ips=rule["destination_ips"] if rule["destination_ips"] is not None else [],
+ port=rule["port"],
+ description=rule["description"],
+ )
+ for rule in rules
+ ]
+
+ if not self.module.check_mode:
+ try:
+ self.client.firewalls.create(**params)
+ except HCloudException as exception:
+ self.fail_json_hcloud(exception, params=params)
+
+ self._mark_as_changed()
+ self._get_firewall()
+
+ def _update_firewall(self):
+ name = self.module.params.get("name")
+ if name is not None and self.hcloud_firewall.name != name:
+ self.module.fail_on_missing_params(required_params=["id"])
+ if not self.module.check_mode:
+ self.hcloud_firewall.update(name=name)
+ self._mark_as_changed()
+
+ labels = self.module.params.get("labels")
+ if labels is not None and self.hcloud_firewall.labels != labels:
+ if not self.module.check_mode:
+ self.hcloud_firewall.update(labels=labels)
+ self._mark_as_changed()
+
+ rules = self.module.params.get("rules")
+ if rules is not None and rules != [self._prepare_result_rule(rule) for rule in self.hcloud_firewall.rules]:
+ if not self.module.check_mode:
+ new_rules = [
+ FirewallRule(
+ direction=rule["direction"],
+ protocol=rule["protocol"],
+ source_ips=rule["source_ips"] if rule["source_ips"] is not None else [],
+ destination_ips=rule["destination_ips"] if rule["destination_ips"] is not None else [],
+ port=rule["port"],
+ description=rule["description"],
+ )
+ for rule in rules
+ ]
+ self.hcloud_firewall.set_rules(new_rules)
+ self._mark_as_changed()
+
+ self._get_firewall()
+
+ def present_firewall(self):
+ self._get_firewall()
+ if self.hcloud_firewall is None:
+ self._create_firewall()
+ else:
+ self._update_firewall()
+
+ def delete_firewall(self):
+ self._get_firewall()
+ if self.hcloud_firewall is not None:
+ if not self.module.check_mode:
+ if self.hcloud_firewall.applied_to:
+ if self.module.params.get("force"):
+ actions = self.hcloud_firewall.remove_from_resources(self.hcloud_firewall.applied_to)
+ for action in actions:
+ action.wait_until_finished()
+ else:
+ self.module.warn(
+ f"Firewall {self.hcloud_firewall.name} is currently used by "
+ "other resources. You need to unassign the resources before "
+ "deleting the Firewall or use force=true."
+ )
+
+ retry_count = 0
+ while True:
+ try:
+ self.hcloud_firewall.delete()
+ break
+ except APIException as exception:
+ if "is still in use" in exception.message and retry_count < 10:
+ retry_count += 1
+ time.sleep(0.5 * retry_count)
+ continue
+ self.fail_json_hcloud(exception)
+ except HCloudException as exception:
+ self.fail_json_hcloud(exception)
+
+ self._mark_as_changed()
+ self.hcloud_firewall = None
+
+ @classmethod
+ def define_module(cls):
+ return AnsibleModule(
+ argument_spec=dict(
+ id={"type": "int"},
+ name={"type": "str"},
+ labels={"type": "dict"},
+ rules=dict(
+ type="list",
+ elements="dict",
+ options=dict(
+ description={"type": "str"},
+ direction={"type": "str", "choices": ["in", "out"]},
+ protocol={"type": "str", "choices": ["icmp", "udp", "tcp", "esp", "gre"]},
+ port={"type": "str"},
+ source_ips={"type": "list", "elements": "str", "default": []},
+ destination_ips={"type": "list", "elements": "str", "default": []},
+ ),
+ required_together=[["direction", "protocol"]],
+ required_if=[
+ ["protocol", "udp", ["port"]],
+ ["protocol", "tcp", ["port"]],
+ ],
+ ),
+ force={"type": "bool", "default": False},
+ state={
+ "choices": ["absent", "present"],
+ "default": "present",
+ },
+ **super().base_module_arguments(),
+ ),
+ required_one_of=[["id", "name"]],
+ required_if=[["state", "present", ["name"]]],
+ supports_check_mode=True,
+ )
+
+
+def main():
+ module = AnsibleHCloudFirewall.define_module()
+
+ hcloud = AnsibleHCloudFirewall(module)
+ state = module.params.get("state")
+ if state == "absent":
+ hcloud.delete_firewall()
+ elif state == "present":
+ hcloud.present_firewall()
+
+ module.exit_json(**hcloud.get_result())
+
+
+if __name__ == "__main__":
+ main()