#!/usr/bin/env python
#-----------------------------------------------------------------------------
#   Copyright (c) 2008 by David P. D. Moss. All rights reserved.
#
#   Released under the BSD license. See the LICENSE file for details.
#-----------------------------------------------------------------------------
#
#   DISCLAIMER
#
#   netaddr is not sponsored nor endorsed by IANA.
#
#   Use of data from IANA (Internet Assigned Numbers Authority) is subject to
#   copyright and is provided with prior written permission.
#
#   IANA data files included with netaddr are not modified in any way but are
#   parsed and made available to end users through an API.
#
#   See README file and source code for URLs to latest copies of the relevant
#   files.
#
#-----------------------------------------------------------------------------
"""
Routines for accessing data published by IANA (Internet Assigned Numbers
Authority).

More details can be found at the following URLs :-

    - IANA Home Page - http://www.iana.org/
    - IANA Protocols Information Home Page - http://www.iana.org/protocols/
"""

import sys as _sys
from xml.sax import make_parser, handler

from netaddr.core import Publisher, Subscriber
from netaddr.ip import IPAddress, IPNetwork, IPRange, cidr_abbrev_to_verbose
from netaddr.compat import _dict_items, _callable, _open_binary

#: Topic based lookup dictionary for IANA information.
IANA_INFO = {
    'IPv4': {},
    'IPv6': {},
    'IPv6_unicast': {},
    'multicast': {},
}


class SaxRecordParser(handler.ContentHandler):
    """
    SAX content handler that collects every <record> element into a dict and
    hands the finished dict to a callback.

    For each direct child element of a record, the element name becomes a key
    and its text content the value. <xref type="T" data="D"/> children are
    handled specially: their data values are appended to a list stored under
    the key T. A 'date' attribute on the <record> element itself is stored
    under the key 'date'.
    """

    def __init__(self, callback=None):
        """
        Constructor.

        callback - optional callable invoked with the record dict each time
            a </record> end tag is reached.
        """
        # Current element nesting depth (incremented on every start tag).
        self._level = 0
        # True while the parser is somewhere inside a <record> element.
        self._is_active = False
        # Dict being built for the record currently being parsed.
        self._record = None
        # Nesting depth at which the current <record> element was opened.
        self._tag_level = None
        # Text chunks collected for the field element currently open.
        self._tag_payload = None
        # True while character data should be appended to _tag_payload.
        self._tag_feeding = None
        self._callback = callback

    def startElement(self, name, attrs):
        """SAX callback for an opening tag."""
        self._level += 1

        if not self._is_active:
            if name == 'record':
                self._is_active = True
                self._tag_level = self._level
                self._record = {}
                if 'date' in attrs:
                    self._record['date'] = attrs['date']
        elif self._level == self._tag_level + 1:
            # Direct child of the current <record>.
            if name == 'xref':
                if 'type' in attrs and 'data' in attrs:
                    values = self._record.setdefault(attrs['type'], [])
                    values.append(attrs['data'])
            else:
                self._tag_payload = []
                self._tag_feeding = True
        else:
            # Anything nested deeper than a record field stops text capture.
            self._tag_feeding = False

    def endElement(self, name):
        """SAX callback for a closing tag."""
        if self._is_active:
            if name == 'record' and self._tag_level == self._level:
                # Record complete - publish it and reset the parser state.
                self._is_active = False
                self._tag_level = None
                if _callable(self._callback):
                    self._callback(self._record)
                self._record = None
            elif self._level == self._tag_level + 1:
                if name != 'xref':
                    self._record[name] = ''.join(self._tag_payload)
                    self._tag_payload = None
                    self._tag_feeding = False

        self._level -= 1

    def characters(self, content):
        """SAX callback for character data; buffers text of record fields."""
        if self._tag_feeding:
            self._tag_payload.append(content)


class XMLRecordParser(Publisher):
    """
    A configurable Parser that understands how to parse XML based records.
    """

    def __init__(self, fh, **kwargs):
        """
        Constructor.

        fh - a valid, open file handle to XML based record data.

        kwargs - additional parser options; each one is stored as an
            attribute on this instance.
        """
        super(XMLRecordParser, self).__init__()

        self.xmlparser = make_parser()
        self.xmlparser.setContentHandler(SaxRecordParser(self.consume_record))

        self.fh = fh

        self.__dict__.update(kwargs)

    def process_record(self, rec):
        """
        This is the callback method invoked for every record. It is usually
        overridden by subclasses to provide specific record-based logic.

        Any record can be vetoed (not passed to registered Subscriber objects)
        by simply returning None.
        """
        return rec

    def consume_record(self, rec):
        """
        Passes a raw record through process_record() and notifies registered
        subscribers with the result, unless the record was vetoed (None).
        """
        record = self.process_record(rec)
        if record is not None:
            self.notify(record)

    def parse(self):
        """
        Parse and normalises records, notifying registered subscribers with
        record data as it is encountered.
        """
        self.xmlparser.parse(self.fh)


