diff options
Diffstat (limited to 'netaddr/ip/iana.py')
-rwxr-xr-x | netaddr/ip/iana.py | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/netaddr/ip/iana.py b/netaddr/ip/iana.py new file mode 100755 index 0000000..c2c0185 --- /dev/null +++ b/netaddr/ip/iana.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python +#----------------------------------------------------------------------------- +# Copyright (c) 2008 by David P. D. Moss. All rights reserved. +# +# Released under the BSD license. See the LICENSE file for details. +#----------------------------------------------------------------------------- +# +# DISCLAIMER +# +# netaddr is not sponsored nor endorsed by IANA. +# +# Use of data from IANA (Internet Assigned Numbers Authority) is subject to +# copyright and is provided with prior written permission. +# +# IANA data files included with netaddr are not modified in any way but are +# parsed and made available to end users through an API. +# +# See README file and source code for URLs to latest copies of the relevant +# files. +# +#----------------------------------------------------------------------------- +""" +Routines for accessing data published by IANA (Internet Assigned Numbers +Authority). + +More details can be found at the following URLs :- + + - IANA Home Page - http://www.iana.org/ + - IEEE Protocols Information Home Page - http://www.iana.org/protocols/ +""" + +import sys as _sys +from xml.sax import make_parser, handler + +from netaddr.core import Publisher, Subscriber +from netaddr.ip import IPAddress, IPNetwork, IPRange, cidr_abbrev_to_verbose +from netaddr.compat import _dict_items, _callable, _open_binary + + + +#: Topic based lookup dictionary for IANA information. +IANA_INFO = { + 'IPv4': {}, + 'IPv6': {}, + 'IPv6_unicast': {}, + 'multicast': {}, +} + + +class SaxRecordParser(handler.ContentHandler): + def __init__(self, callback=None): + self._level = 0 + self._is_active = False + self._record = None + self._tag_level = None + self._tag_payload = None + self._tag_feeding = None + self._callback = callback + + def startElement(self, name, attrs): + self._level += 1 + + if self._is_active is False: + if name == 'record': + self._is_active = True + self._tag_level = self._level + self._record = {} + if 'date' in attrs: + self._record['date'] = attrs['date'] + elif self._level == self._tag_level + 1: + if name == 'xref': + if 'type' in attrs and 'data' in attrs: + l = self._record.setdefault(attrs['type'], []) + l.append(attrs['data']) + else: + self._tag_payload = [] + self._tag_feeding = True + else: + self._tag_feeding = False + + def endElement(self, name): + if self._is_active is True: + if name == 'record' and self._tag_level == self._level: + self._is_active = False + self._tag_level = None + if _callable(self._callback): + self._callback(self._record) + self._record = None + elif self._level == self._tag_level + 1: + if name != 'xref': + self._record[name] = ''.join(self._tag_payload) + self._tag_payload = None + self._tag_feeding = False + + self._level -= 1 + + def characters(self, content): + if self._tag_feeding is True: + self._tag_payload.append(content) + + +class XMLRecordParser(Publisher): + """ + A configurable Parser that understands how to parse XML based records. + """ + + def __init__(self, fh, **kwargs): + """ + Constructor. + + fh - a valid, open file handle to XML based record data. + """ + super(XMLRecordParser, self).__init__() + + self.xmlparser = make_parser() + self.xmlparser.setContentHandler(SaxRecordParser(self.consume_record)) + + self.fh = fh + + self.__dict__.update(kwargs) + + def process_record(self, rec): + """ + This is the callback method invoked for every record. It is usually + over-ridden by base classes to provide specific record-based logic. + + Any record can be vetoed (not passed to registered Subscriber objects) + by simply returning None. + """ + return rec + + def consume_record(self, rec): + record = self.process_record(rec) + if record is not None: + self.notify(record) + + def parse(self): + """ + Parse and normalises records, notifying registered subscribers with + record data as it is encountered. + """ + self.xmlparser.parse(self.fh) + + +class IPv4Parser(XMLRecordParser): + """ + A XMLRecordParser that understands how to parse and retrieve data records + from the IANA IPv4 address space file. + + It can be found online here :- + + - http://www.iana.org/assignments/ipv4-address-space/ipv4-address-space.xml + """ + + def __init__(self, fh, **kwargs): + """ + Constructor. + + fh - a valid, open file handle to an IANA IPv4 address space file. + + kwargs - additional parser options. + """ + super(IPv4Parser, self).__init__(fh) + + def process_record(self, rec): + """ + Callback method invoked for every record. + + See base class method for more details. + """ + + record = {} + for key in ('prefix', 'designation', 'date', 'whois', 'status'): + record[key] = str(rec.get(key, '')).strip() + + # Strip leading zeros from octet. + if '/' in record['prefix']: + (octet, prefix) = record['prefix'].split('/') + record['prefix'] = '%d/%d' % (int(octet), int(prefix)) + + record['status'] = record['status'].capitalize() + + return record + + +class IPv6Parser(XMLRecordParser): + """ + A XMLRecordParser that understands how to parse and retrieve data records + from the IANA IPv6 address space file. + + It can be found online here :- + + - http://www.iana.org/assignments/ipv6-address-space/ipv6-address-space.xml + """ + + def __init__(self, fh, **kwargs): + """ + Constructor. + + fh - a valid, open file handle to an IANA IPv6 address space file. + + kwargs - additional parser options. + """ + super(IPv6Parser, self).__init__(fh) + + def process_record(self, rec): + """ + Callback method invoked for every record. + + See base class method for more details. + """ + + record = { + 'prefix': str(rec.get('prefix', '')).strip(), + 'allocation': str(rec.get('description', '')).strip(), + # HACK: -1 instead of 0 is a hacky hack to get 4291 instead of 3513 from + # + # <xref type="rfc" data="rfc3513"/> was later obsoleted by <xref type="rfc" data="rfc4291"/> + # + # I imagine there's no way to solve this in a general way, maybe we should start returning a list + # of RFC-s here? + 'reference': str(rec.get('rfc', [''])[-1]).strip(), + } + + return record + + +class IPv6UnicastParser(XMLRecordParser): + """ + A XMLRecordParser that understands how to parse and retrieve data records + from the IANA IPv6 unicast address assignments file. + + It can be found online here :- + + - http://www.iana.org/assignments/ipv6-unicast-address-assignments/ipv6-unicast-address-assignments.xml + """ + def __init__(self, fh, **kwargs): + """ + Constructor. + + fh - a valid, open file handle to an IANA IPv6 address space file. + + kwargs - additional parser options. + """ + super(IPv6UnicastParser, self).__init__(fh) + + def process_record(self, rec): + """ + Callback method invoked for every record. + + See base class method for more details. + """ + record = { + 'status': str(rec.get('status', '')).strip(), + 'description': str(rec.get('description', '')).strip(), + 'prefix': str(rec.get('prefix', '')).strip(), + 'date': str(rec.get('date', '')).strip(), + 'whois': str(rec.get('whois', '')).strip(), + } + + return record + + +class MulticastParser(XMLRecordParser): + """ + A XMLRecordParser that knows how to process the IANA IPv4 multicast address + allocation file. + + It can be found online here :- + + - http://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml + """ + + def __init__(self, fh, **kwargs): + """ + Constructor. + + fh - a valid, open file handle to an IANA IPv4 multicast address + allocation file. + + kwargs - additional parser options. + """ + super(MulticastParser, self).__init__(fh) + + def normalise_addr(self, addr): + """ + Removes variations from address entries found in this particular file. + """ + if '-' in addr: + (a1, a2) = addr.split('-') + o1 = a1.strip().split('.') + o2 = a2.strip().split('.') + return '%s-%s' % ('.'.join([str(int(i)) for i in o1]), + '.'.join([str(int(i)) for i in o2])) + else: + o1 = addr.strip().split('.') + return '.'.join([str(int(i)) for i in o1]) + + def process_record(self, rec): + """ + Callback method invoked for every record. + + See base class method for more details. + """ + + if 'addr' in rec: + record = { + 'address': self.normalise_addr(str(rec['addr'])), + 'descr': str(rec.get('description', '')), + } + return record + + +class DictUpdater(Subscriber): + """ + Concrete Subscriber that inserts records received from a Publisher into a + dictionary. + """ + + def __init__(self, dct, topic, unique_key): + """ + Constructor. + + dct - lookup dict or dict like object to insert records into. + + topic - high-level category name of data to be processed. + + unique_key - key name in data dict that uniquely identifies it. + """ + self.dct = dct + self.topic = topic + self.unique_key = unique_key + + def update(self, data): + """ + Callback function used by Publisher to notify this Subscriber about + an update. Stores topic based information into dictionary passed to + constructor. + """ + data_id = data[self.unique_key] + + if self.topic == 'IPv4': + cidr = IPNetwork(cidr_abbrev_to_verbose(data_id)) + self.dct[cidr] = data + elif self.topic == 'IPv6': + cidr = IPNetwork(cidr_abbrev_to_verbose(data_id)) + self.dct[cidr] = data + elif self.topic == 'IPv6_unicast': + cidr = IPNetwork(data_id) + self.dct[cidr] = data + elif self.topic == 'multicast': + iprange = None + if '-' in data_id: + # See if we can manage a single CIDR. + (first, last) = data_id.split('-') + iprange = IPRange(first, last) + cidrs = iprange.cidrs() + if len(cidrs) == 1: + iprange = cidrs[0] + else: + iprange = IPAddress(data_id) + self.dct[iprange] = data + + +def load_info(): + """ + Parse and load internal IANA data lookups with the latest information from + data files. + """ + ipv4 = IPv4Parser(_open_binary(__package__, 'ipv4-address-space.xml')) + ipv4.attach(DictUpdater(IANA_INFO['IPv4'], 'IPv4', 'prefix')) + ipv4.parse() + + ipv6 = IPv6Parser(_open_binary(__package__, 'ipv6-address-space.xml')) + ipv6.attach(DictUpdater(IANA_INFO['IPv6'], 'IPv6', 'prefix')) + ipv6.parse() + + ipv6ua = IPv6UnicastParser( + _open_binary(__package__, 'ipv6-unicast-address-assignments.xml'), + ) + ipv6ua.attach(DictUpdater(IANA_INFO['IPv6_unicast'], 'IPv6_unicast', 'prefix')) + ipv6ua.parse() + + mcast = MulticastParser(_open_binary(__package__, 'multicast-addresses.xml')) + mcast.attach(DictUpdater(IANA_INFO['multicast'], 'multicast', 'address')) + mcast.parse() + + +def pprint_info(fh=None): + """ + Pretty prints IANA information to filehandle. + """ + if fh is None: + fh = _sys.stdout + + for category in sorted(IANA_INFO): + fh.write('-' * len(category) + "\n") + fh.write(category + "\n") + fh.write('-' * len(category) + "\n") + ipranges = IANA_INFO[category] + for iprange in sorted(ipranges): + details = ipranges[iprange] + fh.write('%-45r' % (iprange) + details + "\n") + + +def _within_bounds(ip, ip_range): + # Boundary checking for multiple IP classes. + if hasattr(ip_range, 'first'): + # IP network or IP range. + return ip in ip_range + elif hasattr(ip_range, 'value'): + # IP address. + return ip == ip_range + + raise Exception('Unsupported IP range or address: %r!' % (ip_range,)) + + +def query(ip_addr): + """Returns informational data specific to this IP address.""" + info = {} + + if ip_addr.version == 4: + for cidr, record in _dict_items(IANA_INFO['IPv4']): + if _within_bounds(ip_addr, cidr): + info.setdefault('IPv4', []) + info['IPv4'].append(record) + + if ip_addr.is_multicast(): + for iprange, record in _dict_items(IANA_INFO['multicast']): + if _within_bounds(ip_addr, iprange): + info.setdefault('Multicast', []) + info['Multicast'].append(record) + + elif ip_addr.version == 6: + for cidr, record in _dict_items(IANA_INFO['IPv6']): + if _within_bounds(ip_addr, cidr): + info.setdefault('IPv6', []) + info['IPv6'].append(record) + + for cidr, record in _dict_items(IANA_INFO['IPv6_unicast']): + if _within_bounds(ip_addr, cidr): + info.setdefault('IPv6_unicast', []) + info['IPv6_unicast'].append(record) + + return info + +# On module import, read IANA data files and populate lookups dict. +load_info() |