diff options
Diffstat (limited to '')
-rw-r--r-- | netaddr/fbsocket.py | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/netaddr/fbsocket.py b/netaddr/fbsocket.py new file mode 100644 index 0000000..0e376b3 --- /dev/null +++ b/netaddr/fbsocket.py @@ -0,0 +1,246 @@ +#----------------------------------------------------------------------------- +# Copyright (c) 2008 by David P. D. Moss. All rights reserved. +# +# Released under the BSD license. See the LICENSE file for details. +#----------------------------------------------------------------------------- +"""Fallback routines for Python's standard library socket module""" + +from struct import unpack as _unpack, pack as _pack + +from netaddr.compat import _bytes_join, _is_str + +AF_INET = 2 +AF_INET6 = 10 + + +def inet_ntoa(packed_ip): + """ + Convert an IP address from 32-bit packed binary format to string format. + """ + if not _is_str(packed_ip): + raise TypeError('string type expected, not %s' % type(packed_ip)) + + if len(packed_ip) != 4: + raise ValueError('invalid length of packed IP address string') + + return '%d.%d.%d.%d' % _unpack('4B', packed_ip) + + +def _compact_ipv6_tokens(tokens): + new_tokens = [] + + positions = [] + start_index = None + num_tokens = 0 + + # Discover all runs of zeros. + for idx, token in enumerate(tokens): + if token == '0': + if start_index is None: + start_index = idx + num_tokens += 1 + else: + if num_tokens > 1: + positions.append((num_tokens, start_index)) + start_index = None + num_tokens = 0 + + new_tokens.append(token) + + # Store any position not saved before loop exit. + if num_tokens > 1: + positions.append((num_tokens, start_index)) + + # Replace first longest run with an empty string. + if len(positions) != 0: + # Locate longest, left-most run of zeros. + positions.sort(key=lambda x: x[1]) + best_position = positions[0] + for position in positions: + if position[0] > best_position[0]: + best_position = position + # Replace chosen zero run. + (length, start_idx) = best_position + new_tokens = new_tokens[0:start_idx] + [''] + new_tokens[start_idx + length:] + + # Add start and end blanks so join creates '::'. + if new_tokens[0] == '': + new_tokens.insert(0, '') + + if new_tokens[-1] == '': + new_tokens.append('') + + return new_tokens + + +def inet_ntop(af, packed_ip): + """Convert an packed IP address of the given family to string format.""" + if af == AF_INET: + # IPv4. + return inet_ntoa(packed_ip) + elif af == AF_INET6: + # IPv6. + if len(packed_ip) != 16 or not _is_str(packed_ip): + raise ValueError('invalid length of packed IP address string') + + tokens = ['%x' % i for i in _unpack('>8H', packed_ip)] + + # Convert packed address to an integer value. + words = list(_unpack('>8H', packed_ip)) + int_val = 0 + for i, num in enumerate(reversed(words)): + word = num + word = word << 16 * i + int_val = int_val | word + + if 0xffff < int_val <= 0xffffffff or int_val >> 32 == 0xffff: + # IPv4 compatible / mapped IPv6. + packed_ipv4 = _pack('>2H', *[int(i, 16) for i in tokens[-2:]]) + ipv4_str = inet_ntoa(packed_ipv4) + tokens = tokens[0:-2] + [ipv4_str] + + return ':'.join(_compact_ipv6_tokens(tokens)) + else: + raise ValueError('unknown address family %d' % af) + + +def _inet_pton_af_inet(ip_string): + """ + Convert an IP address in string format (123.45.67.89) to the 32-bit packed + binary format used in low-level network functions. Differs from inet_aton + by only support decimal octets. Using octal or hexadecimal values will + raise a ValueError exception. + """ + #TODO: optimise this ... use inet_aton with mods if available ... + if _is_str(ip_string): + invalid_addr = ValueError('illegal IP address string %r' % ip_string) + # Support for hexadecimal and octal octets. + tokens = ip_string.split('.') + + # Pack octets. + if len(tokens) == 4: + words = [] + for token in tokens: + if token.startswith('0x') or (token.startswith('0') and len(token) > 1): + raise invalid_addr + try: + octet = int(token) + except ValueError: + raise invalid_addr + + if (octet >> 8) != 0: + raise invalid_addr + words.append(_pack('B', octet)) + return _bytes_join(words) + else: + raise invalid_addr + + raise ValueError('argument should be a string, not %s' % type(ip_string)) + + +def inet_pton(af, ip_string): + """ + Convert an IP address from string format to a packed string suitable for + use with low-level network functions. + """ + if af == AF_INET: + # IPv4. + return _inet_pton_af_inet(ip_string) + elif af == AF_INET6: + invalid_addr = ValueError('illegal IP address string %r' % ip_string) + # IPv6. + values = [] + + if not _is_str(ip_string): + raise invalid_addr + + if 'x' in ip_string: + # Don't accept hextets with the 0x prefix. + raise invalid_addr + + if '::' in ip_string: + if ip_string == '::': + # Unspecified address. + return '\x00'.encode() * 16 + # IPv6 compact mode. + try: + prefix, suffix = ip_string.split('::') + except ValueError: + raise invalid_addr + + l_prefix = [] + l_suffix = [] + + if prefix != '': + l_prefix = prefix.split(':') + + if suffix != '': + l_suffix = suffix.split(':') + + # IPv6 compact IPv4 compatibility mode. + if len(l_suffix) and '.' in l_suffix[-1]: + ipv4_str = _inet_pton_af_inet(l_suffix.pop()) + l_suffix.append('%x' % _unpack('>H', ipv4_str[0:2])[0]) + l_suffix.append('%x' % _unpack('>H', ipv4_str[2:4])[0]) + + token_count = len(l_prefix) + len(l_suffix) + + if not 0 <= token_count <= 8 - 1: + raise invalid_addr + + gap_size = 8 - ( len(l_prefix) + len(l_suffix) ) + + values = ( + [_pack('>H', int(i, 16)) for i in l_prefix] + + ['\x00\x00'.encode() for i in range(gap_size)] + + [_pack('>H', int(i, 16)) for i in l_suffix] + ) + try: + for token in l_prefix + l_suffix: + word = int(token, 16) + if not 0 <= word <= 0xffff: + raise invalid_addr + except ValueError: + raise invalid_addr + else: + # IPv6 verbose mode. + if ':' in ip_string: + tokens = ip_string.split(':') + + if '.' in ip_string: + ipv6_prefix = tokens[:-1] + if ipv6_prefix[:-1] != ['0', '0', '0', '0', '0']: + raise invalid_addr + + if ipv6_prefix[-1].lower() not in ('0', 'ffff'): + raise invalid_addr + + # IPv6 verbose IPv4 compatibility mode. + if len(tokens) != 7: + raise invalid_addr + + ipv4_str = _inet_pton_af_inet(tokens.pop()) + tokens.append('%x' % _unpack('>H', ipv4_str[0:2])[0]) + tokens.append('%x' % _unpack('>H', ipv4_str[2:4])[0]) + + values = [_pack('>H', int(i, 16)) for i in tokens] + else: + # IPv6 verbose mode. + if len(tokens) != 8: + raise invalid_addr + try: + tokens = [int(token, 16) for token in tokens] + for token in tokens: + if not 0 <= token <= 0xffff: + raise invalid_addr + + except ValueError: + raise invalid_addr + + values = [_pack('>H', i) for i in tokens] + else: + raise invalid_addr + + return _bytes_join(values) + else: + raise ValueError('Unknown address family %d' % af) |