summaryrefslogtreecommitdiffstats
path: root/contrib/cdsdump.py
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/cdsdump.py')
-rwxr-xr-xcontrib/cdsdump.py699
1 files changed, 0 insertions, 699 deletions
diff --git a/contrib/cdsdump.py b/contrib/cdsdump.py
deleted file mode 100755
index 3eee22f..0000000
--- a/contrib/cdsdump.py
+++ /dev/null
@@ -1,699 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (c) 2016-2021, OARC, Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in
-# the documentation and/or other materials provided with the
-# distribution.
-#
-# 3. Neither the name of the copyright holder nor the names of its
-# contributors may be used to endorse or promote products derived
-# from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
-# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-import sys
-import logging
-import optparse
-import struct
-import socket
-from cbor2 import CBORDecoder;
-
-logging.basicConfig(format='%(levelname).5s: %(module)s:%(lineno)d: '
- '%(message)s')
-log = logging.getLogger(__name__)
-
-class SimpleValue(object):
- def __init__(self, value):
- self.value = value
-
- def get(self):
- return self.value
-
- def __repr__(self):
- return "{}".format(self.value)
-
-def decode_simple_value(self, fp, shareable_index=None):
- return SimpleValue(struct.unpack('>B', fp.read(1))[0])
-
-try:
- from cbor2.types import CBORSimpleValue
-except Exception:
- CBORSimpleValue = SimpleValue
-
-class LastValues(object):
- def __init__(self):
- self.reset()
-
- def reset(self):
- self.ts = None
- self.src_addr4 = None
- self.src_port4 = None
- self.dest_addr4 = None
- self.dest_port4 = None
- self.src_addr6 = None
- self.src_port6 = None
- self.dest_addr6 = None
- self.dest_port6 = None
- self.rlabel = []
- self.mlabel = []
- self.rr_type = None
- self.rr_class = None
- self.rr_ttl = None
-
-last = LastValues()
-
-MAX_RLABELS = 255
-MIN_RLABEL_SIZE = 3
-
-def get_rlabel(idx):
- rlabel_idx = -idx - 1
- try:
- label = last.rlabel.pop(rlabel_idx)
- last.rlabel.insert(0, label)
- return label
- except:
- raise Exception("rlabel index {} out of range".format(rlabel_idx))
-
-def add_rlabel(label):
- size = 0
- if isinstance(label, list):
- for l in label:
- if isinstance(l, str):
- size += len(l)
- if size < MIN_RLABEL_SIZE:
- return
- last.rlabel.insert(0, label)
- if len(last.rlabel) > MAX_RLABELS:
- last.rlabel.pop()
-
-def build_mlabel_label(label):
- if isinstance(label, int) and label < 0:
- label = get_rlabel(label)
- else:
- add_rlabel(label)
-
- if isinstance(label, str):
- last.mlabel.append(label)
- elif isinstance(label, list):
- if len(label) and isinstance(label[0], int):
- last.mlabel.append(label)
- return
- label = list(label)
- while len(label):
- last.mlabel.append(list(label))
- label.pop(0)
-
-def build_mlabel(rrs):
- for rr in rrs:
- if len(rr) and isinstance(rr[0], bool):
- continue
- if len(rr):
- build_mlabel_label(rr[0])
- if len(rr) > 1 and isinstance(rr[len(rr)-1], list):
- for l in rr[len(rr)-1]:
- build_mlabel_label(l)
-
-def parse_label(label, lvl):
- if isinstance(label, int) and label < 0:
- label = get_rlabel(label)
- else:
- add_rlabel(label)
-
- if isinstance(label, bytes):
- print((" " * lvl)+"label: {}".format(bytes))
- elif isinstance(label, list):
- if len(label) and isinstance(label[0], int) and label[0] < 0:
- dn = list(get_rlabel(label[0]))
- else:
- dn = list(label)
- print((" " * lvl)+"clabel: {}".format(dn))
- dnstr = []
- seen_mlabel = {}
- while len(dn):
- while isinstance(dn[0], int):
- if dn[0] in seen_mlabel:
- dn = [ "{ name compression loop }" ]
- break
- seen_mlabel[dn[0]] = 1
- dn = list(last.mlabel[dn[0]])
- dnstr.append(dn.pop(0))
- print((" " * lvl)+"label: "+ " . ".join(dnstr))
-
- else:
- raise Exception("invalid label type {}".format(type(label)))
-
-
-def parse_rrs(rrs, lvl):
- for rr in rrs:
- print((" " * lvl)+"rr:")
- lvl+=2
-
- if len(rr) and isinstance(rr[0], bool):
- print((" " * lvl)+"incomplete/broken DNS RR, no support for these yet")
- continue
-
- parse_label(rr.pop(0), lvl)
-
- bits = 0
- if isinstance(rr[0], CBORSimpleValue):
- bits = rr.pop(0).value
- print((" " * lvl)+"type (0): "+("yes" if bits & 1 else "no"))
- print((" " * lvl)+"class (1): "+("yes" if bits & 1<<1 else "no"))
- print((" " * lvl)+"ttl (2): "+("yes" if bits & 1<<2 else "no"))
- print((" " * lvl)+"rdlength(3): "+("yes" if bits & 1<<3 else "no"))
-
- rr_type = None
- rr_class = None
- rr_ttl = None
- rdlength = None
- if not bits:
- if len(rr) > 4:
- bits = 0xff
- elif len(rr) > 1:
- raise Exception("invalid rr, expected none (0) or all (4) optional values but got {}".format(len(rr)-1))
- if bits & 1:
- if not isinstance(rr[0], int):
- raise Exception("invalid rr.type, expected int but got: {}".format(type(rr[0])))
- rr_type = rr.pop(0)
- if bits & 1<<1:
- if not isinstance(rr[0], int):
- raise Exception("invalid rr.class, expected int but got: {}".format(type(rr[0])))
- rr_class = rr.pop(0)
- if bits & 1<<2:
- if not isinstance(rr[0], int):
- raise Exception("invalid rr.ttl, expected int but got: {}".format(type(rr[0])))
- rr_ttl = rr.pop(0)
- if bits & 1<<3:
- if not isinstance(rr[0], int):
- raise Exception("invalid rr.rdlength, expected int but got: {}".format(type(rr[0])))
- rdlength = rr.pop(0)
-
- if not rr_type:
- rr_type = last.rr_type
- if not rr_class:
- rr_class = last.rr_class
- if not rr_ttl:
- rr_ttl = last.rr_ttl
-
- print((" " * lvl)+"type: {}".format(rr_type))
- print((" " * lvl)+"class: {}".format(rr_class))
- print((" " * lvl)+"ttl: {}".format(rr_ttl))
- if rdlength:
- print((" " * lvl)+"rdlength: {}".format(rdlength))
-
- if rr_type != 41:
- last.rr_type = rr_type
- last.rr_class = rr_class
- last.rr_ttl = rr_ttl
-
- if isinstance(rr[0], bytes):
- print((" " * lvl)+"rdata: "+"".join("{:02x}".format(byte) for byte in rr.pop(0)))
- elif isinstance(rr[0], list):
- rdata = []
- for i in rr.pop(0):
- if isinstance(i, int) and i < 0:
- i = get_rlabel(i)
- elif not isinstance(i, bytes):
- add_rlabel(i)
-
- if isinstance(i, bytes):
- rdata.append("".join("{:02x}".format(byte) for byte in i))
- elif isinstance(i, list):
- dn = list(i)
- dnstr = []
- seen_mlabel = {}
- while len(dn):
- while isinstance(dn[0], int):
- if dn[0] in seen_mlabel:
- dn = [ "{ name compression loop }" ]
- break
- seen_mlabel[dn[0]] = 1
- dn = list(last.mlabel[dn[0]])
- dnstr.append(dn.pop(0))
- rdata.append("[ clabel: {} label: ".format(i) + " . ".join(dnstr) + " ]")
- else:
- raise Exception("invalid rr.rdata[], expected bytes|list but got: {}".format(type(i)))
-
- print((" " * lvl)+"rdata: "+" ".join(rdata))
- else:
- raise Exception("invalid rr.rdata, expected bytes|list but got: {}".format(type(rr[0])))
-
- lvl-=2
-
-def parse_qrs(qrs, lvl):
- for qr in qrs:
- print((" " * lvl)+"qr:")
- lvl+=2
- parse_label(qr.pop(0), lvl)
-
- rr_type = None
- rr_class = None
- if len(qr):
- if not isinstance(qr[0], int):
- raise Exception("invalid qr.type|class, expected int but got {}".format(type(qr[0])))
- if qr[0] > -1:
- rr_type = qr.pop(0)
- if len(qr):
- if not isinstance(qr[0], int):
- raise Exception("invalid qr.class, expected int but got {}".format(type(qr[0])))
- elif not qr[0] < 0:
- raise Exception("invalid qr.class, expected negative int but got positive")
- rr_class = -qr.pop(0) - 1
- else:
- rr_class = -qr.pop(0) - 1
-
- if not rr_type:
- rr_type = last.rr_type
- if not rr_class:
- rr_class = last.rr_class
-
- print((" " * lvl)+"type: {}".format(rr_type))
- print((" " * lvl)+"class: {}".format(rr_class))
-
- if rr_type != 41:
- last.rr_type = rr_type
- last.rr_class = rr_class
-
- lvl-=2
-
-def parse_dns_message(dns, lvl):
- print((" " * lvl)+"dns:")
- lvl+=2
-
- if isinstance(dns[0], bool):
- print((" " * lvl)+"incomplete/broken DNS packet, no support for these yet")
- return
-
- print((" " * lvl)+"header:")
- lvl+=2
- id = dns.pop(0)
- print((" " * lvl)+"id: {}".format(id))
- raw = dns.pop(0)
- print((" " * lvl)+"raw: 0x{:04x}".format(raw))
- lvl+=2
- print((" " * lvl)+" QR: "+("yes" if raw & 1<<15 else "no"))
- print((" " * lvl)+"Opcode: {}".format(((raw >> 11) & 0xf)))
- print((" " * lvl)+" AA: "+("yes" if raw & 1<<10 else "no"))
- print((" " * lvl)+" TC: "+("yes" if raw & 1<<9 else "no"))
- print((" " * lvl)+" RD: "+("yes" if raw & 1<<8 else "no"))
- print((" " * lvl)+" RA: "+("yes" if raw & 1<<7 else "no"))
- print((" " * lvl)+" Z: "+("yes" if raw & 1<<6 else "no"))
- print((" " * lvl)+" AD: "+("yes" if raw & 1<<5 else "no"))
- print((" " * lvl)+" CD: "+("yes" if raw & 1<<4 else "no"))
- print((" " * lvl)+" RCODE: {}".format(raw & 0xf))
- lvl-=2
-
- bits = 0
- if isinstance(dns[0], int) and dns[0] < 0:
- bits = -dns.pop(0) - 1
- print((" " * lvl)+"qdcount(0): "+("yes" if bits & 1 else "no"))
- print((" " * lvl)+"ancount(1): "+("yes" if bits & 1<<1 else "no"))
- print((" " * lvl)+"nscount(2): "+("yes" if bits & 1<<2 else "no"))
- print((" " * lvl)+"arcount(3): "+("yes" if bits & 1<<3 else "no"))
-
- if not bits:
- if isinstance(dns[0], int):
- bits = 0xff
-
- if bits & 1:
- if not isinstance(dns[0], int):
- raise Exception("invalid dns.header.qdcount, expected int but got: {}".format(type(dns[0])))
- print((" " * lvl)+"qdcount: {}".format(dns.pop(0)))
- if bits & 1<<1:
- if not isinstance(dns[0], int):
- raise Exception("invalid dns.header.ancount, expected int but got: {}".format(type(dns[0])))
- print((" " * lvl)+"ancount: {}".format(dns.pop(0)))
- if bits & 1<<2:
- if not isinstance(dns[0], int):
- raise Exception("invalid dns.header.nscount, expected int but got: {}".format(type(dns[0])))
- print((" " * lvl)+"nscount: {}".format(dns.pop(0)))
- if bits & 1<<3:
- if not isinstance(dns[0], int):
- raise Exception("invalid dns.header.arcount, expected int but got: {}".format(type(dns[0])))
- print((" " * lvl)+"arcount: {}".format(dns.pop(0)))
-
- bits = 0
- if isinstance(dns[0], CBORSimpleValue):
- bits = dns.pop(0).value
- print((" " * lvl)+"questions (0): "+("yes" if bits & 1 else "no"))
- print((" " * lvl)+"answers (1): "+("yes" if bits & 1<<1 else "no"))
- print((" " * lvl)+"authorities(2): "+("yes" if bits & 1<<2 else "no"))
- print((" " * lvl)+"additionals(3): "+("yes" if bits & 1<<3 else "no"))
-
- last.mlabel = []
- rlabel = list(last.rlabel)
- for n in range(4):
- if len(dns) > n and isinstance(dns[n], list):
- build_mlabel(dns[n])
- last.rlabel = rlabel
-
- if not bits:
- if len(dns) > 3:
- bits = 0xff
- elif len(dns) > 0:
- raise Exception("invalid dns.message rr's, expected none (0) or all (4) but got {}".format(len(dns)))
-
- if bits & 1:
- if not isinstance(dns[0], list):
- raise Exception("invalid dns.message.questions, expected list but got: {}".format(type(dns[0])))
- print((" " * lvl)+"questions:")
- parse_qrs(dns.pop(0), lvl+2)
- if bits & 1<<1:
- if not isinstance(dns[0], list):
- raise Exception("invalid dns.message.answers, expected list but got: {}".format(type(dns[0])))
- print((" " * lvl)+"answers:")
- parse_rrs(dns.pop(0), lvl+2)
- if bits & 1<<2:
- if not isinstance(dns[0], list):
- raise Exception("invalid dns.message.authorities, expected list but got: {}".format(type(dns[0])))
- print((" " * lvl)+"authorities:")
- parse_rrs(dns.pop(0), lvl+2)
- if bits & 1<<3:
- if not isinstance(dns[0], list):
- raise Exception("invalid dns.message.additionals, expected list but got: {}".format(type(dns[0])))
- print((" " * lvl)+"additionals:")
- parse_rrs(dns.pop(0), lvl+2)
-
- if len(dns):
- if isinstance(dns[0], bytes):
- print((" " * lvl)+"malformed: "+"".join("{:02x}".format(byte) for byte in dns.pop(0)))
- if len(dns):
- raise Exception("invalid dns.message, garbage at end: {}".format(dns))
-
-def parse_ip_header(ip_header, lvl):
- print((" " * lvl)+"ip_header:")
- lvl+=2
-
- print((" " * lvl)+"bits:")
- lvl+=2
- bits = ip_header.pop(0)
- reverse = False
- if isinstance(bits, int):
- if bits < 0:
- print((" " * lvl)+"reverse: yes")
- bits = -bits - 1
- reverse = True
- print((" " * lvl)+"family (0): "+("INET6" if bits & 1 else "INET"))
- print((" " * lvl)+"have_src (1): "+("yes" if bits & 1<<1 else "no"))
- print((" " * lvl)+"have_dest(2): "+("yes" if bits & 1<<2 else "no"))
- print((" " * lvl)+"have_port(3): "+("yes" if bits & 1<<3 else "no"))
- else:
- raise Exception("invalid ip_header.bits, expected int but got: {}".format(type(bits)))
- lvl-=2
-
- src_addr = None
- dest_addr = None
- src_port = None
- dest_port = None
-
- if bits & 1<<1:
- src_addr = ip_header.pop(0)
- if not isinstance(src_addr, bytes):
- raise Exception("invalid ip_header.src_addr, expected bytes but got: {}".format(type(src_addr)))
- else:
- if reverse:
- src_addr = last.dest_addr6 if bits & 1 else last.dest_addr4
- if not src_addr:
- raise Exception("invalid ip_header.bits, expected to have last dest addr but don't")
- else:
- src_addr = last.src_addr6 if bits & 1 else last.src_addr4
- if not src_addr:
- raise Exception("invalid ip_header.bits, expected to have last src addr but don't")
-
- if bits & 1<<2:
- dest_addr = ip_header.pop(0)
- if not isinstance(dest_addr, bytes):
- raise Exception("invalid ip_header.dest_addr, expected bytes but got: {}".format(type(dest_addr)))
- else:
- if reverse:
- dest_addr = last.src_addr6 if bits & 1 else last.src_addr4
- if not dest_addr:
- raise Exception("invalid ip_header.bits, expected to have last src addr but don't")
- else:
- dest_addr = last.dest_addr6 if bits & 1 else last.dest_addr4
- if not dest_addr:
- raise Exception("invalid ip_header.bits, expected to have last dest addr but don't")
-
- if bits & 1<<3:
- ports = ip_header.pop(0)
- if not isinstance(ports, int):
- raise Exception("invalid ip_header.src_dest_port, expected int but got: {}".format(type(ports)))
- if ports > 0xffff:
- src_port = ports & 0xffff
- dest_port = ports >> 16
- elif ports < 0:
- if reverse:
- src_port = last.dest_port6 if bits & 1 else last.dest_port4
- if src_port is None:
- raise Exception("invalid ip_header.bits, expected to have last dest port but don't")
- else:
- src_port = last.src_port6 if bits & 1 else last.src_port4
- if src_port is None:
- raise Exception("invalid ip_header.bits, expected to have last src port but don't")
- dest_port = -ports - 1
- else:
- src_port = ports
- if reverse:
- dest_port = last.src_port6 if bits & 1 else last.src_port4
- if dest_port is None:
- raise Exception("invalid ip_header.bits, expected to have last src port but don't")
- else:
- dest_port = last.dest_port6 if bits & 1 else last.dest_port4
- if dest_port is None:
- raise Exception("invalid ip_header.bits, expected to have last dest port but don't")
- else:
- if reverse:
- src_port = last.dest_port6 if bits & 1 else last.dest_port4
- if src_port is None:
- raise Exception("invalid ip_header.bits, expected to have last dest port but don't")
- else:
- src_port = last.src_port6 if bits & 1 else last.src_port4
- if src_port is None:
- raise Exception("invalid ip_header.bits, expected to have last src port but don't")
- if reverse:
- dest_port = last.src_port6 if bits & 1 else last.src_port4
- if dest_port is None:
- raise Exception("invalid ip_header.bits, expected to have last src port but don't")
- else:
- dest_port = last.dest_port6 if bits & 1 else last.dest_port4
- if dest_port is None:
- raise Exception("invalid ip_header.bits, expected to have last dest port but don't")
-
- print((" " * lvl)+" src addr: " + socket.inet_ntop(socket.AF_INET6 if bits & 1 else socket.AF_INET, src_addr))
- print((" " * lvl)+"dest addr: " + socket.inet_ntop(socket.AF_INET6 if bits & 1 else socket.AF_INET, dest_addr))
- print((" " * lvl)+" src port: {}".format(src_port))
- print((" " * lvl)+"dest port: {}".format(dest_port))
-
- if bits & 1:
- last.src_addr6 = src_addr
- last.dest_addr6 = dest_addr
- last.src_port6 = src_port
- last.dest_port6 = dest_port
- else:
- last.src_addr4 = src_addr
- last.dest_addr4 = dest_addr
- last.src_port4 = src_port
- last.dest_port4 = dest_port
-
-
-def parse_message_bits(bits, lvl):
- print((" " * lvl)+"message_bits:")
- lvl+=2
- dns = "no"
- if isinstance(bits, int):
- if bits & 1:
- dns = "yes"
- print((" " * lvl)+"dns (0): "+dns)
-
- if bits & 1<<1:
- proto = "tcp"
- elif dns == "yes":
- proto = "udp"
- else:
- proto = "icmp"
- print((" " * lvl)+"proto (1): "+proto)
-
- if bits & 1<<2:
- frag = "yes"
- else:
- frag = "no"
- print((" " * lvl)+"frag (2): "+frag)
-
- if bits & 1<<3:
- malformed = "yes"
- else:
- malformed = "no"
- print((" " * lvl)+"malformed(3): "+malformed)
-
- else:
- raise Exception("invalid message_bits, expected int but got: {}".format(type(bits)))
-
- return 1 if dns == "yes" else 0
-
-def parse_timestamp(ts, lvl):
- print((" " * lvl)+"timestamp:")
- lvl+=2
-
- if isinstance(ts, list):
- if ts[0] < 0:
- if not last.ts:
- raise Exception("invalid timestamp.seconds, got diff from last value but have no last value")
- if not len(last.ts) == len(ts):
- raise Exception("invalid timestamp.seconds, differentialy precision missmatch")
-
- ts[0] = last.ts[0] + ( -ts[0] - 1 )
- print((" " * lvl)+"seconds: {}".format(ts[0]))
-
- if len(ts) > 1:
- ts[1] = last.ts[1] + ts[1]
- print((" " * lvl)+"useconds: {}".format(ts[1]))
- if len(ts) > 2:
- ts[2] = last.ts[2] + ts[2]
- print((" " * lvl)+"nseconds: {}".format(ts[2]))
- else:
- print((" " * lvl)+"seconds: {}".format(ts[0]))
- if len(ts) > 1:
- print((" " * lvl)+"useconds: {}".format(ts[1]))
- if len(ts) > 2:
- print((" " * lvl)+"nseconds: {}".format(ts[2]))
- last.ts = ts
-
- elif isinstance(ts, int):
- print((" " * lvl)+"seconds: {}".format(ts))
- else:
- raise Exception("invalid timestamp, expected list|int but got: {}".format(type(ts)))
-
-def parse(cds):
- print("paket:")
- try:
- parse_timestamp(cds.pop(0), 2)
- is_dns = parse_message_bits(cds.pop(0), 2)
- parse_ip_header(cds, 2)
- if not is_dns:
- raise Exception("not dns? huh?")
- parse_dns_message(cds, 2)
- except IndexError as idx:
- if not str(idx) == "pop from empty list":
- raise
- print(" ...")
- except:
- raise
-
-def main():
- usage = '%prog [-v] [-h] <cds file...>'
- parser = optparse.OptionParser(usage, version='%prog 0.01')
- parser.add_option('-v', '--verbose', action='store_true', dest='verbose',
- help='turn verbose mode on')
-
- (options, args) = parser.parse_args()
-
- if options.verbose == True:
- log.setLevel(logging.DEBUG)
- log.debug('argv: %s', sys.argv)
- log.debug('options: %s', options)
- log.debug('args: %s', args)
- else:
- log.setLevel(logging.WARNING)
-
- if not args:
- parser.print_usage()
- exit(1)
-
- decoder = CBORDecoder()
- # if https://github.com/agronholm/cbor2/pull/5 is not merged/released yet
- if 0 not in decoder.special_decoders:
- decoder.special_decoders[0] = lambda self, fp, shareable_index=None: SimpleValue(0)
- decoder.special_decoders[1] = lambda self, fp, shareable_index=None: SimpleValue(1)
- decoder.special_decoders[2] = lambda self, fp, shareable_index=None: SimpleValue(2)
- decoder.special_decoders[3] = lambda self, fp, shareable_index=None: SimpleValue(3)
- decoder.special_decoders[4] = lambda self, fp, shareable_index=None: SimpleValue(4)
- decoder.special_decoders[5] = lambda self, fp, shareable_index=None: SimpleValue(5)
- decoder.special_decoders[6] = lambda self, fp, shareable_index=None: SimpleValue(6)
- decoder.special_decoders[7] = lambda self, fp, shareable_index=None: SimpleValue(7)
- decoder.special_decoders[8] = lambda self, fp, shareable_index=None: SimpleValue(8)
- decoder.special_decoders[9] = lambda self, fp, shareable_index=None: SimpleValue(9)
- decoder.special_decoders[10] = lambda self, fp, shareable_index=None: SimpleValue(10)
- decoder.special_decoders[11] = lambda self, fp, shareable_index=None: SimpleValue(11)
- decoder.special_decoders[12] = lambda self, fp, shareable_index=None: SimpleValue(12)
- decoder.special_decoders[13] = lambda self, fp, shareable_index=None: SimpleValue(13)
- decoder.special_decoders[14] = lambda self, fp, shareable_index=None: SimpleValue(14)
- decoder.special_decoders[15] = lambda self, fp, shareable_index=None: SimpleValue(15)
- decoder.special_decoders[16] = lambda self, fp, shareable_index=None: SimpleValue(16)
- decoder.special_decoders[17] = lambda self, fp, shareable_index=None: SimpleValue(17)
- decoder.special_decoders[18] = lambda self, fp, shareable_index=None: SimpleValue(18)
- decoder.special_decoders[19] = lambda self, fp, shareable_index=None: SimpleValue(19)
- decoder.special_decoders[24] = decode_simple_value
-
- version = None
-
- for f in args:
- log.debug('file: %s', f)
- with open(f, 'rb') as fp:
- obj = None
- try:
- obj = decoder.decode(fp)
- except Exception as e:
- if e.__str__().find("index out of range") == -1:
- raise
- if not isinstance(obj, list):
- raise Exception("Invalid element, expected an array but found: {}".format(type(obj)))
-
- version = obj.pop(0)
- if version != "CDSv1":
- raise Exception("Invalid version, expected CDSv1 but got: {}".format(version))
-
- while len(obj):
- opt = obj.pop(0)
- if not isinstance(opt, int):
- raise Exception("Invalid option, expected int but got: {}".format(type(opt)))
- if opt == 0:
- MAX_RLABELS = obj.pop(0)
- if not isinstance(MAX_RLABELS, int) or MAX_RLABELS < 1:
- raise Exception("Invalid option for maximum rlabels, got: {}".format(MAX_RLABELS))
- log.debug("Using maximum rlabels {}".format(MAX_RLABELS))
- elif opt == 1:
- MIN_RLABEL_SIZE = obj.pop(0)
- if not isinstance(MIN_RLABEL_SIZE, int) or MIN_RLABEL_SIZE < 1:
- raise Exception("Invalid option for minimum rlabel size, got: {}".format(MIN_RLABEL_SIZE))
- log.debug("Using minimum rlabel size {}".format(MIN_RLABEL_SIZE))
- else:
- raise Exception("Unknown option: {}".format(opt))
-
- while True:
- obj = None
- try:
- obj = decoder.decode(fp)
- except Exception as e:
- if e.__str__().find("index out of range") == -1:
- raise
- if obj is None:
- break
- if not isinstance(obj, list):
- raise Exception("Invalid element, expected an array but found: {}".format(type(obj)))
- parse(obj)
-
- last.reset()
-
-if __name__ == '__main__':
- main()