diff options
Diffstat (limited to '')
-rw-r--r-- | bin/python/isc/dnskey.py.in | 570 |
1 files changed, 570 insertions, 0 deletions
diff --git a/bin/python/isc/dnskey.py.in b/bin/python/isc/dnskey.py.in new file mode 100644 index 0000000..c4cdc57 --- /dev/null +++ b/bin/python/isc/dnskey.py.in @@ -0,0 +1,570 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +import os +import time +import calendar +from subprocess import Popen, PIPE + + +######################################################################## +# Class dnskey +######################################################################## +class TimePast(Exception): + def __init__(self, key, prop, value): + super(TimePast, self).__init__( + "%s time for key %s (%d) is already past" % (prop, key, value) + ) + + +class dnskey: + """An individual DNSSEC key. Identified by path, name, algorithm, keyid. + Contains a dictionary of metadata events.""" + + _PROPS = ( + "Created", + "Publish", + "Activate", + "Inactive", + "Delete", + "Revoke", + "DSPublish", + "SyncPublish", + "SyncDelete", + ) + _OPTS = (None, "-P", "-A", "-I", "-D", "-R", None, "-Psync", "-Dsync") + + _ALGNAMES = ( + None, + "RSAMD5", + "DH", + "DSA", + None, + "RSASHA1", + "NSEC3DSA", + "NSEC3RSASHA1", + "RSASHA256", + None, + "RSASHA512", + None, + "ECCGOST", + "ECDSAP256SHA256", + "ECDSAP384SHA384", + "ED25519", + "ED448", + ) + + def __init__(self, key, directory=None, keyttl=None): + # this makes it possible to use algname as a class or instance method + if isinstance(key, tuple) and len(key) == 3: + self._dir = directory or "." + (name, alg, keyid) = key + self.fromtuple(name, alg, keyid, keyttl) + + self._dir = directory or os.path.dirname(key) or "." + key = os.path.basename(key) + (name, alg, keyid) = key.split("+") + name = name[1:-1] + alg = int(alg) + keyid = int(keyid.split(".")[0]) + self.fromtuple(name, alg, keyid, keyttl) + + def fromtuple(self, name, alg, keyid, keyttl): + if name.endswith("."): + fullname = name + name = name.rstrip(".") + else: + fullname = name + "." + + keystr = "K%s+%03d+%05d" % (fullname, alg, keyid) + key_file = self._dir + (self._dir and os.sep or "") + keystr + ".key" + private_file = self._dir + (self._dir and os.sep or "") + keystr + ".private" + + self.keystr = keystr + + self.name = name + self.alg = int(alg) + self.keyid = int(keyid) + self.fullname = fullname + + kfp = open(key_file, "r") + for line in kfp: + if line[0] == ";": + continue + tokens = line.split() + if not tokens: + continue + + if tokens[1].lower() in ("in", "ch", "hs"): + septoken = 3 + self.ttl = keyttl + else: + septoken = 4 + self.ttl = int(tokens[1]) if not keyttl else keyttl + + if (int(tokens[septoken]) & 0x1) == 1: + self.sep = True + else: + self.sep = False + kfp.close() + + pfp = open(private_file, "r") + + self.metadata = dict() + self._changed = dict() + self._delete = dict() + self._times = dict() + self._fmttime = dict() + self._timestamps = dict() + self._original = dict() + self._origttl = None + + for line in pfp: + line = line.strip() + if not line or line[0] in ("!#"): + continue + punctuation = [line.find(c) for c in ":= "] + [len(line)] + found = min([pos for pos in punctuation if pos != -1]) + name = line[:found].rstrip() + value = line[found:].lstrip(":= ").rstrip() + self.metadata[name] = value + + for prop in dnskey._PROPS: + self._changed[prop] = False + if prop in self.metadata: + t = self.parsetime(self.metadata[prop]) + self._times[prop] = t + self._fmttime[prop] = self.formattime(t) + self._timestamps[prop] = self.epochfromtime(t) + self._original[prop] = self._timestamps[prop] + else: + self._times[prop] = None + self._fmttime[prop] = None + self._timestamps[prop] = None + self._original[prop] = None + + pfp.close() + + def commit(self, settime_bin, **kwargs): + quiet = kwargs.get("quiet", False) + cmd = [] + first = True + + if self._origttl is not None: + cmd += ["-L", str(self.ttl)] + + for prop, opt in zip(dnskey._PROPS, dnskey._OPTS): + if not opt or not self._changed[prop]: + continue + + delete = False + if prop in self._delete and self._delete[prop]: + delete = True + + when = "none" if delete else self._fmttime[prop] + cmd += [opt, when] + first = False + + if cmd: + fullcmd = ( + [settime_bin, "-K", self._dir] + + cmd + + [ + self.keystr, + ] + ) + if not quiet: + print("# " + " ".join(fullcmd)) + try: + p = Popen(fullcmd, stdout=PIPE, stderr=PIPE) + stdout, stderr = p.communicate() + if stderr: + raise Exception(str(stderr)) + except Exception as e: + raise Exception("unable to run %s: %s" % (settime_bin, str(e))) + self._origttl = None + for prop in dnskey._PROPS: + self._original[prop] = self._timestamps[prop] + self._changed[prop] = False + + @classmethod + def generate( + cls, + keygen_bin, + randomdev, + keys_dir, + name, + alg, + keysize, + sep, + ttl, + publish=None, + activate=None, + **kwargs + ): + quiet = kwargs.get("quiet", False) + + keygen_cmd = [keygen_bin, "-q", "-K", keys_dir, "-L", str(ttl)] + + if randomdev: + keygen_cmd += ["-r", randomdev] + + if sep: + keygen_cmd.append("-fk") + + if alg: + keygen_cmd += ["-a", alg] + + if keysize: + keygen_cmd += ["-b", str(keysize)] + + if publish: + t = dnskey.timefromepoch(publish) + keygen_cmd += ["-P", dnskey.formattime(t)] + + if activate: + t = dnskey.timefromepoch(activate) + keygen_cmd += ["-A", dnskey.formattime(activate)] + + keygen_cmd.append(name) + + if not quiet: + print("# " + " ".join(keygen_cmd)) + + p = Popen(keygen_cmd, stdout=PIPE, stderr=PIPE) + stdout, stderr = p.communicate() + if stderr: + raise Exception("unable to generate key: " + str(stderr)) + + try: + keystr = stdout.splitlines()[0].decode("ascii") + newkey = dnskey(keystr, keys_dir, ttl) + return newkey + except Exception as e: + raise Exception("unable to parse generated key: %s" % str(e)) + + def generate_successor(self, keygen_bin, randomdev, prepublish, **kwargs): + quiet = kwargs.get("quiet", False) + + if not self.inactive(): + raise Exception("predecessor key %s has no inactive date" % self) + + keygen_cmd = [keygen_bin, "-q", "-K", self._dir, "-S", self.keystr] + + if self.ttl: + keygen_cmd += ["-L", str(self.ttl)] + + if randomdev: + keygen_cmd += ["-r", randomdev] + + if prepublish: + keygen_cmd += ["-i", str(prepublish)] + + if not quiet: + print("# " + " ".join(keygen_cmd)) + + p = Popen(keygen_cmd, stdout=PIPE, stderr=PIPE) + stdout, stderr = p.communicate() + if stderr: + raise Exception("unable to generate key: " + stderr) + + try: + keystr = stdout.splitlines()[0].decode("ascii") + newkey = dnskey(keystr, self._dir, self.ttl) + return newkey + except: + raise Exception("unable to generate successor for key %s" % self) + + @staticmethod + def algstr(alg): + name = None + if alg in range(len(dnskey._ALGNAMES)): + name = dnskey._ALGNAMES[alg] + return name if name else ("%03d" % alg) + + @staticmethod + def algnum(alg): + if not alg: + return None + alg = alg.upper() + try: + return dnskey._ALGNAMES.index(alg) + except ValueError: + return None + + def algname(self, alg=None): + return self.algstr(alg or self.alg) + + @staticmethod + def timefromepoch(secs): + return time.gmtime(secs) + + @staticmethod + def parsetime(string): + return time.strptime(string, "%Y%m%d%H%M%S") + + @staticmethod + def epochfromtime(t): + return calendar.timegm(t) + + @staticmethod + def formattime(t): + return time.strftime("%Y%m%d%H%M%S", t) + + def setmeta(self, prop, secs, now, **kwargs): + force = kwargs.get("force", False) + + if self._timestamps[prop] == secs: + return + + if ( + self._original[prop] is not None + and self._original[prop] < now + and not force + ): + raise TimePast(self, prop, self._original[prop]) + + if secs is None: + self._changed[prop] = False if self._original[prop] is None else True + + self._delete[prop] = True + self._timestamps[prop] = None + self._times[prop] = None + self._fmttime[prop] = None + return + + t = self.timefromepoch(secs) + self._timestamps[prop] = secs + self._times[prop] = t + self._fmttime[prop] = self.formattime(t) + self._changed[prop] = ( + False if self._original[prop] == self._timestamps[prop] else True + ) + + def gettime(self, prop): + return self._times[prop] + + def getfmttime(self, prop): + return self._fmttime[prop] + + def gettimestamp(self, prop): + return self._timestamps[prop] + + def created(self): + return self._timestamps["Created"] + + def syncpublish(self): + return self._timestamps["SyncPublish"] + + def setsyncpublish(self, secs, now=time.time(), **kwargs): + self.setmeta("SyncPublish", secs, now, **kwargs) + + def publish(self): + return self._timestamps["Publish"] + + def setpublish(self, secs, now=time.time(), **kwargs): + self.setmeta("Publish", secs, now, **kwargs) + + def activate(self): + return self._timestamps["Activate"] + + def setactivate(self, secs, now=time.time(), **kwargs): + self.setmeta("Activate", secs, now, **kwargs) + + def revoke(self): + return self._timestamps["Revoke"] + + def setrevoke(self, secs, now=time.time(), **kwargs): + self.setmeta("Revoke", secs, now, **kwargs) + + def inactive(self): + return self._timestamps["Inactive"] + + def setinactive(self, secs, now=time.time(), **kwargs): + self.setmeta("Inactive", secs, now, **kwargs) + + def delete(self): + return self._timestamps["Delete"] + + def setdelete(self, secs, now=time.time(), **kwargs): + self.setmeta("Delete", secs, now, **kwargs) + + def syncdelete(self): + return self._timestamps["SyncDelete"] + + def setsyncdelete(self, secs, now=time.time(), **kwargs): + self.setmeta("SyncDelete", secs, now, **kwargs) + + def setttl(self, ttl): + if ttl is None or self.ttl == ttl: + return + elif self._origttl is None: + self._origttl = self.ttl + self.ttl = ttl + elif self._origttl == ttl: + self._origttl = None + self.ttl = ttl + else: + self.ttl = ttl + + def keytype(self): + return "KSK" if self.sep else "ZSK" + + def __str__(self): + return "%s/%s/%05d" % (self.name, self.algname(), self.keyid) + + def __repr__(self): + return "%s/%s/%05d (%s)" % ( + self.name, + self.algname(), + self.keyid, + ("KSK" if self.sep else "ZSK"), + ) + + def date(self): + return self.activate() or self.publish() or self.created() + + # keys are sorted first by zone name, then by algorithm. within + # the same name/algorithm, they are sorted according to their + # 'date' value: the activation date if set, OR the publication + # if set, OR the creation date. + def __lt__(self, other): + if self.name != other.name: + return self.name < other.name + if self.alg != other.alg: + return self.alg < other.alg + return self.date() < other.date() + + def check_prepub(self, output=None): + def noop(*args, **kwargs): + pass + + if not output: + output = noop + + now = int(time.time()) + a = self.activate() + p = self.publish() + + if not a: + return False + + if not p: + if a > now: + output( + "WARNING: Key %s is scheduled for\n" + "\t activation but not for publication." % repr(self) + ) + return False + + if p <= now and a <= now: + return True + + if p == a: + output( + "WARNING: %s is scheduled to be\n" + "\t published and activated at the same time. This\n" + "\t could result in a coverage gap if the zone was\n" + "\t previously signed. Activation should be at least\n" + "\t %s after publication." + % (repr(self), dnskey.duration(self.ttl) or "one DNSKEY TTL") + ) + return True + + if a < p: + output("WARNING: Key %s is active before it is published" % repr(self)) + return False + + if self.ttl is not None and a - p < self.ttl: + output( + "WARNING: Key %s is activated too soon\n" + "\t after publication; this could result in coverage \n" + "\t gaps due to resolver caches containing old data.\n" + "\t Activation should be at least %s after\n" + "\t publication." + % (repr(self), dnskey.duration(self.ttl) or "one DNSKEY TTL") + ) + return False + + return True + + def check_postpub(self, output=None, timespan=None): + def noop(*args, **kwargs): + pass + + if output is None: + output = noop + + if timespan is None: + timespan = self.ttl + + if timespan is None: + output("WARNING: Key %s using default TTL." % repr(self)) + timespan = 60 * 60 * 24 + + now = time.time() + d = self.delete() + i = self.inactive() + + if not d: + return False + + if not i: + if d > now: + output( + "WARNING: Key %s is scheduled for\n" + "\t deletion but not for inactivation." % repr(self) + ) + return False + + if d < now and i < now: + return True + + if d < i: + output( + "WARNING: Key %s is scheduled for\n" + "\t deletion before inactivation." % repr(self) + ) + return False + + if d - i < timespan: + output( + "WARNING: Key %s scheduled for\n" + "\t deletion too soon after deactivation; this may \n" + "\t result in coverage gaps due to resolver caches\n" + "\t containing old data. Deletion should be at least\n" + "\t %s after inactivation." % (repr(self), dnskey.duration(timespan)) + ) + return False + + return True + + @staticmethod + def duration(secs): + if not secs: + return None + + units = [ + ("year", 60 * 60 * 24 * 365), + ("month", 60 * 60 * 24 * 30), + ("day", 60 * 60 * 24), + ("hour", 60 * 60), + ("minute", 60), + ("second", 1), + ] + + output = [] + for unit in units: + v, secs = secs // unit[1], secs % unit[1] + if v > 0: + output.append("%d %s%s" % (v, unit[0], "s" if v > 1 else "")) + + return ", ".join(output) |