From 45d6379135504814ab723b57f0eb8be23393a51d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 09:24:22 +0200 Subject: Adding upstream version 1:9.16.44. Signed-off-by: Daniel Baumann --- bin/python/isc/checkds.py.in | 226 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 bin/python/isc/checkds.py.in (limited to 'bin/python/isc/checkds.py.in') diff --git a/bin/python/isc/checkds.py.in b/bin/python/isc/checkds.py.in new file mode 100644 index 0000000..f2e6562 --- /dev/null +++ b/bin/python/isc/checkds.py.in @@ -0,0 +1,226 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +import argparse +import os +import sys +from subprocess import Popen, PIPE + +from isc.utils import prefix, version + +prog = "dnssec-checkds" + + +############################################################################ +# SECRR class: +# Class for DS resource record +############################################################################ +class SECRR: + hashalgs = {1: "SHA-1", 2: "SHA-256", 3: "GOST", 4: "SHA-384"} + rrname = "" + rrclass = "IN" + keyid = None + keyalg = None + hashalg = None + digest = "" + ttl = 0 + + def __init__(self, rrtext): + if not rrtext: + raise Exception + + # 'str' does not have decode method in python3 + if type(rrtext) is not str: + fields = rrtext.decode("ascii").split() + else: + fields = rrtext.split() + if len(fields) < 7: + raise Exception + + self.rrtype = "DS" + self.rrname = fields[0].lower() + + fields = fields[1:] + if fields[0].upper() in ["IN", "CH", "HS"]: + self.rrclass = fields[0].upper() + fields = fields[1:] + else: + self.ttl = int(fields[0]) + self.rrclass = fields[1].upper() + fields = fields[2:] + + if fields[0].upper() != self.rrtype: + raise Exception("%s does not match %s" % (fields[0].upper(), self.rrtype)) + + self.keyid, self.keyalg, self.hashalg = map(int, fields[1:4]) + self.digest = "".join(fields[4:]).upper() + + def __repr__(self): + return "%s %s %s %d %d %d %s" % ( + self.rrname, + self.rrclass, + self.rrtype, + self.keyid, + self.keyalg, + self.hashalg, + self.digest, + ) + + def __eq__(self, other): + return self.__repr__() == other.__repr__() + + +############################################################################ +# check: +# Fetch DS RRset for the given zone from the DNS; fetch DNSKEY +# RRset from the masterfile if specified, or from DNS if not. +# Generate a set of expected DS records from the DNSKEY RRset, +# and report on congruency. +############################################################################ +def check(zone, args): + rrlist = [] + if args.dssetfile: + fp = open(args.dssetfile).read() + else: + cmd = [args.dig, "+noall", "+answer", "-t", "ds", "-q", zone] + fp, _ = Popen(cmd, stdout=PIPE).communicate() + + for line in fp.splitlines(): + if type(line) is not str: + line = line.decode("ascii") + rrlist.append(SECRR(line)) + rrlist = sorted(rrlist, key=lambda rr: (rr.keyid, rr.keyalg, rr.hashalg)) + + klist = [] + + cmd = [args.dsfromkey] + for algo in args.algo: + cmd += ["-a", algo] + + if args.masterfile: + cmd += ["-f", args.masterfile, zone] + fp, _ = Popen(cmd, stdout=PIPE).communicate() + else: + intods, _ = Popen( + [args.dig, "+noall", "+answer", "-t", "dnskey", "-q", zone], stdout=PIPE + ).communicate() + cmd += ["-f", "-", zone] + fp, _ = Popen(cmd, stdin=PIPE, stdout=PIPE).communicate(intods) + + for line in fp.splitlines(): + if type(line) is not str: + line = line.decode("ascii") + klist.append(SECRR(line)) + + if len(klist) < 1: + print("No DNSKEY records found in zone apex") + return False + + match = True + for rr in rrlist: + if rr not in klist: + print( + "KSK for %s %s/%03d/%05d (%s) missing from child" + % ( + rr.rrtype, + rr.rrname.strip("."), + rr.keyalg, + rr.keyid, + SECRR.hashalgs[rr.hashalg], + ) + ) + match = False + for rr in klist: + if rr not in rrlist: + print( + "%s for KSK %s/%03d/%05d (%s) missing from parent" + % ( + rr.rrtype, + rr.rrname.strip("."), + rr.keyalg, + rr.keyid, + SECRR.hashalgs[rr.hashalg], + ) + ) + match = False + for rr in klist: + if rr in rrlist: + print( + "%s for KSK %s/%03d/%05d (%s) found in parent" + % ( + rr.rrtype, + rr.rrname.strip("."), + rr.keyalg, + rr.keyid, + SECRR.hashalgs[rr.hashalg], + ) + ) + + return match + + +############################################################################ +# parse_args: +# Read command line arguments, set global 'args' structure +############################################################################ +def parse_args(): + parser = argparse.ArgumentParser(description=prog + ": checks DS coverage") + + bindir = "bin" + sbindir = "bin" if os.name == "nt" else "sbin" + + parser.add_argument("zone", type=str, help="zone to check") + parser.add_argument( + "-a", + "--algo", + dest="algo", + action="append", + default=[], + type=str, + help="DS digest algorithm", + ) + parser.add_argument( + "-d", + "--dig", + dest="dig", + default=os.path.join(prefix(bindir), "dig"), + type=str, + help="path to 'dig'", + ) + parser.add_argument( + "-D", + "--dsfromkey", + dest="dsfromkey", + default=os.path.join(prefix(sbindir), "dnssec-dsfromkey"), + type=str, + help="path to 'dnssec-dsfromkey'", + ) + parser.add_argument( + "-f", "--file", dest="masterfile", type=str, help="zone master file" + ) + parser.add_argument( + "-s", "--dsset", dest="dssetfile", type=str, help="prepared DSset file" + ) + parser.add_argument("-v", "--version", action="version", version=version) + args = parser.parse_args() + + args.zone = args.zone.strip(".") + + return args + + +############################################################################ +# Main +############################################################################ +def main(): + args = parse_args() + match = check(args.zone, args) + exit(0 if match else 1) -- cgit v1.2.3