class IPv4Parser(XMLRecordParser):
    """
    A XMLRecordParser that understands how to parse and retrieve data records
    from the IANA IPv4 address space file.

    It can be found online here :-

        - http://www.iana.org/assignments/ipv4-address-space/ipv4-address-space.xml
    """

    def __init__(self, fh, **kwargs):
        """
        Constructor.

        fh - a valid, open file handle to an IANA IPv4 address space file.

        kwargs - additional parser options.
        """
        # Forward the options; previously they were accepted but discarded.
        super(IPv4Parser, self).__init__(fh, **kwargs)

    def process_record(self, rec):
        """
        Callback method invoked for every record. See base class method for
        more details.
        """
        record = {}
        for key in ('prefix', 'designation', 'date', 'whois', 'status'):
            record[key] = str(rec.get(key, '')).strip()

        # Strip leading zeros from octet.
        if '/' in record['prefix']:
            (octet, prefix) = record['prefix'].split('/')
            record['prefix'] = '%d/%d' % (int(octet), int(prefix))

        record['status'] = record['status'].capitalize()

        return record


class IPv6Parser(XMLRecordParser):
    """
    A XMLRecordParser that understands how to parse and retrieve data records
    from the IANA IPv6 address space file.

    It can be found online here :-

        - http://www.iana.org/assignments/ipv6-address-space/ipv6-address-space.xml
    """

    def __init__(self, fh, **kwargs):
        """
        Constructor.

        fh - a valid, open file handle to an IANA IPv6 address space file.

        kwargs - additional parser options.
        """
        # Forward the options; previously they were accepted but discarded.
        super(IPv6Parser, self).__init__(fh, **kwargs)

    def process_record(self, rec):
        """
        Callback method invoked for every record. See base class method for
        more details.
        """
        record = {
            'prefix': str(rec.get('prefix', '')).strip(),
            'allocation': str(rec.get('description', '')).strip(),
            # HACK: -1 instead of 0 is a hacky hack to get 4291 instead of
            # 3513 from a record that references both RFCs (RFC 3513 was
            # later obsoleted by RFC 4291).
            #
            # I imagine there's no way to solve this in a general way, maybe
            # we should start returning a list of RFC-s here?
            'reference': str(rec.get('rfc', [''])[-1]).strip(),
        }

        return record


class IPv6UnicastParser(XMLRecordParser):
    """
    A XMLRecordParser that understands how to parse and retrieve data records
    from the IANA IPv6 unicast address assignments file.

    It can be found online here :-

        - http://www.iana.org/assignments/ipv6-unicast-address-assignments/ipv6-unicast-address-assignments.xml
    """

    def __init__(self, fh, **kwargs):
        """
        Constructor.

        fh - a valid, open file handle to an IANA IPv6 unicast address
            assignments file.

        kwargs - additional parser options.
        """
        # Forward the options; previously they were accepted but discarded.
        super(IPv6UnicastParser, self).__init__(fh, **kwargs)

    def process_record(self, rec):
        """
        Callback method invoked for every record. See base class method for
        more details.
        """
        record = {
            'status': str(rec.get('status', '')).strip(),
            'description': str(rec.get('description', '')).strip(),
            'prefix': str(rec.get('prefix', '')).strip(),
            'date': str(rec.get('date', '')).strip(),
            'whois': str(rec.get('whois', '')).strip(),
        }

        return record


class MulticastParser(XMLRecordParser):
    """
    A XMLRecordParser that knows how to process the IANA IPv4 multicast address
    allocation file.

    It can be found online here :-

        - http://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml
    """

    def __init__(self, fh, **kwargs):
        """
        Constructor.

        fh - a valid, open file handle to an IANA IPv4 multicast address
             allocation file.

        kwargs - additional parser options.
        """
        # Forward the options; previously they were accepted but discarded.
        super(MulticastParser, self).__init__(fh, **kwargs)

    @staticmethod
    def _strip_octets(addr):
        """
        Returns a dotted address string with surrounding whitespace removed
        and every octet re-rendered as a plain integer (no leading zeros).
        """
        return '.'.join([str(int(octet)) for octet in addr.strip().split('.')])

    def normalise_addr(self, addr):
        """
        Removes variations from address entries found in this particular file.

        addr - either a single dotted address or a 'first-last' range of two
            dotted addresses.

        Returns the address (or range) in the same form with leading zeros
        and stray whitespace removed from each side.
        """
        if '-' in addr:
            (a1, a2) = addr.split('-')
            return '%s-%s' % (self._strip_octets(a1), self._strip_octets(a2))

        return self._strip_octets(addr)

    def process_record(self, rec):
        """
        Callback method invoked for every record. See base class method for
        more details.

        Records without an 'addr' field are vetoed (None is returned).
        """
        if 'addr' not in rec:
            return None

        return {
            'address': self.normalise_addr(str(rec['addr'])),
            'descr': str(rec.get('description', '')),
        }


