1310 lines
41 KiB
Python
1310 lines
41 KiB
Python
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
#
|
|
# See the COPYRIGHT file distributed with this work for additional
|
|
# information regarding copyright ownership.
|
|
|
|
from datetime import timedelta
|
|
import os
|
|
import shutil
|
|
import time
|
|
|
|
import pytest
|
|
|
|
import isctest
|
|
from isctest.kasp import KeyTimingMetadata
|
|
|
|
pytestmark = pytest.mark.extra_artifacts(
|
|
[
|
|
"K*",
|
|
"common.test.*",
|
|
"future.test.*",
|
|
"in-the-middle.test.*",
|
|
"ksk-roll.test.*",
|
|
"last-bundle.test.*",
|
|
"past.test.*",
|
|
"two-tone.test.*",
|
|
"unlimited.test.*",
|
|
"ns1/K*",
|
|
"ns1/_default.nzd",
|
|
"ns1/_default.nzf",
|
|
"ns1/common.test.db",
|
|
"ns1/common.test.db.jbk",
|
|
"ns1/common.test.db.signed",
|
|
"ns1/common.test.db.signed.jnl",
|
|
"ns1/common.test.skr.2",
|
|
"ns1/future.test.db",
|
|
"ns1/future.test.db.jbk",
|
|
"ns1/future.test.db.signed",
|
|
"ns1/future.test.skr.1",
|
|
"ns1/in-the-middle.test.db",
|
|
"ns1/in-the-middle.test.db.jbk",
|
|
"ns1/in-the-middle.test.db.signed",
|
|
"ns1/in-the-middle.test.db.signed.jnl",
|
|
"ns1/in-the-middle.test.skr.1",
|
|
"ns1/keydir",
|
|
"ns1/ksk-roll.test.db",
|
|
"ns1/ksk-roll.test.db.jbk",
|
|
"ns1/ksk-roll.test.db.signed",
|
|
"ns1/ksk-roll.test.db.signed.jnl",
|
|
"ns1/ksk-roll.test.skr.1",
|
|
"ns1/last-bundle.test.db",
|
|
"ns1/last-bundle.test.db.jbk",
|
|
"ns1/last-bundle.test.db.signed",
|
|
"ns1/last-bundle.test.db.signed.jnl",
|
|
"ns1/last-bundle.test.skr.1",
|
|
"ns1/offline",
|
|
"ns1/past.test.db",
|
|
"ns1/past.test.db.jbk",
|
|
"ns1/past.test.db.signed",
|
|
"ns1/past.test.skr.1",
|
|
"ns1/two-tone.test.db",
|
|
"ns1/two-tone.test.db.jbk",
|
|
"ns1/two-tone.test.db.signed",
|
|
"ns1/two-tone.test.db.signed.jnl",
|
|
"ns1/two-tone.test.skr.1",
|
|
"ns1/unlimited.test.db",
|
|
"ns1/unlimited.test.db.jbk",
|
|
"ns1/unlimited.test.db.signed",
|
|
"ns1/unlimited.test.db.signed.jnl",
|
|
"ns1/unlimited.test.unlimited.skr.1",
|
|
]
|
|
)
|
|
|
|
|
|
def between(value, start, end):
|
|
if value is None or start is None or end is None:
|
|
return False
|
|
|
|
return start < value < end
|
|
|
|
|
|
def ksr(zone, policy, action, options="", raise_on_exception=True):
|
|
ksr_command = [
|
|
os.environ.get("KSR"),
|
|
"-l",
|
|
"ns1/named.conf",
|
|
"-k",
|
|
policy,
|
|
*options.split(),
|
|
action,
|
|
zone,
|
|
]
|
|
|
|
out = isctest.run.cmd(
|
|
ksr_command, log_stdout=True, raise_on_exception=raise_on_exception
|
|
)
|
|
return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
|
|
|
|
|
|
def check_keys(
|
|
keys,
|
|
lifetime,
|
|
alg=os.environ["DEFAULT_ALGORITHM_NUMBER"],
|
|
size=os.environ["DEFAULT_BITS"],
|
|
offset=0,
|
|
with_state=False,
|
|
):
|
|
# Check keys that were created.
|
|
num = 0
|
|
|
|
now = KeyTimingMetadata.now()
|
|
|
|
for key in keys:
|
|
# created: from keyfile plus offset
|
|
created = key.get_timing("Created") + offset
|
|
|
|
# active: retired previous key
|
|
active = created
|
|
if num > 0 and retired is not None:
|
|
active = retired
|
|
|
|
# published: dnskey-ttl + publish-safety + propagation
|
|
published = active - timedelta(hours=2, minutes=5)
|
|
|
|
# retired: zsk-lifetime
|
|
if lifetime is not None:
|
|
retired = active + lifetime
|
|
|
|
if key.is_ksk():
|
|
# removed: ttlds + retire-safety + parent-propagation
|
|
removed = retired + timedelta(days=1, hours=2)
|
|
else:
|
|
# removed: ttlsig + retire-safety + sign-delay + propagation
|
|
removed = retired + timedelta(days=10, hours=1, minutes=5)
|
|
else:
|
|
retired = None
|
|
removed = None
|
|
|
|
goal = "hidden"
|
|
state_dnskey = "hidden"
|
|
state_zrrsig = "hidden"
|
|
state_krrsig = "hidden"
|
|
state_ds = "hidden"
|
|
if retired is None or between(now, published, retired):
|
|
goal = "omnipresent"
|
|
pubdelay = published + timedelta(hours=2, minutes=5)
|
|
signdelay = active + timedelta(days=10, hours=1, minutes=5)
|
|
|
|
if between(now, published, pubdelay):
|
|
state_dnskey = "rumoured"
|
|
state_krrsig = "rumoured"
|
|
else:
|
|
state_dnskey = "omnipresent"
|
|
state_krrsig = "omnipresent"
|
|
|
|
if key.is_ksk():
|
|
state_ds = "hidden"
|
|
else:
|
|
if between(now, active, signdelay):
|
|
state_zrrsig = "rumoured"
|
|
else:
|
|
state_zrrsig = "omnipresent"
|
|
|
|
with open(key.statefile, "r", encoding="utf-8") as file:
|
|
metadata = file.read()
|
|
assert f"Algorithm: {alg}" in metadata
|
|
assert f"Length: {size}" in metadata
|
|
|
|
if key.is_ksk():
|
|
assert "KSK: yes" in metadata
|
|
else:
|
|
assert "KSK: no" in metadata
|
|
|
|
if key.is_zsk():
|
|
assert "ZSK: yes" in metadata
|
|
else:
|
|
assert "ZSK: no" in metadata
|
|
|
|
assert f"Published: {published}" in metadata
|
|
assert f"Active: {active}" in metadata
|
|
|
|
if lifetime is not None:
|
|
assert f"Retired: {retired}" in metadata
|
|
assert f"Removed: {removed}" in metadata
|
|
assert f"Lifetime: {int(lifetime.total_seconds())}" in metadata
|
|
else:
|
|
assert "Lifetime: 0" in metadata
|
|
assert "Retired:" not in metadata
|
|
assert "Removed:" not in metadata
|
|
|
|
if with_state:
|
|
assert f"GoalState: {goal}" in metadata
|
|
assert f"DNSKEYState: {state_dnskey}" in metadata
|
|
|
|
if key.is_ksk():
|
|
assert f"KRRSIGState: {state_krrsig}" in metadata
|
|
assert f"DSState: {state_ds}" in metadata
|
|
else:
|
|
assert "KRRSIGState:" not in metadata
|
|
assert "DSState:" not in metadata
|
|
|
|
if key.is_zsk():
|
|
assert f"ZRRSIGState: {state_zrrsig}" in metadata
|
|
else:
|
|
assert "ZRRSIGState:" not in metadata
|
|
|
|
num += 1
|
|
|
|
|
|
def check_key_bundle(bundle_keys, bundle_lines, cdnskey=False):
|
|
count = 0
|
|
for key in bundle_keys:
|
|
found = False
|
|
for line in bundle_lines:
|
|
if key.dnskey_equals(line, cdnskey):
|
|
found = True
|
|
count += 1
|
|
assert found
|
|
|
|
assert count == len(bundle_keys)
|
|
assert count == len(bundle_lines)
|
|
|
|
|
|
def check_cds_bundle(bundle_keys, bundle_lines, expected_cds):
|
|
count = 0
|
|
for key in bundle_keys:
|
|
found = False
|
|
# the cds of this ksk must be in the ksr
|
|
for line in bundle_lines:
|
|
for alg in expected_cds:
|
|
if key.cds_equals(line, alg.strip()):
|
|
found = True
|
|
count += 1
|
|
assert found
|
|
|
|
assert count == len(expected_cds) * len(bundle_keys)
|
|
assert count == len(bundle_lines)
|
|
|
|
|
|
def check_rrsig_bundle(bundle_keys, bundle_lines, zone, rrtype, sigend, sigstart):
|
|
count = 0
|
|
for key in bundle_keys:
|
|
found = False
|
|
alg = key.get_metadata("Algorithm")
|
|
expect = f"{zone}. 3600 IN RRSIG {rrtype} {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
|
|
# there must be a signature of this ksk
|
|
for line in bundle_lines:
|
|
rrsig = " ".join(line.split())
|
|
if expect in rrsig:
|
|
found = True
|
|
count += 1
|
|
assert found
|
|
|
|
assert count == len(bundle_keys)
|
|
assert count == len(bundle_lines)
|
|
|
|
|
|
def check_keysigningrequest(out, zsks, start, end):
|
|
lines = out.split("\n")
|
|
line_no = 0
|
|
|
|
inception = start
|
|
while inception < end:
|
|
next_bundle = end + 1
|
|
# expect bundle header
|
|
assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no]
|
|
line_no += 1
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
# expect zsks
|
|
for key in zsks:
|
|
published = key.get_timing("Publish")
|
|
if between(published, inception, next_bundle):
|
|
next_bundle = published
|
|
|
|
removed = key.get_timing("Delete", must_exist=False)
|
|
if between(removed, inception, next_bundle):
|
|
next_bundle = removed
|
|
|
|
if published > inception:
|
|
continue
|
|
if removed is not None and inception >= removed:
|
|
continue
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
|
|
check_key_bundle(bundle_keys, bundle_lines)
|
|
|
|
inception = next_bundle
|
|
|
|
# ksr footer
|
|
assert ";; KeySigningRequest 1.0 generated at" in lines[line_no]
|
|
line_no += 1
|
|
|
|
# trailing empty lines
|
|
while line_no < len(lines):
|
|
assert lines[line_no] == ""
|
|
line_no += 1
|
|
|
|
assert line_no == len(lines)
|
|
|
|
|
|
def check_signedkeyresponse(
|
|
out,
|
|
zone,
|
|
ksks,
|
|
zsks,
|
|
start,
|
|
end,
|
|
refresh,
|
|
cdnskey=True,
|
|
cds="SHA-256",
|
|
):
|
|
lines = out.split("\n")
|
|
line_no = 0
|
|
next_bundle = end + 1
|
|
|
|
inception = start
|
|
while inception < end:
|
|
# A single signed key response may consist of:
|
|
# ;; SignedKeyResponse (header)
|
|
# ;; DNSKEY 257 (one per published key in ksks)
|
|
# ;; DNSKEY 256 (one per published key in zsks)
|
|
# ;; RRSIG(DNSKEY) (one per active key in ksks)
|
|
# ;; CDNSKEY (one per published key in ksks)
|
|
# ;; RRSIG(CDNSKEY) (one per active key in ksks)
|
|
# ;; CDS (one per published key in ksks)
|
|
# ;; RRSIG(CDS) (one per active key in ksks)
|
|
|
|
sigstart = inception - timedelta(hours=1) # clockskew
|
|
sigend = inception + timedelta(days=14) # sig-validity
|
|
next_bundle = sigend + refresh
|
|
|
|
# ignore empty lines
|
|
while line_no < len(lines):
|
|
if lines[line_no] == "":
|
|
line_no += 1
|
|
else:
|
|
break
|
|
|
|
# expect bundle header
|
|
assert f";; SignedKeyResponse 1.0 {inception}" in lines[line_no]
|
|
line_no += 1
|
|
|
|
# expect ksks
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
for key in ksks:
|
|
published = key.get_timing("Publish")
|
|
if between(published, inception, next_bundle):
|
|
next_bundle = published
|
|
|
|
removed = key.get_timing("Delete", must_exist=False)
|
|
|
|
if published > inception:
|
|
continue
|
|
if removed is not None and inception >= removed:
|
|
continue
|
|
|
|
if between(removed, inception, next_bundle):
|
|
next_bundle = removed
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
|
|
check_key_bundle(bundle_keys, bundle_lines)
|
|
|
|
# expect zsks
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
for key in zsks:
|
|
published = key.get_timing("Publish")
|
|
if between(published, inception, next_bundle):
|
|
next_bundle = published
|
|
|
|
removed = key.get_timing("Delete", must_exist=False)
|
|
if between(removed, inception, next_bundle):
|
|
next_bundle = removed
|
|
|
|
if published > inception:
|
|
continue
|
|
if removed is not None and inception >= removed:
|
|
continue
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
|
|
check_key_bundle(bundle_keys, bundle_lines)
|
|
|
|
# expect rrsig(dnskey)
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
for key in ksks:
|
|
active = key.get_timing("Activate")
|
|
inactive = key.get_timing("Inactive", must_exist=False)
|
|
if active > inception:
|
|
continue
|
|
if inactive is not None and inception >= inactive:
|
|
continue
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
|
|
check_rrsig_bundle(bundle_keys, bundle_lines, zone, "DNSKEY", sigend, sigstart)
|
|
|
|
# expect cdnskey
|
|
have_cdnskey = False
|
|
if cdnskey:
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
for key in ksks:
|
|
published = key.get_timing("SyncPublish")
|
|
if between(published, inception, next_bundle):
|
|
next_bundle = published
|
|
|
|
removed = key.get_timing("SyncDelete", must_exist=False)
|
|
if between(removed, inception, next_bundle):
|
|
next_bundle = removed
|
|
|
|
if published > inception:
|
|
continue
|
|
if removed is not None and inception >= removed:
|
|
continue
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
have_cdnskey = True
|
|
|
|
check_key_bundle(bundle_keys, bundle_lines, cdnskey=True)
|
|
|
|
if have_cdnskey:
|
|
# expect rrsig(cdnskey)
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
for key in ksks:
|
|
active = key.get_timing("Activate")
|
|
inactive = key.get_timing("Inactive", must_exist=False)
|
|
if active > inception:
|
|
continue
|
|
if inactive is not None and inception >= inactive:
|
|
continue
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
|
|
check_rrsig_bundle(
|
|
bundle_keys, bundle_lines, zone, "CDNSKEY", sigend, sigstart
|
|
)
|
|
|
|
# expect cds
|
|
have_cds = False
|
|
if cds != "":
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
expected_cds = cds.split(",")
|
|
for key in ksks:
|
|
published = key.get_timing("SyncPublish")
|
|
if between(published, inception, next_bundle):
|
|
next_bundle = published
|
|
|
|
removed = key.get_timing("SyncDelete", must_exist=False)
|
|
if between(removed, inception, next_bundle):
|
|
next_bundle = removed
|
|
|
|
if published > inception:
|
|
continue
|
|
if removed is not None and inception >= removed:
|
|
continue
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
# pylint: disable=unused-variable
|
|
for _arg in expected_cds:
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
have_cds = True
|
|
|
|
check_cds_bundle(bundle_keys, bundle_lines, expected_cds)
|
|
|
|
if have_cds:
|
|
# expect rrsig(cds)
|
|
bundle_keys = []
|
|
bundle_lines = []
|
|
for key in ksks:
|
|
active = key.get_timing("Activate")
|
|
inactive = key.get_timing("Inactive", must_exist=False)
|
|
if active > inception:
|
|
continue
|
|
if inactive is not None and inception >= inactive:
|
|
continue
|
|
|
|
# collect keys that should be in this bundle
|
|
# collect lines that should be in this bundle
|
|
bundle_keys.append(key)
|
|
bundle_lines.append(lines[line_no])
|
|
line_no += 1
|
|
|
|
check_rrsig_bundle(bundle_keys, bundle_lines, zone, "CDS", sigend, sigstart)
|
|
|
|
inception = next_bundle
|
|
|
|
# skr footer
|
|
assert ";; SignedKeyResponse 1.0 generated at" in lines[line_no]
|
|
line_no += 1
|
|
|
|
# trailing empty lines
|
|
while line_no < len(lines):
|
|
assert lines[line_no] == ""
|
|
line_no += 1
|
|
|
|
assert line_no == len(lines)
|
|
|
|
|
|
def test_ksr_errors():
|
|
# check that 'dnssec-ksr' errors on unknown action
|
|
_, err = ksr("common.test", "common", "foobar", raise_on_exception=False)
|
|
assert "dnssec-ksr: fatal: unknown command 'foobar'" in err
|
|
|
|
# check that 'dnssec-ksr keygen' errors on missing end date
|
|
_, err = ksr("common.test", "common", "keygen", raise_on_exception=False)
|
|
assert "dnssec-ksr: fatal: keygen requires an end date" in err
|
|
|
|
# check that 'dnssec-ksr keygen' errors on zone with csk
|
|
_, err = ksr(
|
|
"csk.test", "csk", "keygen", options="-K ns1 -e +2y", raise_on_exception=False
|
|
)
|
|
assert "dnssec-ksr: fatal: no keys created for policy 'csk'" in err
|
|
|
|
# check that 'dnssec-ksr request' errors on missing end date
|
|
_, err = ksr("common.test", "common", "request", raise_on_exception=False)
|
|
assert "dnssec-ksr: fatal: request requires an end date" in err
|
|
|
|
# check that 'dnssec-ksr sign' errors on missing ksr file
|
|
_, err = ksr(
|
|
"common.test",
|
|
"common",
|
|
"sign",
|
|
options="-K ns1/offline -i now -e +1y",
|
|
raise_on_exception=False,
|
|
)
|
|
assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in err
|
|
|
|
|
|
def test_ksr_common(servers):
|
|
# common test cases (1)
|
|
zone = "common.test"
|
|
policy = "common"
|
|
n = 1
|
|
|
|
# create ksk
|
|
kskdir = "ns1/offline"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
|
|
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
|
|
assert len(ksks) == 1
|
|
|
|
check_keys(ksks, None)
|
|
|
|
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
|
|
out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y")
|
|
zsks = isctest.kasp.keystr_to_keylist(out)
|
|
assert len(zsks) == 2
|
|
|
|
lifetime = timedelta(days=31 * 6)
|
|
check_keys(zsks, lifetime)
|
|
|
|
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
|
|
# in the given key directory
|
|
zskdir = "ns1"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
|
|
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(zsks) == 2
|
|
|
|
lifetime = timedelta(days=31 * 6)
|
|
check_keys(zsks, lifetime)
|
|
|
|
for key in zsks:
|
|
privatefile = f"{key.path}.private"
|
|
keyfile = f"{key.path}.key"
|
|
statefile = f"{key.path}.state"
|
|
shutil.copyfile(privatefile, f"{privatefile}.backup")
|
|
shutil.copyfile(keyfile, f"{keyfile}.backup")
|
|
shutil.copyfile(statefile, f"{statefile}.backup")
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr
|
|
now = zsks[0].get_timing("Created")
|
|
until = now + timedelta(days=365)
|
|
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
check_keysigningrequest(out, zsks, now, until)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
|
|
)
|
|
|
|
fname = f"{zone}.skr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
|
|
|
|
# common test cases (2)
|
|
n = 2
|
|
|
|
# check that 'dnssec-ksr keygen' selects pregenerated keys for
|
|
# the same time bundle
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
|
|
selected_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(selected_zsks) == 2
|
|
for index, key in enumerate(selected_zsks):
|
|
assert zsks[index] == key
|
|
isctest.check.file_contents_equal(
|
|
f"{key.path}.private", f"{key.path}.private.backup"
|
|
)
|
|
isctest.check.file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
|
|
isctest.check.file_contents_equal(
|
|
f"{key.path}.state", f"{key.path}.state.backup"
|
|
)
|
|
|
|
# check that 'dnssec-ksr keygen' generates only necessary keys for
|
|
# overlapping time bundle
|
|
out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
|
|
overlapping_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(overlapping_zsks) == 4
|
|
|
|
verbose = err.split()
|
|
selected = 0
|
|
generated = 0
|
|
for output in verbose:
|
|
if "Selecting" in output:
|
|
selected += 1
|
|
if "Generating" in output:
|
|
generated += 1
|
|
# Subtract if there was a key collision.
|
|
if "collide" in output:
|
|
generated -= 1
|
|
|
|
assert selected == 2
|
|
assert generated == 2
|
|
for index, key in enumerate(overlapping_zsks):
|
|
if index < 2:
|
|
assert zsks[index] == key
|
|
isctest.check.file_contents_equal(
|
|
f"{key.path}.private", f"{key.path}.private.backup"
|
|
)
|
|
isctest.check.file_contents_equal(
|
|
f"{key.path}.key", f"{key.path}.key.backup"
|
|
)
|
|
isctest.check.file_contents_equal(
|
|
f"{key.path}.state", f"{key.path}.state.backup"
|
|
)
|
|
|
|
# run 'dnssec-ksr keygen' again with verbosity 0
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
|
|
overlapping_zsks2 = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(overlapping_zsks2) == 4
|
|
check_keys(overlapping_zsks2, lifetime)
|
|
for index, key in enumerate(overlapping_zsks2):
|
|
assert overlapping_zsks[index] == key
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr if the
|
|
# interval is shorter
|
|
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y")
|
|
|
|
fname = f"{zone}.ksr.{n}.shorter"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
check_keysigningrequest(out, zsks, now, until)
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr with new interval
|
|
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
until = now + timedelta(days=365 * 2)
|
|
check_keysigningrequest(out, overlapping_zsks, now, until)
|
|
|
|
# check that 'dnssec-ksr request' errors if there are not enough keys
|
|
_, err = ksr(
|
|
zone,
|
|
policy,
|
|
"request",
|
|
options=f"-K ns1 -i {now} -e +3y",
|
|
raise_on_exception=False,
|
|
)
|
|
error = f"no {zone}/ECDSAP256SHA256 zsk key pair found for bundle"
|
|
assert f"dnssec-ksr: fatal: {error}" in err
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +2y"
|
|
)
|
|
|
|
fname = f"{zone}.skr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(
|
|
out,
|
|
zone,
|
|
ksks,
|
|
overlapping_zsks,
|
|
now,
|
|
until,
|
|
refresh,
|
|
)
|
|
|
|
# add zone
|
|
ns1 = servers["ns1"]
|
|
ns1.rndc(
|
|
f"addzone {zone} "
|
|
+ "{ type primary; file "
|
|
+ f'"{zone}.db"; dnssec-policy {policy}; '
|
|
+ "};",
|
|
log=False,
|
|
)
|
|
|
|
# import skr
|
|
shutil.copyfile(fname, f"ns1/{fname}")
|
|
ns1.rndc(f"skr -import {fname} {zone}", log=False)
|
|
|
|
# test zone is correctly signed
|
|
# - check rndc dnssec -status output
|
|
isctest.kasp.check_dnssecstatus(ns1, zone, overlapping_zsks, policy=policy)
|
|
# - zone is signed
|
|
isctest.kasp.check_zone_is_signed(ns1, zone)
|
|
# - dnssec_verify
|
|
isctest.kasp.check_dnssec_verify(ns1, zone)
|
|
# - check keys
|
|
check_keys(overlapping_zsks, lifetime, with_state=True)
|
|
# - check apex
|
|
isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
|
|
# - check subdomain
|
|
isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
|
|
|
|
|
|
def test_ksr_lastbundle(servers):
|
|
zone = "last-bundle.test"
|
|
policy = "common"
|
|
n = 1
|
|
|
|
# create ksk
|
|
kskdir = "ns1/offline"
|
|
offset = -timedelta(days=365)
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o")
|
|
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
|
|
assert len(ksks) == 1
|
|
|
|
check_keys(ksks, None, offset=offset)
|
|
|
|
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
|
|
zskdir = "ns1"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
|
|
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(zsks) == 2
|
|
|
|
lifetime = timedelta(days=31 * 6)
|
|
check_keys(zsks, lifetime, offset=offset)
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr
|
|
then = zsks[0].get_timing("Created") + offset
|
|
until = then + timedelta(days=366)
|
|
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1d")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
check_keysigningrequest(out, zsks, then, until)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1d"
|
|
)
|
|
|
|
fname = f"{zone}.skr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
|
|
|
|
# add zone
|
|
ns1 = servers["ns1"]
|
|
ns1.rndc(
|
|
f"addzone {zone} "
|
|
+ "{ type primary; file "
|
|
+ f'"{zone}.db"; dnssec-policy {policy}; '
|
|
+ "};",
|
|
log=False,
|
|
)
|
|
|
|
# import skr
|
|
shutil.copyfile(fname, f"ns1/{fname}")
|
|
ns1.rndc(f"skr -import {fname} {zone}", log=False)
|
|
|
|
# test zone is correctly signed
|
|
# - check rndc dnssec -status output
|
|
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
|
|
# - zone is signed
|
|
isctest.kasp.check_zone_is_signed(ns1, zone)
|
|
# - dnssec_verify
|
|
isctest.kasp.check_dnssec_verify(ns1, zone)
|
|
# - check keys
|
|
check_keys(zsks, lifetime, offset=offset, with_state=True)
|
|
# - check apex
|
|
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
# - check subdomain
|
|
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
|
|
# check that last bundle warning is logged
|
|
warning = "last bundle in skr, please import new skr file"
|
|
assert f"zone {zone}/IN (signed): zone_rekey: {warning}" in ns1.log
|
|
|
|
|
|
def test_ksr_inthemiddle(servers):
|
|
zone = "in-the-middle.test"
|
|
policy = "common"
|
|
n = 1
|
|
|
|
# create ksk
|
|
kskdir = "ns1/offline"
|
|
offset = -timedelta(days=365)
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o")
|
|
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
|
|
assert len(ksks) == 1
|
|
|
|
check_keys(ksks, None, offset=offset)
|
|
|
|
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
|
|
zskdir = "ns1"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
|
|
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(zsks) == 4
|
|
|
|
lifetime = timedelta(days=31 * 6)
|
|
check_keys(zsks, lifetime, offset=offset)
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr
|
|
then = zsks[0].get_timing("Created")
|
|
then = then + offset
|
|
until = then + timedelta(days=365 * 2)
|
|
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1y")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
check_keysigningrequest(out, zsks, then, until)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1y"
|
|
)
|
|
|
|
fname = f"{zone}.skr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
|
|
|
|
# add zone
|
|
ns1 = servers["ns1"]
|
|
ns1.rndc(
|
|
f"addzone {zone} "
|
|
+ "{ type primary; file "
|
|
+ f'"{zone}.db"; dnssec-policy {policy}; '
|
|
+ "};",
|
|
log=False,
|
|
)
|
|
|
|
# import skr
|
|
shutil.copyfile(fname, f"ns1/{fname}")
|
|
ns1.rndc(f"skr -import {fname} {zone}", log=False)
|
|
|
|
# test zone is correctly signed
|
|
# - check rndc dnssec -status output
|
|
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
|
|
# - zone is signed
|
|
isctest.kasp.check_zone_is_signed(ns1, zone)
|
|
# - dnssec_verify
|
|
isctest.kasp.check_dnssec_verify(ns1, zone)
|
|
# - check keys
|
|
check_keys(zsks, lifetime, offset=offset, with_state=True)
|
|
# - check apex
|
|
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
# - check subdomain
|
|
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
|
|
# check that no last bundle warning is logged
|
|
warning = "last bundle in skr, please import new skr file"
|
|
assert f"zone {zone}/IN (signed): zone_rekey: {warning}" not in ns1.log
|
|
|
|
|
|
def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
|
|
n = 1
|
|
|
|
# create ksk
|
|
kskdir = "ns1/offline"
|
|
now = KeyTimingMetadata.now()
|
|
then = now + offset
|
|
until = now + end
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o")
|
|
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
|
|
assert len(ksks) == 1
|
|
|
|
# key generation
|
|
zskdir = "ns1"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
|
|
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(zsks) == 2
|
|
|
|
# create request
|
|
now = zsks[0].get_timing("Created")
|
|
then = now + offset
|
|
until = now + end
|
|
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e {until}")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
# sign request
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e {until}"
|
|
)
|
|
|
|
fname = f"{zone}.skr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
# add zone
|
|
server.rndc(
|
|
f"addzone {zone} "
|
|
+ "{ type primary; file "
|
|
+ f'"{zone}.db"; dnssec-policy {policy}; '
|
|
+ "};",
|
|
log=False,
|
|
)
|
|
|
|
# import skr
|
|
shutil.copyfile(fname, f"ns1/{fname}")
|
|
server.rndc(f"skr -import {fname} {zone}", log=False)
|
|
|
|
# test that rekey logs error
|
|
time_remaining = 10
|
|
warning = "no available SKR bundle"
|
|
line = f"zone {zone}/IN (signed): zone_rekey failure: {warning}"
|
|
while time_remaining > 0:
|
|
if line not in server.log:
|
|
time_remaining -= 1
|
|
time.sleep(1)
|
|
else:
|
|
break
|
|
assert line in server.log
|
|
|
|
|
|
def test_ksr_rekey_logs_error(servers):
|
|
# check that an SKR that is too old logs error
|
|
check_ksr_rekey_logs_error(
|
|
servers["ns1"], "past.test", "common", -63072000, -31536000
|
|
)
|
|
# check that an SKR that is too new logs error
|
|
check_ksr_rekey_logs_error(
|
|
servers["ns1"], "future.test", "common", 2592000, 31536000
|
|
)
|
|
|
|
|
|
def test_ksr_unlimited(servers):
|
|
zone = "unlimited.test"
|
|
policy = "unlimited"
|
|
n = 1
|
|
|
|
# create ksk
|
|
kskdir = "ns1/offline"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o")
|
|
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
|
|
assert len(ksks) == 1
|
|
|
|
check_keys(ksks, None)
|
|
|
|
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
|
|
zskdir = "ns1"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
|
|
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(zsks) == 1
|
|
|
|
lifetime = None
|
|
check_keys(zsks, lifetime)
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr
|
|
now = zsks[0].get_timing("Created")
|
|
until = now + timedelta(days=365 * 4)
|
|
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +4y")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
check_keysigningrequest(out, zsks, now, until)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr without cdnskey
|
|
out, _ = ksr(
|
|
zone, "no-cdnskey", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
|
|
)
|
|
|
|
skrfile = f"{zone}.no-cdnskey.skr.{n}"
|
|
with open(skrfile, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(
|
|
out,
|
|
zone,
|
|
ksks,
|
|
zsks,
|
|
now,
|
|
until,
|
|
refresh,
|
|
cdnskey=False,
|
|
cds="SHA-1, SHA-256, SHA-384",
|
|
)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr without cds
|
|
out, _ = ksr(
|
|
zone, "no-cds", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
|
|
)
|
|
|
|
skrfile = f"{zone}.no-cds.skr.{n}"
|
|
with open(skrfile, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(
|
|
out,
|
|
zone,
|
|
ksks,
|
|
zsks,
|
|
now,
|
|
until,
|
|
refresh,
|
|
cds="",
|
|
)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
|
|
)
|
|
|
|
skrfile = f"{zone}.{policy}.skr.{n}"
|
|
with open(skrfile, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
|
|
|
|
# add zone
|
|
ns1 = servers["ns1"]
|
|
ns1.rndc(
|
|
f"addzone {zone} "
|
|
+ "{ type primary; file "
|
|
+ f'"{zone}.db"; dnssec-policy {policy}; '
|
|
+ "};",
|
|
log=False,
|
|
)
|
|
|
|
# import skr
|
|
shutil.copyfile(skrfile, f"ns1/{skrfile}")
|
|
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
|
|
|
|
# test zone is correctly signed
|
|
# - check rndc dnssec -status output
|
|
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
|
|
# - zone is signed
|
|
isctest.kasp.check_zone_is_signed(ns1, zone)
|
|
# - dnssec_verify
|
|
isctest.kasp.check_dnssec_verify(ns1, zone)
|
|
# - check keys
|
|
check_keys(zsks, lifetime, with_state=True)
|
|
# - check apex
|
|
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
# - check subdomain
|
|
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
|
|
|
|
def test_ksr_twotone(servers):
|
|
zone = "two-tone.test"
|
|
policy = "two-tone"
|
|
n = 1
|
|
|
|
# create ksk
|
|
kskdir = "ns1/offline"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
|
|
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
|
|
assert len(ksks) == 2
|
|
|
|
ksks_defalg = []
|
|
ksks_altalg = []
|
|
for ksk in ksks:
|
|
alg = ksk.get_metadata("Algorithm")
|
|
if alg == os.environ.get("DEFAULT_ALGORITHM_NUMBER"):
|
|
ksks_defalg.append(ksk)
|
|
elif alg == os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER"):
|
|
ksks_altalg.append(ksk)
|
|
|
|
assert len(ksks_defalg) == 1
|
|
assert len(ksks_altalg) == 1
|
|
|
|
check_keys(ksks_defalg, None)
|
|
|
|
alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER")
|
|
size = os.environ.get("ALTERNATIVE_BITS")
|
|
check_keys(ksks_altalg, None, alg, size)
|
|
|
|
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
|
|
zskdir = "ns1"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
|
|
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
# First algorithm keys have a lifetime of 3 months, so there should
|
|
# be 4 created keys. Second algorithm keys have a lifetime of 5
|
|
# months, so there should be 3 created keys. While only two time
|
|
# bundles of 5 months fit into one year, we need to create an extra
|
|
# key for the remainder of the bundle. So 7 in total.
|
|
assert len(zsks) == 7
|
|
|
|
zsks_defalg = []
|
|
zsks_altalg = []
|
|
for zsk in zsks:
|
|
alg = zsk.get_metadata("Algorithm")
|
|
if alg == os.environ.get("DEFAULT_ALGORITHM_NUMBER"):
|
|
zsks_defalg.append(zsk)
|
|
elif alg == os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER"):
|
|
zsks_altalg.append(zsk)
|
|
|
|
assert len(zsks_defalg) == 4
|
|
assert len(zsks_altalg) == 3
|
|
|
|
lifetime = timedelta(days=31 * 3)
|
|
check_keys(zsks_defalg, lifetime)
|
|
|
|
alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER")
|
|
size = os.environ.get("ALTERNATIVE_BITS")
|
|
lifetime = timedelta(days=31 * 5)
|
|
check_keys(zsks_altalg, lifetime, alg, size)
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr
|
|
now = zsks[0].get_timing("Created")
|
|
until = now + timedelta(days=365)
|
|
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
check_keysigningrequest(out, zsks, now, until)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
|
|
)
|
|
|
|
skrfile = f"{zone}.skr.{n}"
|
|
with open(skrfile, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -timedelta(days=5)
|
|
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
|
|
|
|
# add zone
|
|
ns1 = servers["ns1"]
|
|
ns1.rndc(
|
|
f"addzone {zone} "
|
|
+ "{ type primary; file "
|
|
+ f'"{zone}.db"; dnssec-policy {policy}; '
|
|
+ "};",
|
|
log=False,
|
|
)
|
|
|
|
# import skr
|
|
shutil.copyfile(skrfile, f"ns1/{skrfile}")
|
|
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
|
|
|
|
# test zone is correctly signed
|
|
# - check rndc dnssec -status output
|
|
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
|
|
# - zone is signed
|
|
isctest.kasp.check_zone_is_signed(ns1, zone)
|
|
# - dnssec_verify
|
|
isctest.kasp.check_dnssec_verify(ns1, zone)
|
|
# - check keys
|
|
lifetime = timedelta(days=31 * 3)
|
|
check_keys(zsks_defalg, lifetime, with_state=True)
|
|
|
|
alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER")
|
|
size = os.environ.get("ALTERNATIVE_BITS")
|
|
lifetime = timedelta(days=31 * 5)
|
|
check_keys(zsks_altalg, lifetime, alg, size, with_state=True)
|
|
# - check apex
|
|
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
# - check subdomain
|
|
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
|
|
|
|
def test_ksr_kskroll(servers):
|
|
zone = "ksk-roll.test"
|
|
policy = "ksk-roll"
|
|
n = 1
|
|
|
|
# create ksk
|
|
kskdir = "ns1/offline"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
|
|
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
|
|
assert len(ksks) == 2
|
|
|
|
lifetime = timedelta(days=31 * 6)
|
|
check_keys(ksks, lifetime)
|
|
|
|
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
|
|
zskdir = "ns1"
|
|
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
|
|
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
|
|
assert len(zsks) == 1
|
|
|
|
check_keys(zsks, None)
|
|
|
|
# check that 'dnssec-ksr request' creates correct ksr
|
|
now = zsks[0].get_timing("Created")
|
|
until = now + timedelta(days=365)
|
|
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
|
|
|
|
fname = f"{zone}.ksr.{n}"
|
|
with open(fname, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
check_keysigningrequest(out, zsks, now, until)
|
|
|
|
# check that 'dnssec-ksr sign' creates correct skr
|
|
out, _ = ksr(
|
|
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
|
|
)
|
|
|
|
skrfile = f"{zone}.skr.{n}"
|
|
with open(skrfile, "w", encoding="utf-8") as file:
|
|
file.write(out)
|
|
|
|
refresh = -432000 # 5 days
|
|
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
|
|
|
|
# add zone
|
|
ns1 = servers["ns1"]
|
|
ns1.rndc(
|
|
f"addzone {zone} "
|
|
+ "{ type primary; file "
|
|
+ f'"{zone}.db"; dnssec-policy {policy}; '
|
|
+ "};",
|
|
log=False,
|
|
)
|
|
|
|
# import skr
|
|
shutil.copyfile(skrfile, f"ns1/{skrfile}")
|
|
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
|
|
|
|
# test zone is correctly signed
|
|
# - check rndc dnssec -status output
|
|
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
|
|
# - zone is signed
|
|
isctest.kasp.check_zone_is_signed(ns1, zone)
|
|
# - dnssec_verify
|
|
isctest.kasp.check_dnssec_verify(ns1, zone)
|
|
# - check keys
|
|
check_keys(zsks, None, with_state=True)
|
|
# - check apex
|
|
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
|
|
# - check subdomain
|
|
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
|