summaryrefslogtreecommitdiffstats
path: root/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py')
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py1186
1 files changed, 1186 insertions, 0 deletions
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py
new file mode 100644
index 0000000..6ae47a7
--- /dev/null
+++ b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py
@@ -0,0 +1,1186 @@
+# (c) 2014, Maciej Delmanowski <drybjed@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+from functools import partial
+import types
+
+try:
+ import netaddr
+except ImportError:
+ # in this case, we'll make the filters return error messages (see bottom)
+ netaddr = None
+else:
+
+ class mac_linux(netaddr.mac_unix):
+ pass
+
+ mac_linux.word_fmt = "%.2x"
+
+from ansible import errors
+
+
+# ---- IP address and network query helpers ----
+def _empty_ipaddr_query(v, vtype):
+ # We don't have any query to process, so just check what type the user
+ # expects, and return the IP address in a correct format
+ if v:
+ if vtype == "address":
+ return str(v.ip)
+ elif vtype == "network":
+ return str(v)
+
+
+def _first_last(v):
+ if v.size == 2:
+ first_usable = int(netaddr.IPAddress(v.first))
+ last_usable = int(netaddr.IPAddress(v.last))
+ return first_usable, last_usable
+ elif v.size > 1:
+ first_usable = int(netaddr.IPAddress(v.first + 1))
+ last_usable = int(netaddr.IPAddress(v.last - 1))
+ return first_usable, last_usable
+
+
+def _6to4_query(v, vtype, value):
+ if v.version == 4:
+
+ if v.size == 1:
+ ipconv = str(v.ip)
+ elif v.size > 1:
+ if v.ip != v.network:
+ ipconv = str(v.ip)
+ else:
+ ipconv = False
+
+ if ipaddr(ipconv, "public"):
+ numbers = list(map(int, ipconv.split(".")))
+
+ try:
+ return "2002:{:02x}{:02x}:{:02x}{:02x}::1/48".format(*numbers)
+ except Exception:
+ return False
+
+ elif v.version == 6:
+ if vtype == "address":
+ if ipaddr(str(v), "2002::/16"):
+ return value
+ elif vtype == "network":
+ if v.ip != v.network:
+ if ipaddr(str(v.ip), "2002::/16"):
+ return value
+ else:
+ return False
+
+
+def _ip_query(v):
+ if v.size == 1:
+ return str(v.ip)
+ if v.size > 1:
+ # /31 networks in netaddr have no broadcast address
+ if v.ip != v.network or not v.broadcast:
+ return str(v.ip)
+
+
+def _gateway_query(v):
+ if v.size > 1:
+ if v.ip != v.network:
+ return str(v.ip) + "/" + str(v.prefixlen)
+
+
+def _address_prefix_query(v):
+ if v.size > 1:
+ if v.ip != v.network:
+ return str(v.ip) + "/" + str(v.prefixlen)
+
+
+def _bool_ipaddr_query(v):
+ if v:
+ return True
+
+
+def _broadcast_query(v):
+ if v.size > 2:
+ return str(v.broadcast)
+
+
+def _cidr_query(v):
+ return str(v)
+
+
+def _cidr_lookup_query(v, iplist, value):
+ try:
+ if v in iplist:
+ return value
+ except Exception:
+ return False
+
+
+def _first_usable_query(v, vtype):
+ if vtype == "address":
+ "Does it make sense to raise an error"
+ raise errors.AnsibleFilterError("Not a network address")
+ elif vtype == "network":
+ if v.size == 2:
+ return str(netaddr.IPAddress(int(v.network)))
+ elif v.size > 1:
+ return str(netaddr.IPAddress(int(v.network) + 1))
+
+
+def _host_query(v):
+ if v.size == 1:
+ return str(v)
+ elif v.size > 1:
+ if v.ip != v.network:
+ return str(v.ip) + "/" + str(v.prefixlen)
+
+
+def _hostmask_query(v):
+ return str(v.hostmask)
+
+
+def _int_query(v, vtype):
+ if vtype == "address":
+ return int(v.ip)
+ elif vtype == "network":
+ return str(int(v.ip)) + "/" + str(int(v.prefixlen))
+
+
+def _ip_prefix_query(v):
+ if v.size == 2:
+ return str(v.ip) + "/" + str(v.prefixlen)
+ elif v.size > 1:
+ if v.ip != v.network:
+ return str(v.ip) + "/" + str(v.prefixlen)
+
+
+def _ip_netmask_query(v):
+ if v.size == 2:
+ return str(v.ip) + " " + str(v.netmask)
+ elif v.size > 1:
+ if v.ip != v.network:
+ return str(v.ip) + " " + str(v.netmask)
+
+
+"""
+def _ip_wildcard_query(v):
+ if v.size == 2:
+ return str(v.ip) + ' ' + str(v.hostmask)
+ elif v.size > 1:
+ if v.ip != v.network:
+ return str(v.ip) + ' ' + str(v.hostmask)
+"""
+
+
+def _ipv4_query(v, value):
+ if v.version == 6:
+ try:
+ return str(v.ipv4())
+ except Exception:
+ return False
+ else:
+ return value
+
+
+def _ipv6_query(v, value):
+ if v.version == 4:
+ return str(v.ipv6())
+ else:
+ return value
+
+
+def _last_usable_query(v, vtype):
+ if vtype == "address":
+ "Does it make sense to raise an error"
+ raise errors.AnsibleFilterError("Not a network address")
+ elif vtype == "network":
+ if v.size > 1:
+ first_usable, last_usable = _first_last(v)
+ return str(netaddr.IPAddress(last_usable))
+
+
+def _link_local_query(v, value):
+ v_ip = netaddr.IPAddress(str(v.ip))
+ if v.version == 4:
+ if ipaddr(str(v_ip), "169.254.0.0/24"):
+ return value
+
+ elif v.version == 6:
+ if ipaddr(str(v_ip), "fe80::/10"):
+ return value
+
+
+def _loopback_query(v, value):
+ v_ip = netaddr.IPAddress(str(v.ip))
+ if v_ip.is_loopback():
+ return value
+
+
+def _multicast_query(v, value):
+ if v.is_multicast():
+ return value
+
+
+def _net_query(v):
+ if v.size > 1:
+ if v.ip == v.network:
+ return str(v.network) + "/" + str(v.prefixlen)
+
+
+def _netmask_query(v):
+ return str(v.netmask)
+
+
+def _network_query(v):
+ """Return the network of a given IP or subnet"""
+ return str(v.network)
+
+
+def _network_id_query(v):
+ """Return the network of a given IP or subnet"""
+ return str(v.network)
+
+
+def _network_netmask_query(v):
+ return str(v.network) + " " + str(v.netmask)
+
+
+def _network_wildcard_query(v):
+ return str(v.network) + " " + str(v.hostmask)
+
+
+def _next_usable_query(v, vtype):
+ if vtype == "address":
+ "Does it make sense to raise an error"
+ raise errors.AnsibleFilterError("Not a network address")
+ elif vtype == "network":
+ if v.size > 1:
+ first_usable, last_usable = _first_last(v)
+ next_ip = int(netaddr.IPAddress(int(v.ip) + 1))
+ if next_ip >= first_usable and next_ip <= last_usable:
+ return str(netaddr.IPAddress(int(v.ip) + 1))
+
+
+def _peer_query(v, vtype):
+ if vtype == "address":
+ raise errors.AnsibleFilterError("Not a network address")
+ elif vtype == "network":
+ if v.size == 2:
+ return str(netaddr.IPAddress(int(v.ip) ^ 1))
+ if v.size == 4:
+ if int(v.ip) % 4 == 0:
+ raise errors.AnsibleFilterError(
+ "Network address of /30 has no peer"
+ )
+ if int(v.ip) % 4 == 3:
+ raise errors.AnsibleFilterError(
+ "Broadcast address of /30 has no peer"
+ )
+ return str(netaddr.IPAddress(int(v.ip) ^ 3))
+ raise errors.AnsibleFilterError("Not a point-to-point network")
+
+
+def _prefix_query(v):
+ return int(v.prefixlen)
+
+
+def _previous_usable_query(v, vtype):
+ if vtype == "address":
+ "Does it make sense to raise an error"
+ raise errors.AnsibleFilterError("Not a network address")
+ elif vtype == "network":
+ if v.size > 1:
+ first_usable, last_usable = _first_last(v)
+ previous_ip = int(netaddr.IPAddress(int(v.ip) - 1))
+ if previous_ip >= first_usable and previous_ip <= last_usable:
+ return str(netaddr.IPAddress(int(v.ip) - 1))
+
+
+def _private_query(v, value):
+ if v.is_private():
+ return value
+
+
+def _public_query(v, value):
+ v_ip = netaddr.IPAddress(str(v.ip))
+ if (
+ v_ip.is_unicast()
+ and not v_ip.is_private()
+ and not v_ip.is_loopback()
+ and not v_ip.is_netmask()
+ and not v_ip.is_hostmask()
+ ):
+ return value
+
+
+def _range_usable_query(v, vtype):
+ if vtype == "address":
+ "Does it make sense to raise an error"
+ raise errors.AnsibleFilterError("Not a network address")
+ elif vtype == "network":
+ if v.size > 1:
+ first_usable, last_usable = _first_last(v)
+ first_usable = str(netaddr.IPAddress(first_usable))
+ last_usable = str(netaddr.IPAddress(last_usable))
+ return "{0}-{1}".format(first_usable, last_usable)
+
+
+def _revdns_query(v):
+ v_ip = netaddr.IPAddress(str(v.ip))
+ return v_ip.reverse_dns
+
+
+def _size_query(v):
+ return v.size
+
+
+def _size_usable_query(v):
+ if v.size == 1:
+ return 0
+ elif v.size == 2:
+ return 2
+ return v.size - 2
+
+
+def _subnet_query(v):
+ return str(v.cidr)
+
+
+def _type_query(v):
+ if v.size == 1:
+ return "address"
+ if v.size > 1:
+ if v.ip != v.network:
+ return "address"
+ else:
+ return "network"
+
+
+def _unicast_query(v, value):
+ if v.is_unicast():
+ return value
+
+
+def _version_query(v):
+ return v.version
+
+
+def _wrap_query(v, vtype, value):
+ if v.version == 6:
+ if vtype == "address":
+ return "[" + str(v.ip) + "]"
+ elif vtype == "network":
+ return "[" + str(v.ip) + "]/" + str(v.prefixlen)
+ else:
+ return value
+
+
+# ---- HWaddr query helpers ----
+def _bare_query(v):
+ v.dialect = netaddr.mac_bare
+ return str(v)
+
+
+def _bool_hwaddr_query(v):
+ if v:
+ return True
+
+
+def _int_hwaddr_query(v):
+ return int(v)
+
+
+def _cisco_query(v):
+ v.dialect = netaddr.mac_cisco
+ return str(v)
+
+
+def _empty_hwaddr_query(v, value):
+ if v:
+ return value
+
+
+def _linux_query(v):
+ v.dialect = mac_linux
+ return str(v)
+
+
+def _postgresql_query(v):
+ v.dialect = netaddr.mac_pgsql
+ return str(v)
+
+
+def _unix_query(v):
+ v.dialect = netaddr.mac_unix
+ return str(v)
+
+
+def _win_query(v):
+ v.dialect = netaddr.mac_eui48
+ return str(v)
+
+
+# ---- IP address and network filters ----
+
+# Returns a minified list of subnets or a single subnet that spans all of
+# the inputs.
+def cidr_merge(value, action="merge"):
+ if not hasattr(value, "__iter__"):
+ raise errors.AnsibleFilterError(
+ "cidr_merge: expected iterable, got " + repr(value)
+ )
+
+ if action == "merge":
+ try:
+ return [str(ip) for ip in netaddr.cidr_merge(value)]
+ except Exception as e:
+ raise errors.AnsibleFilterError(
+ "cidr_merge: error in netaddr:\n%s" % e
+ )
+
+ elif action == "span":
+ # spanning_cidr needs at least two values
+ if len(value) == 0:
+ return None
+ elif len(value) == 1:
+ try:
+ return str(netaddr.IPNetwork(value[0]))
+ except Exception as e:
+ raise errors.AnsibleFilterError(
+ "cidr_merge: error in netaddr:\n%s" % e
+ )
+ else:
+ try:
+ return str(netaddr.spanning_cidr(value))
+ except Exception as e:
+ raise errors.AnsibleFilterError(
+ "cidr_merge: error in netaddr:\n%s" % e
+ )
+
+ else:
+ raise errors.AnsibleFilterError(
+ "cidr_merge: invalid action '%s'" % action
+ )
+
+
+def ipaddr(value, query="", version=False, alias="ipaddr"):
+ """ Check if string is an IP address or network and filter it """
+
+ query_func_extra_args = {
+ "": ("vtype",),
+ "6to4": ("vtype", "value"),
+ "cidr_lookup": ("iplist", "value"),
+ "first_usable": ("vtype",),
+ "int": ("vtype",),
+ "ipv4": ("value",),
+ "ipv6": ("value",),
+ "last_usable": ("vtype",),
+ "link-local": ("value",),
+ "loopback": ("value",),
+ "lo": ("value",),
+ "multicast": ("value",),
+ "next_usable": ("vtype",),
+ "peer": ("vtype",),
+ "previous_usable": ("vtype",),
+ "private": ("value",),
+ "public": ("value",),
+ "unicast": ("value",),
+ "range_usable": ("vtype",),
+ "wrap": ("vtype", "value"),
+ }
+
+ query_func_map = {
+ "": _empty_ipaddr_query,
+ "6to4": _6to4_query,
+ "address": _ip_query,
+ "address/prefix": _address_prefix_query, # deprecate
+ "bool": _bool_ipaddr_query,
+ "broadcast": _broadcast_query,
+ "cidr": _cidr_query,
+ "cidr_lookup": _cidr_lookup_query,
+ "first_usable": _first_usable_query,
+ "gateway": _gateway_query, # deprecate
+ "gw": _gateway_query, # deprecate
+ "host": _host_query,
+ "host/prefix": _address_prefix_query, # deprecate
+ "hostmask": _hostmask_query,
+ "hostnet": _gateway_query, # deprecate
+ "int": _int_query,
+ "ip": _ip_query,
+ "ip/prefix": _ip_prefix_query,
+ "ip_netmask": _ip_netmask_query,
+ # 'ip_wildcard': _ip_wildcard_query, built then could not think of use case
+ "ipv4": _ipv4_query,
+ "ipv6": _ipv6_query,
+ "last_usable": _last_usable_query,
+ "link-local": _link_local_query,
+ "lo": _loopback_query,
+ "loopback": _loopback_query,
+ "multicast": _multicast_query,
+ "net": _net_query,
+ "next_usable": _next_usable_query,
+ "netmask": _netmask_query,
+ "network": _network_query,
+ "network_id": _network_id_query,
+ "network/prefix": _subnet_query,
+ "network_netmask": _network_netmask_query,
+ "network_wildcard": _network_wildcard_query,
+ "peer": _peer_query,
+ "prefix": _prefix_query,
+ "previous_usable": _previous_usable_query,
+ "private": _private_query,
+ "public": _public_query,
+ "range_usable": _range_usable_query,
+ "revdns": _revdns_query,
+ "router": _gateway_query, # deprecate
+ "size": _size_query,
+ "size_usable": _size_usable_query,
+ "subnet": _subnet_query,
+ "type": _type_query,
+ "unicast": _unicast_query,
+ "v4": _ipv4_query,
+ "v6": _ipv6_query,
+ "version": _version_query,
+ "wildcard": _hostmask_query,
+ "wrap": _wrap_query,
+ }
+
+ vtype = None
+
+ if not value:
+ return False
+
+ elif value is True:
+ return False
+
+ # Check if value is a list and parse each element
+ elif isinstance(value, (list, tuple, types.GeneratorType)):
+
+ _ret = []
+ for element in value:
+ if ipaddr(element, str(query), version):
+ _ret.append(ipaddr(element, str(query), version))
+
+ if _ret:
+ return _ret
+ else:
+ return list()
+
+ # Check if value is a number and convert it to an IP address
+ elif str(value).isdigit():
+
+ # We don't know what IP version to assume, so let's check IPv4 first,
+ # then IPv6
+ try:
+ if (not version) or (version and version == 4):
+ v = netaddr.IPNetwork("0.0.0.0/0")
+ v.value = int(value)
+ v.prefixlen = 32
+ elif version and version == 6:
+ v = netaddr.IPNetwork("::/0")
+ v.value = int(value)
+ v.prefixlen = 128
+
+ # IPv4 didn't work the first time, so it definitely has to be IPv6
+ except Exception:
+ try:
+ v = netaddr.IPNetwork("::/0")
+ v.value = int(value)
+ v.prefixlen = 128
+
+ # The value is too big for IPv6. Are you a nanobot?
+ except Exception:
+ return False
+
+ # We got an IP address, let's mark it as such
+ value = str(v)
+ vtype = "address"
+
+ # value has not been recognized, check if it's a valid IP string
+ else:
+ try:
+ v = netaddr.IPNetwork(value)
+
+ # value is a valid IP string, check if user specified
+ # CIDR prefix or just an IP address, this will indicate default
+ # output format
+ try:
+ address, prefix = value.split("/")
+ vtype = "network"
+ except Exception:
+ vtype = "address"
+
+ # value hasn't been recognized, maybe it's a numerical CIDR?
+ except Exception:
+ try:
+ address, prefix = value.split("/")
+ address.isdigit()
+ address = int(address)
+ prefix.isdigit()
+ prefix = int(prefix)
+
+ # It's not numerical CIDR, give up
+ except Exception:
+ return False
+
+ # It is something, so let's try and build a CIDR from the parts
+ try:
+ v = netaddr.IPNetwork("0.0.0.0/0")
+ v.value = address
+ v.prefixlen = prefix
+
+ # It's not a valid IPv4 CIDR
+ except Exception:
+ try:
+ v = netaddr.IPNetwork("::/0")
+ v.value = address
+ v.prefixlen = prefix
+
+ # It's not a valid IPv6 CIDR. Give up.
+ except Exception:
+ return False
+
+ # We have a valid CIDR, so let's write it in correct format
+ value = str(v)
+ vtype = "network"
+
+ # We have a query string but it's not in the known query types. Check if
+ # that string is a valid subnet, if so, we can check later if given IP
+ # address/network is inside that specific subnet
+ try:
+ # ?? 6to4 and link-local were True here before. Should they still?
+ if (
+ query
+ and (query not in query_func_map or query == "cidr_lookup")
+ and not str(query).isdigit()
+ and ipaddr(query, "network")
+ ):
+ iplist = netaddr.IPSet([netaddr.IPNetwork(query)])
+ query = "cidr_lookup"
+ except Exception:
+ pass
+
+ # This code checks if value maches the IP version the user wants, ie. if
+ # it's any version ("ipaddr()"), IPv4 ("ipv4()") or IPv6 ("ipv6()")
+ # If version does not match, return False
+ if version and v.version != version:
+ return False
+
+ extras = []
+ for arg in query_func_extra_args.get(query, tuple()):
+ extras.append(locals()[arg])
+ try:
+ return query_func_map[query](v, *extras)
+ except KeyError:
+ try:
+ float(query)
+ if v.size == 1:
+ if vtype == "address":
+ return str(v.ip)
+ elif vtype == "network":
+ return str(v)
+
+ elif v.size > 1:
+ try:
+ return str(v[query]) + "/" + str(v.prefixlen)
+ except Exception:
+ return False
+
+ else:
+ return value
+
+ except Exception:
+ raise errors.AnsibleFilterError(
+ alias + ": unknown filter type: %s" % query
+ )
+
+ return False
+
+
+def ipmath(value, amount):
+ try:
+ if "/" in value:
+ ip = netaddr.IPNetwork(value).ip
+ else:
+ ip = netaddr.IPAddress(value)
+ except (netaddr.AddrFormatError, ValueError):
+ msg = "You must pass a valid IP address; {0} is invalid".format(value)
+ raise errors.AnsibleFilterError(msg)
+
+ if not isinstance(amount, int):
+ msg = (
+ "You must pass an integer for arithmetic; "
+ "{0} is not a valid integer"
+ ).format(amount)
+ raise errors.AnsibleFilterError(msg)
+
+ return str(ip + amount)
+
+
+def ipwrap(value, query=""):
+ try:
+ if isinstance(value, (list, tuple, types.GeneratorType)):
+ _ret = []
+ for element in value:
+ if ipaddr(element, query, version=False, alias="ipwrap"):
+ _ret.append(ipaddr(element, "wrap"))
+ else:
+ _ret.append(element)
+
+ return _ret
+ else:
+ _ret = ipaddr(value, query, version=False, alias="ipwrap")
+ if _ret:
+ return ipaddr(_ret, "wrap")
+ else:
+ return value
+
+ except Exception:
+ return value
+
+
+def ipv4(value, query=""):
+ return ipaddr(value, query, version=4, alias="ipv4")
+
+
+def ipv6(value, query=""):
+ return ipaddr(value, query, version=6, alias="ipv6")
+
+
+# Split given subnet into smaller subnets or find out the biggest subnet of
+# a given IP address with given CIDR prefix
+# Usage:
+#
+# - address or address/prefix | ipsubnet
+# returns CIDR subnet of a given input
+#
+# - address/prefix | ipsubnet(cidr)
+# returns number of possible subnets for given CIDR prefix
+#
+# - address/prefix | ipsubnet(cidr, index)
+# returns new subnet with given CIDR prefix
+#
+# - address | ipsubnet(cidr)
+# returns biggest subnet with given CIDR prefix that address belongs to
+#
+# - address | ipsubnet(cidr, index)
+# returns next indexed subnet which contains given address
+#
+# - address/prefix | ipsubnet(subnet/prefix)
+# return the index of the subnet in the subnet
+def ipsubnet(value, query="", index="x"):
+ """ Manipulate IPv4/IPv6 subnets """
+
+ try:
+ vtype = ipaddr(value, "type")
+ if vtype == "address":
+ v = ipaddr(value, "cidr")
+ elif vtype == "network":
+ v = ipaddr(value, "subnet")
+
+ value = netaddr.IPNetwork(v)
+ except Exception:
+ return False
+ query_string = str(query)
+ if not query:
+ return str(value)
+
+ elif query_string.isdigit():
+ vsize = ipaddr(v, "size")
+ query = int(query)
+
+ try:
+ float(index)
+ index = int(index)
+
+ if vsize > 1:
+ try:
+ return str(list(value.subnet(query))[index])
+ except Exception:
+ return False
+
+ elif vsize == 1:
+ try:
+ return str(value.supernet(query)[index])
+ except Exception:
+ return False
+
+ except Exception:
+ if vsize > 1:
+ try:
+ return str(len(list(value.subnet(query))))
+ except Exception:
+ return False
+
+ elif vsize == 1:
+ try:
+ return str(value.supernet(query)[0])
+ except Exception:
+ return False
+
+ elif query_string:
+ vtype = ipaddr(query, "type")
+ if vtype == "address":
+ v = ipaddr(query, "cidr")
+ elif vtype == "network":
+ v = ipaddr(query, "subnet")
+ else:
+ msg = "You must pass a valid subnet or IP address; {0} is invalid".format(
+ query_string
+ )
+ raise errors.AnsibleFilterError(msg)
+ query = netaddr.IPNetwork(v)
+ for i, subnet in enumerate(query.subnet(value.prefixlen), 1):
+ if subnet == value:
+ return str(i)
+ msg = "{0} is not in the subnet {1}".format(value.cidr, query.cidr)
+ raise errors.AnsibleFilterError(msg)
+ return False
+
+
+# Returns the nth host within a network described by value.
+# Usage:
+#
+# - address or address/prefix | nthhost(nth)
+# returns the nth host within the given network
+def nthhost(value, query=""):
+ """ Get the nth host within a given network """
+ try:
+ vtype = ipaddr(value, "type")
+ if vtype == "address":
+ v = ipaddr(value, "cidr")
+ elif vtype == "network":
+ v = ipaddr(value, "subnet")
+
+ value = netaddr.IPNetwork(v)
+ except Exception:
+ return False
+
+ if not query:
+ return False
+
+ try:
+ nth = int(query)
+ if value.size > nth:
+ return value[nth]
+
+ except ValueError:
+ return False
+
+ return False
+
+
+# Returns the next nth usable ip within a network described by value.
+def next_nth_usable(value, offset):
+ try:
+ vtype = ipaddr(value, "type")
+ if vtype == "address":
+ v = ipaddr(value, "cidr")
+ elif vtype == "network":
+ v = ipaddr(value, "subnet")
+
+ v = netaddr.IPNetwork(v)
+ except Exception:
+ return False
+
+ if type(offset) != int:
+ raise errors.AnsibleFilterError("Must pass in an integer")
+ if v.size > 1:
+ first_usable, last_usable = _first_last(v)
+ nth_ip = int(netaddr.IPAddress(int(v.ip) + offset))
+ if nth_ip >= first_usable and nth_ip <= last_usable:
+ return str(netaddr.IPAddress(int(v.ip) + offset))
+
+
+# Returns the previous nth usable ip within a network described by value.
+def previous_nth_usable(value, offset):
+ try:
+ vtype = ipaddr(value, "type")
+ if vtype == "address":
+ v = ipaddr(value, "cidr")
+ elif vtype == "network":
+ v = ipaddr(value, "subnet")
+
+ v = netaddr.IPNetwork(v)
+ except Exception:
+ return False
+
+ if type(offset) != int:
+ raise errors.AnsibleFilterError("Must pass in an integer")
+ if v.size > 1:
+ first_usable, last_usable = _first_last(v)
+ nth_ip = int(netaddr.IPAddress(int(v.ip) - offset))
+ if nth_ip >= first_usable and nth_ip <= last_usable:
+ return str(netaddr.IPAddress(int(v.ip) - offset))
+
+
+def _range_checker(ip_check, first, last):
+ """
+ Tests whether an ip address is within the bounds of the first and last address.
+
+ :param ip_check: The ip to test if it is within first and last.
+ :param first: The first IP in the range to test against.
+ :param last: The last IP in the range to test against.
+
+ :return: bool
+ """
+ if ip_check >= first and ip_check <= last:
+ return True
+ else:
+ return False
+
+
+def _address_normalizer(value):
+ """
+ Used to validate an address or network type and return it in a consistent format.
+ This is being used for future use cases not currently available such as an address range.
+
+ :param value: The string representation of an address or network.
+
+ :return: The address or network in the normalized form.
+ """
+ try:
+ vtype = ipaddr(value, "type")
+ if vtype == "address" or vtype == "network":
+ v = ipaddr(value, "subnet")
+ except Exception:
+ return False
+
+ return v
+
+
+def network_in_usable(value, test):
+ """
+ Checks whether 'test' is a useable address or addresses in 'value'
+
+ :param: value: The string representation of an address or network to test against.
+ :param test: The string representation of an address or network to validate if it is within the range of 'value'.
+
+ :return: bool
+ """
+ # normalize value and test variables into an ipaddr
+ v = _address_normalizer(value)
+ w = _address_normalizer(test)
+
+ # get first and last addresses as integers to compare value and test; or cathes value when case is /32
+ v_first = ipaddr(ipaddr(v, "first_usable") or ipaddr(v, "address"), "int")
+ v_last = ipaddr(ipaddr(v, "last_usable") or ipaddr(v, "address"), "int")
+ w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int")
+ w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int")
+
+ if _range_checker(w_first, v_first, v_last) and _range_checker(
+ w_last, v_first, v_last
+ ):
+ return True
+ else:
+ return False
+
+
+def network_in_network(value, test):
+ """
+ Checks whether the 'test' address or addresses are in 'value', including broadcast and network
+
+ :param: value: The network address or range to test against.
+ :param test: The address or network to validate if it is within the range of 'value'.
+
+ :return: bool
+ """
+ # normalize value and test variables into an ipaddr
+ v = _address_normalizer(value)
+ w = _address_normalizer(test)
+
+ # get first and last addresses as integers to compare value and test; or cathes value when case is /32
+ v_first = ipaddr(ipaddr(v, "network") or ipaddr(v, "address"), "int")
+ v_last = ipaddr(ipaddr(v, "broadcast") or ipaddr(v, "address"), "int")
+ w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int")
+ w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int")
+
+ if _range_checker(w_first, v_first, v_last) and _range_checker(
+ w_last, v_first, v_last
+ ):
+ return True
+ else:
+ return False
+
+
+def reduce_on_network(value, network):
+ """
+ Reduces a list of addresses to only the addresses that match a given network.
+
+ :param: value: The list of addresses to filter on.
+ :param: network: The network to validate against.
+
+ :return: The reduced list of addresses.
+ """
+ # normalize network variable into an ipaddr
+ n = _address_normalizer(network)
+
+ # get first and last addresses as integers to compare value and test; or cathes value when case is /32
+ n_first = ipaddr(ipaddr(n, "network") or ipaddr(n, "address"), "int")
+ n_last = ipaddr(ipaddr(n, "broadcast") or ipaddr(n, "address"), "int")
+
+ # create an empty list to fill and return
+ r = []
+
+ for address in value:
+ # normalize address variables into an ipaddr
+ a = _address_normalizer(address)
+
+ # get first and last addresses as integers to compare value and test; or cathes value when case is /32
+ a_first = ipaddr(ipaddr(a, "network") or ipaddr(a, "address"), "int")
+ a_last = ipaddr(ipaddr(a, "broadcast") or ipaddr(a, "address"), "int")
+
+ if _range_checker(a_first, n_first, n_last) and _range_checker(
+ a_last, n_first, n_last
+ ):
+ r.append(address)
+
+ return r
+
+
+# Returns the SLAAC address within a network for a given HW/MAC address.
+# Usage:
+#
+# - prefix | slaac(mac)
+def slaac(value, query=""):
+ """ Get the SLAAC address within given network """
+ try:
+ vtype = ipaddr(value, "type")
+ if vtype == "address":
+ v = ipaddr(value, "cidr")
+ elif vtype == "network":
+ v = ipaddr(value, "subnet")
+
+ if ipaddr(value, "version") != 6:
+ return False
+
+ value = netaddr.IPNetwork(v)
+ except Exception:
+ return False
+
+ if not query:
+ return False
+
+ try:
+ mac = hwaddr(query, alias="slaac")
+
+ eui = netaddr.EUI(mac)
+ except Exception:
+ return False
+
+ return eui.ipv6(value.network)
+
+
+# ---- HWaddr / MAC address filters ----
+def hwaddr(value, query="", alias="hwaddr"):
+ """ Check if string is a HW/MAC address and filter it """
+
+ query_func_extra_args = {"": ("value",)}
+
+ query_func_map = {
+ "": _empty_hwaddr_query,
+ "bare": _bare_query,
+ "bool": _bool_hwaddr_query,
+ "int": _int_hwaddr_query,
+ "cisco": _cisco_query,
+ "eui48": _win_query,
+ "linux": _linux_query,
+ "pgsql": _postgresql_query,
+ "postgresql": _postgresql_query,
+ "psql": _postgresql_query,
+ "unix": _unix_query,
+ "win": _win_query,
+ }
+
+ try:
+ v = netaddr.EUI(value)
+ except Exception:
+ if query and query != "bool":
+ raise errors.AnsibleFilterError(
+ alias + ": not a hardware address: %s" % value
+ )
+
+ extras = []
+ for arg in query_func_extra_args.get(query, tuple()):
+ extras.append(locals()[arg])
+ try:
+ return query_func_map[query](v, *extras)
+ except KeyError:
+ raise errors.AnsibleFilterError(
+ alias + ": unknown filter type: %s" % query
+ )
+
+ return False
+
+
+def macaddr(value, query=""):
+ return hwaddr(value, query, alias="macaddr")
+
+
+def _need_netaddr(f_name, *args, **kwargs):
+ raise errors.AnsibleFilterError(
+ "The %s filter requires python's netaddr be "
+ "installed on the ansible controller" % f_name
+ )
+
+
+def ip4_hex(arg, delimiter=""):
+ """ Convert an IPv4 address to Hexadecimal notation """
+ numbers = list(map(int, arg.split(".")))
+ return "{0:02x}{sep}{1:02x}{sep}{2:02x}{sep}{3:02x}".format(
+ *numbers, sep=delimiter
+ )
+
+
+# ---- Ansible filters ----
+class FilterModule(object):
+ """ IP address and network manipulation filters """
+
+ filter_map = {
+ # IP addresses and networks
+ "cidr_merge": cidr_merge,
+ "ipaddr": ipaddr,
+ "ipmath": ipmath,
+ "ipwrap": ipwrap,
+ "ip4_hex": ip4_hex,
+ "ipv4": ipv4,
+ "ipv6": ipv6,
+ "ipsubnet": ipsubnet,
+ "next_nth_usable": next_nth_usable,
+ "network_in_network": network_in_network,
+ "network_in_usable": network_in_usable,
+ "reduce_on_network": reduce_on_network,
+ "nthhost": nthhost,
+ "previous_nth_usable": previous_nth_usable,
+ "slaac": slaac,
+ # MAC / HW addresses
+ "hwaddr": hwaddr,
+ "macaddr": macaddr,
+ }
+
+ def filters(self):
+ if netaddr:
+ return self.filter_map
+ else:
+ # Need to install python's netaddr for these filters to work
+ return dict(
+ (f, partial(_need_netaddr, f)) for f in self.filter_map
+ )