class DictUpdater(Subscriber):
    """
    Concrete Subscriber that inserts records received from a Publisher into a
    dictionary.
    """

    def __init__(self, dct, topic, unique_key):
        """
        Constructor.

        dct - lookup dict or dict like object to insert records into.

        topic - high-level category name of data to be processed.

        unique_key - key name in data dict that uniquely identifies it.
        """
        self.dct = dct
        self.topic = topic
        self.unique_key = unique_key

    def update(self, data):
        """
        Callback function used by Publisher to notify this Subscriber about
        an update. Stores topic based information into dictionary passed to
        constructor.

        The dictionary key is an IPNetwork for the 'IPv4', 'IPv6' and
        'IPv6_unicast' topics. For 'multicast' it is an IPAddress for a
        single address, or for a range either the single CIDR that covers it
        exactly or, failing that, the IPRange itself. Records for any other
        topic are ignored.
        """
        data_id = data[self.unique_key]

        if self.topic in ('IPv4', 'IPv6'):
            # These files may use abbreviated prefixes; expand them first.
            cidr = IPNetwork(cidr_abbrev_to_verbose(data_id))
            self.dct[cidr] = data
        elif self.topic == 'IPv6_unicast':
            cidr = IPNetwork(data_id)
            self.dct[cidr] = data
        elif self.topic == 'multicast':
            iprange = None
            if '-' in data_id:
                # See if we can manage a single CIDR.
                (first, last) = data_id.split('-')
                iprange = IPRange(first, last)
                cidrs = iprange.cidrs()
                if len(cidrs) == 1:
                    iprange = cidrs[0]
            else:
                iprange = IPAddress(data_id)
            self.dct[iprange] = data


def _load_topic(parser_class, filename, topic, unique_key):
    """
    Parses one packaged IANA data file into IANA_INFO[topic].

    parser_class - XMLRecordParser subclass used to read the file.

    filename - name of the data file inside this package.

    topic - IANA_INFO key to populate.

    unique_key - record field used to derive the lookup key.
    """
    fh = _open_binary(__package__, filename)
    try:
        parser = parser_class(fh)
        parser.attach(DictUpdater(IANA_INFO[topic], topic, unique_key))
        parser.parse()
    finally:
        # Close explicitly rather than relying on the SAX parser or garbage
        # collection to release the handle.
        fh.close()


def load_info():
    """
    Parse and load internal IANA data lookups with the latest information from
    data files.
    """
    sources = (
        (IPv4Parser, 'ipv4-address-space.xml', 'IPv4', 'prefix'),
        (IPv6Parser, 'ipv6-address-space.xml', 'IPv6', 'prefix'),
        (IPv6UnicastParser, 'ipv6-unicast-address-assignments.xml',
         'IPv6_unicast', 'prefix'),
        (MulticastParser, 'multicast-addresses.xml', 'multicast', 'address'),
    )

    for (parser_class, filename, topic, unique_key) in sources:
        _load_topic(parser_class, filename, topic, unique_key)


def pprint_info(fh=None):
    """
    Pretty prints IANA information to filehandle.

    fh - writable text file handle (default: sys.stdout).
    """
    if fh is None:
        fh = _sys.stdout

    for category in sorted(IANA_INFO):
        fh.write('-' * len(category) + "\n")
        fh.write(category + "\n")
        fh.write('-' * len(category) + "\n")
        ipranges = IANA_INFO[category]
        for iprange in sorted(ipranges):
            details = ipranges[iprange]
            # details is a record dict, so it has to be formatted rather
            # than concatenated onto a str (which raised TypeError).
            fh.write('%-45r %r\n' % (iprange, details))


def _within_bounds(ip, ip_range):
    """
    Boundary checking for multiple IP classes.

    Returns True if ip lies within ip_range (an object with a 'first'
    attribute, i.e. a network or range) or equals it (an object with a
    'value' attribute, i.e. a single address). Raises Exception for any
    other kind of object.
    """
    if hasattr(ip_range, 'first'):
        # IP network or IP range.
        return ip in ip_range
    elif hasattr(ip_range, 'value'):
        # IP address.
        return ip == ip_range

    raise Exception('Unsupported IP range or address: %r!' % (ip_range,))


def _matching_records(ip_addr, lookup):
    """Returns the records in lookup whose key contains or equals ip_addr."""
    return [
        record
        for (key, record) in _dict_items(lookup)
        if _within_bounds(ip_addr, key)
    ]


def query(ip_addr):
    """
    Returns informational data specific to this IP address.

    The result is a dict mapping a category name ('IPv4', 'Multicast',
    'IPv6', 'IPv6_unicast') to the list of matching records. Categories
    without a match are omitted.
    """
    # Pairs of (key used in the result, key used in IANA_INFO).
    if ip_addr.version == 4:
        topics = [('IPv4', 'IPv4')]
        if ip_addr.is_multicast():
            topics.append(('Multicast', 'multicast'))
    elif ip_addr.version == 6:
        topics = [('IPv6', 'IPv6'), ('IPv6_unicast', 'IPv6_unicast')]
    else:
        topics = []

    info = {}
    for (label, topic) in topics:
        records = _matching_records(ip_addr, IANA_INFO[topic])
        if records:
            info[label] = records

    return info


#   On module import, read IANA data files and populate lookups dict.
load_info()