1
0
Fork 0
bind9/bin/tests/system/isctest/kasp.py
Daniel Baumann f66ff7eae6
Adding upstream version 1:9.20.9.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-21 13:32:37 +02:00

1358 lines
44 KiB
Python

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from functools import total_ordering
import glob
import os
from pathlib import Path
import re
import subprocess
import time
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime, timedelta, timezone
import dns
import dns.tsig
import isctest.log
import isctest.query
import isctest.util
DEFAULT_TTL = 300
NEXT_KEY_EVENT_THRESHOLD = 100
def _query(server, qname, qtype, tsig=None):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
if tsig is not None:
tsigkey = tsig.split(":")
keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
query.use_tsig(keyring)
try:
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
except dns.exception.Timeout:
isctest.log.debug(f"query timeout for query {qname} {qtype} to {server.ip}")
return None
return response
@total_ordering
class KeyTimingMetadata:
"""
Represent a single timing information for a key.
These objects can be easily compared, support addition and subtraction of
timedelta objects or integers(value in seconds). A lack of timing metadata
in the key (value 0) should be represented with None rather than an
instance of this object.
"""
FORMAT = "%Y%m%d%H%M%S"
def __init__(self, timestamp: str):
if int(timestamp) <= 0:
raise ValueError(f'invalid timing metadata value: "{timestamp}"')
self.value = datetime.strptime(timestamp, self.FORMAT).replace(
tzinfo=timezone.utc
)
def __repr__(self):
return self.value.strftime(self.FORMAT)
def __str__(self) -> str:
return self.value.strftime(self.FORMAT)
def __add__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value + other
return result
def __sub__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value - other
return result
def __iadd__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value += other
def __isub__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value -= other
def __lt__(self, other: "KeyTimingMetadata"):
return self.value < other.value
def __eq__(self, other: object):
return isinstance(other, KeyTimingMetadata) and self.value == other.value
@staticmethod
def now() -> "KeyTimingMetadata":
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = datetime.now(timezone.utc)
return result
class KeyProperties:
"""
Represent the (expected) properties a key should have.
"""
def __init__(
self,
name: str,
properties: dict,
metadata: dict,
timing: Dict[str, KeyTimingMetadata],
):
self.name = name
self.key = None
self.properties = properties
self.metadata = metadata
self.timing = timing
def __repr__(self):
return self.name
def __str__(self) -> str:
return self.name
@staticmethod
def default(with_state=True) -> "KeyProperties":
properties = {
"expect": True,
"private": True,
"legacy": False,
"role": "csk",
"role_full": "key-signing",
"dnskey_ttl": 3600,
"flags": 257,
}
metadata = {
"Algorithm": isctest.vars.algorithms.ECDSAP256SHA256.number,
"Length": 256,
"Lifetime": 0,
"KSK": "yes",
"ZSK": "yes",
}
timing: Dict[str, KeyTimingMetadata] = {}
result = KeyProperties(
name="DEFAULT", properties=properties, metadata=metadata, timing=timing
)
result.name = "DEFAULT"
result.key = None
if with_state:
result.metadata["GoalState"] = "omnipresent"
result.metadata["DNSKEYState"] = "rumoured"
result.metadata["KRRSIGState"] = "rumoured"
result.metadata["ZRRSIGState"] = "rumoured"
result.metadata["DSState"] = "hidden"
return result
def Ipub(self, config):
ipub = timedelta(0)
if self.key.get_metadata("Predecessor", must_exist=False) != "undefined":
# Ipub = Dprp + TTLkey
ipub = (
config["dnskey-ttl"]
+ config["zone-propagation-delay"]
+ config["publish-safety"]
)
self.timing["Active"] = self.timing["Published"] + ipub
def IpubC(self, config):
if not self.key.is_ksk():
return
ttl1 = config["dnskey-ttl"] + config["publish-safety"]
ttl2 = timedelta(0)
if self.key.get_metadata("Predecessor", must_exist=False) == "undefined":
# If this is the first key, we also need to wait until the zone
# signatures are omnipresent. Use max-zone-ttl instead of
# dnskey-ttl, and no publish-safety (because we are looking at
# signatures here, not the public key).
ttl2 = config["max-zone-ttl"]
# IpubC = DprpC + TTLkey
ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2)
self.timing["PublishCDS"] = self.timing["Published"] + ipubc
if self.metadata["Lifetime"] != 0:
self.timing["DeleteCDS"] = (
self.timing["PublishCDS"] + self.metadata["Lifetime"]
)
def Iret(self, config):
if self.metadata["Lifetime"] == 0:
return
sign_delay = config["signatures-validity"] - config["signatures-refresh"]
safety_interval = config["retire-safety"]
iretKSK = timedelta(0)
iretZSK = timedelta(0)
if self.key.is_ksk():
# Iret = DprpP + TTLds
iretKSK = (
config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
)
if self.key.is_zsk():
# Iret = Dsgn + Dprp + TTLsig
iretZSK = (
sign_delay
+ config["zone-propagation-delay"]
+ config["max-zone-ttl"]
+ safety_interval
)
self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK)
def set_expected_keytimes(self, config, offset=None, pregenerated=False):
if self.key is None:
raise ValueError("KeyProperties must be attached to a Key")
if self.properties["legacy"]:
return
if offset is None:
offset = self.properties["offset"]
self.timing["Generated"] = self.key.get_timing("Created")
self.timing["Published"] = self.timing["Generated"]
if pregenerated:
self.timing["Published"] = self.key.get_timing("Publish")
self.timing["Published"] = self.timing["Published"] + offset
self.Ipub(config)
# Set Retired timing metadata if key has lifetime.
if self.metadata["Lifetime"] != 0:
self.timing["Retired"] = self.timing["Active"] + self.metadata["Lifetime"]
self.IpubC(config)
self.Iret(config)
# Key state change times must exist, but since we cannot reliably tell
# when named made the actual state change, we don't care what the
# value is. Set it to None will verify that the metadata exists, but
# without actual checking the value.
self.timing["DNSKEYChange"] = None
if self.key.is_ksk():
self.timing["DSChange"] = None
self.timing["KRRSIGChange"] = None
if self.key.is_zsk():
self.timing["ZRRSIGChange"] = None
@total_ordering
class Key:
"""
Represent a key from a keyfile.
This object keeps track of its origin (keydir + name), can be used to
retrieve metadata from the underlying files and supports convenience
operations for KASP tests.
"""
def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None):
self.name = name
if keydir is None:
self.keydir = Path()
else:
self.keydir = Path(keydir)
self.path = str(self.keydir / name)
self.privatefile = f"{self.path}.private"
self.keyfile = f"{self.path}.key"
self.statefile = f"{self.path}.state"
self.tag = int(self.name[-5:])
def get_timing(
self, metadata: str, must_exist: bool = True
) -> Optional[KeyTimingMetadata]:
regex = rf";\s+{metadata}:\s+(\d+).*"
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
match = re.match(regex, line)
if match is not None:
try:
return KeyTimingMetadata(match.group(1))
except ValueError:
break
if must_exist:
raise ValueError(
f'timing metadata "{metadata}" for key "{self.name}" invalid'
)
return None
def get_metadata(
self, metadata: str, file=None, comment=False, must_exist=True
) -> str:
if file is None:
file = self.statefile
value = "undefined"
regex = rf"{metadata}:\s+(\S+).*"
if comment:
# The expected metadata is prefixed with a ';'.
regex = rf";\s+{metadata}:\s+(\S+).*"
with open(file, "r", encoding="utf-8") as fp:
for line in fp:
match = re.match(regex, line)
if match is not None:
value = match.group(1)
break
if must_exist and value == "undefined":
raise ValueError(
f'metadata "{metadata}" for key "{self.name}" in file "{file}" undefined'
)
return value
def get_signing_state(
self, offline_ksk=False, zsk_missing=False
) -> Tuple[bool, bool]:
"""
This returns the signing state derived from the key states, KRRSIGState
and ZRRSIGState.
If 'offline_ksk' is set to True, we determine the signing state from
the timing metadata. If 'zsigning' is True, ensure the current time is
between the Active and Retired timing metadata.
If 'zsk_missing' is set to True, it means the ZSK private key file is
missing, and the KSK should take over signing the RRset, and the
expected zone signing state (zsigning) is reversed.
"""
# Fetch key timing metadata.
now = KeyTimingMetadata.now()
activate = self.get_timing("Activate")
assert activate is not None # to silence mypy - its implied by line above
inactive = self.get_timing("Inactive", must_exist=False)
active = now >= activate
retired = inactive is not None and inactive <= now
signing = active and not retired
# Fetch key state metadata.
krrsigstate = self.get_metadata("KRRSIGState", must_exist=False)
ksigning = krrsigstate in ["rumoured", "omnipresent"]
zrrsigstate = self.get_metadata("ZRRSIGState", must_exist=False)
zsigning = zrrsigstate in ["rumoured", "omnipresent"]
if ksigning:
assert self.is_ksk()
if zsigning:
assert self.is_zsk()
# If the ZSK private key file is missing, revers the zone signing state.
if zsk_missing:
zsigning = not zsigning
# If testing offline KSK, retrieve the signing state from the key timing
# metadata.
if offline_ksk and signing and self.is_zsk():
assert zsigning
if offline_ksk and signing and self.is_ksk():
ksigning = signing
return ksigning, zsigning
def ttl(self) -> int:
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
if line.startswith(";"):
continue
return int(line.split()[1])
return 0
def dnskey(self):
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
if "DNSKEY" in line:
return line.strip()
return "undefined"
def is_ksk(self) -> bool:
return self.get_metadata("KSK") == "yes"
def is_zsk(self) -> bool:
return self.get_metadata("ZSK") == "yes"
def dnskey_equals(self, value, cdnskey=False):
dnskey = value.split()
if cdnskey:
# fourth element is the rrtype
assert dnskey[3] == "CDNSKEY"
dnskey[3] = "DNSKEY"
dnskey_fromfile = []
rdata = " ".join(dnskey[:7])
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
if f"{rdata}" in line:
dnskey_fromfile = line.split()
pubkey_fromfile = "".join(dnskey_fromfile[7:])
pubkey_fromwire = "".join(dnskey[7:])
return pubkey_fromfile == pubkey_fromwire
def cds_equals(self, value, alg):
cds = value.split()
dsfromkey_command = [
os.environ.get("DSFROMKEY"),
"-T",
str(self.ttl()),
"-a",
alg,
"-C",
"-w",
str(self.keyfile),
]
out = isctest.run.cmd(dsfromkey_command, log_stdout=True)
dsfromkey = out.stdout.decode("utf-8").split()
rdata_fromfile = " ".join(dsfromkey[:7])
rdata_fromwire = " ".join(cds[:7])
if rdata_fromfile != rdata_fromwire:
isctest.log.debug(
f"CDS RDATA MISMATCH: {rdata_fromfile} - {rdata_fromwire}"
)
return False
digest_fromfile = "".join(dsfromkey[7:]).lower()
digest_fromwire = "".join(cds[7:]).lower()
if digest_fromfile != digest_fromwire:
isctest.log.debug(
f"CDS DIGEST MISMATCH: {digest_fromfile} - {digest_fromwire}"
)
return False
return digest_fromfile == digest_fromwire
def is_metadata_consistent(self, key, metadata, checkval=True):
"""
If 'key' exists in 'metadata' then it must also exist in the state
meta data. Otherwise, it must not exist in the state meta data.
If 'checkval' is True, the meta data values must also match.
"""
if key in metadata:
if checkval:
value = self.get_metadata(key)
if value != f"{metadata[key]}":
isctest.log.debug(
f"{self.name} {key} METADATA MISMATCH: {value} - {metadata[key]}"
)
return value == f"{metadata[key]}"
return self.get_metadata(key) != "undefined"
value = self.get_metadata(key, must_exist=False)
if value != "undefined":
isctest.log.debug(f"{self.name} {key} METADATA UNEXPECTED: {value}")
return value == "undefined"
def is_timing_consistent(self, key, timing, file, comment=False):
"""
If 'key' exists in 'timing' then it must match the value in the state
timing data. Otherwise, it must also not exist in the state timing data.
"""
if key in timing:
value = self.get_metadata(key, file=file, comment=comment)
if value != str(timing[key]):
isctest.log.debug(
f"{self.name} {key} TIMING MISMATCH: {value} - {timing[key]}"
)
return value == str(timing[key])
value = self.get_metadata(key, file=file, comment=comment, must_exist=False)
if value != "undefined":
isctest.log.debug(f"{self.name} {key} TIMING UNEXPECTED: {value}")
return value == "undefined"
def match_properties(self, zone, properties):
"""
Check the key with given properties.
"""
if not properties.properties["expect"]:
return False
# Check file existence.
# Noop. If file is missing then the get_metadata calls will fail.
# Check the public key file.
role = properties.properties["role_full"]
comment = f"This is a {role} key, keyid {self.tag}, for {zone}."
if not isctest.util.file_contents_contain(self.keyfile, comment):
isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'")
return False
ttl = properties.properties["dnskey_ttl"]
flags = properties.properties["flags"]
alg = properties.metadata["Algorithm"]
dnskey = f"{zone}. {ttl} IN DNSKEY {flags} 3 {alg}"
if not isctest.util.file_contents_contain(self.keyfile, dnskey):
isctest.log.debug(f"{self.name} DNSKEY MISMATCH: expected '{dnskey}'")
return False
# Now check the private key file.
if properties.properties["private"]:
# Retrieve creation date.
created = self.get_metadata("Generated")
pval = self.get_metadata("Created", file=self.privatefile)
if pval != created:
isctest.log.debug(
f"{self.name} Created METADATA MISMATCH: {pval} - {created}"
)
return False
pval = self.get_metadata("Private-key-format", file=self.privatefile)
if pval != "v1.3":
isctest.log.debug(
f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3"
)
return False
pval = self.get_metadata("Algorithm", file=self.privatefile)
if pval != f"{alg}":
isctest.log.debug(
f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}"
)
return False
# Now check the key state file.
if properties.properties["legacy"]:
return True
comment = f"This is the state of key {self.tag}, for {zone}."
if not isctest.util.file_contents_contain(self.statefile, comment):
isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'")
return False
attributes = [
"Lifetime",
"Algorithm",
"Length",
"KSK",
"ZSK",
"GoalState",
"DNSKEYState",
"KRRSIGState",
"ZRRSIGState",
"DSState",
]
for key in attributes:
if not self.is_metadata_consistent(key, properties.metadata):
return False
# A match is found.
return True
def match_timingmetadata(self, timings, file=None, comment=False):
if file is None:
file = self.statefile
attributes = [
"Generated",
"Created",
"Published",
"Publish",
"PublishCDS",
"SyncPublish",
"Active",
"Activate",
"Retired",
"Inactive",
"Revoked",
"Removed",
"Delete",
]
for key in attributes:
if not self.is_timing_consistent(key, timings, file, comment=comment):
isctest.log.debug(f"{self.name} TIMING METADATA MISMATCH: {key}")
return False
return True
def __lt__(self, other: "Key"):
return self.name < other.name
def __eq__(self, other: object):
return isinstance(other, Key) and self.path == other.path
def __repr__(self):
return self.path
def check_zone_is_signed(server, zone, tsig=None):
addr = server.ip
fqdn = f"{zone}."
# wait until zone is fully signed
signed = False
for _ in range(10):
response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig)
if not isinstance(response, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} NSEC from {addr}")
elif response.rcode() != dns.rcode.NOERROR:
rcode = dns.rcode.to_text(response.rcode())
isctest.log.debug(f"{rcode} response for {fqdn} NSEC from {addr}")
else:
has_nsec = False
has_rrsig = False
for rr in response.answer:
if not has_nsec:
has_nsec = rr.match(
dns.name.from_text(fqdn),
dns.rdataclass.IN,
dns.rdatatype.NSEC,
dns.rdatatype.NONE,
)
if not has_rrsig:
has_rrsig = rr.match(
dns.name.from_text(fqdn),
dns.rdataclass.IN,
dns.rdatatype.RRSIG,
dns.rdatatype.NSEC,
)
if not has_nsec:
isctest.log.debug(
f"missing apex {fqdn} NSEC record in response from {addr}"
)
if not has_rrsig:
isctest.log.debug(
f"missing {fqdn} NSEC signature in response from {addr}"
)
signed = has_nsec and has_rrsig
if signed:
break
time.sleep(1)
assert signed
def check_keys(zone, keys, expected):
"""
Checks keys for a configured zone. This verifies:
1. The expected number of keys exist in 'keys'.
2. The keys match the expected properties.
"""
def _verify_keys():
# check number of keys matches expected.
if len(keys) != len(expected):
return False
if len(keys) == 0:
return True
for expect in expected:
expect.key = None
for key in keys:
found = False
i = 0
while not found and i < len(expected):
if expected[i].key is None:
found = key.match_properties(zone, expected[i])
if found:
key.external = expected[i].properties["legacy"]
expected[i].key = key
i += 1
if not found:
return False
return True
isctest.run.retry_with_timeout(_verify_keys, timeout=10)
def check_keytimes(keys, expected):
"""
Check the key timing metadata for all keys in 'keys'.
"""
assert len(keys) == len(expected)
if len(keys) == 0:
return
for key in keys:
for expect in expected:
if expect.properties["legacy"]:
continue
if not key is expect.key:
continue
synonyms = {}
if "Generated" in expect.timing:
synonyms["Created"] = expect.timing["Generated"]
if "Published" in expect.timing:
synonyms["Publish"] = expect.timing["Published"]
if "PublishCDS" in expect.timing:
synonyms["SyncPublish"] = expect.timing["PublishCDS"]
if "Active" in expect.timing:
synonyms["Activate"] = expect.timing["Active"]
if "Retired" in expect.timing:
synonyms["Inactive"] = expect.timing["Retired"]
if "DeleteCDS" in expect.timing:
synonyms["SyncDelete"] = expect.timing["DeleteCDS"]
if "Revoked" in expect.timing:
synonyms["Revoked"] = expect.timing["Revoked"]
if "Removed" in expect.timing:
synonyms["Delete"] = expect.timing["Removed"]
assert key.match_timingmetadata(synonyms, file=key.keyfile, comment=True)
if expect.properties["private"]:
assert key.match_timingmetadata(synonyms, file=key.privatefile)
if not expect.properties["legacy"]:
assert key.match_timingmetadata(expect.timing)
state_changes = [
"DNSKEYChange",
"KRRSIGChange",
"ZRRSIGChange",
"DSChange",
]
for change in state_changes:
assert key.is_metadata_consistent(
change, expect.timing, checkval=False
)
def check_keyrelationships(keys, expected):
"""
Check the key relationships (Successor and Predecessor metadata).
"""
for key in keys:
for expect in expected:
if expect.properties["legacy"]:
continue
if not key is expect.key:
continue
relationship_status = ["Predecessor", "Successor"]
for status in relationship_status:
assert key.is_metadata_consistent(status, expect.metadata)
def check_dnssec_verify(server, zone, tsig=None):
# Check if zone if DNSSEC valid with dnssec-verify.
fqdn = f"{zone}."
verified = False
for _ in range(10):
transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig)
if not isinstance(transfer, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
elif transfer.rcode() != dns.rcode.NOERROR:
rcode = dns.rcode.to_text(transfer.rcode())
isctest.log.debug(f"{rcode} response for {fqdn} AXFR from {server.ip}")
else:
zonefile = f"{zone}.axfr"
with open(zonefile, "w", encoding="utf-8") as file:
for rr in transfer.answer:
file.write(rr.to_text())
file.write("\n")
try:
verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
verified = isctest.run.cmd(verify_command)
except subprocess.CalledProcessError:
pass
if verified:
break
time.sleep(1)
assert verified
def check_dnssecstatus(server, zone, keys, policy=None, view=None):
# Call rndc dnssec -status on 'server' for 'zone'. Expect 'policy' in
# the output. This is a loose verification, it just tests if the right
# policy name is returned, and if all expected keys are listed.
response = ""
if view is None:
response = server.rndc(f"dnssec -status {zone}", log=False)
else:
response = server.rndc(f"dnssec -status {zone} in {view}", log=False)
if policy is None:
assert "Zone does not have dnssec-policy" in response
return
assert f"dnssec-policy: {policy}" in response
for key in keys:
assert f"key: {key.tag}" in response
def _check_signatures(
signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False
):
numsigs = 0
zrrsig = True
if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]:
zrrsig = False
krrsig = not zrrsig
for key in keys:
ksigning, zsigning = key.get_signing_state(
offline_ksk=offline_ksk, zsk_missing=zsk_missing
)
alg = key.get_metadata("Algorithm")
rtype = dns.rdatatype.to_text(covers)
expect = rf"IN RRSIG {rtype} {alg} (\d) (\d+) (\d+) (\d+) {key.tag} {fqdn}"
if zrrsig and zsigning:
has_rrsig = False
for rrsig in signatures:
if re.search(expect, rrsig) is not None:
has_rrsig = True
break
assert has_rrsig, f"Expected signature but not found: {expect}"
numsigs += 1
if zrrsig and not zsigning:
for rrsig in signatures:
assert re.search(expect, rrsig) is None
if krrsig and ksigning:
has_rrsig = False
for rrsig in signatures:
if re.search(expect, rrsig) is not None:
has_rrsig = True
break
assert has_rrsig, f"Expected signature but not found: {expect}"
numsigs += 1
if krrsig and not ksigning:
for rrsig in signatures:
assert re.search(expect, rrsig) is None
return numsigs
def check_signatures(
rrset, covers, fqdn, ksks, zsks, offline_ksk=False, zsk_missing=False
):
# Check if signatures with covering type are signed with the right keys.
# The right keys are the ones that expect a signature and have the
# correct role.
numsigs = 0
signatures = []
for rr in rrset:
for rdata in rr:
rdclass = dns.rdataclass.to_text(rr.rdclass)
rdtype = dns.rdatatype.to_text(rr.rdtype)
rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
signatures.append(rrsig)
numsigs += _check_signatures(
signatures, covers, fqdn, ksks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
)
numsigs += _check_signatures(
signatures, covers, fqdn, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
)
assert numsigs == len(signatures)
def _check_dnskeys(dnskeys, keys, cdnskey=False):
now = KeyTimingMetadata.now()
numkeys = 0
publish_md = "Publish"
delete_md = "Delete"
if cdnskey:
publish_md = f"Sync{publish_md}"
delete_md = f"Sync{delete_md}"
for key in keys:
publish = key.get_timing(publish_md, must_exist=False)
delete = key.get_timing(delete_md, must_exist=False)
published = publish is not None and now >= publish
removed = delete is not None and delete <= now
if not published or removed:
for dnskey in dnskeys:
assert not key.dnskey_equals(dnskey, cdnskey=cdnskey)
continue
has_dnskey = False
for dnskey in dnskeys:
if key.dnskey_equals(dnskey, cdnskey=cdnskey):
has_dnskey = True
break
if not cdnskey:
assert has_dnskey
if has_dnskey:
numkeys += 1
return numkeys
def check_dnskeys(rrset, ksks, zsks, cdnskey=False):
# Check if the correct DNSKEY records are published. If the current time
# is between the timing metadata 'publish' and 'delete', the key must have
# a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY
# records instead.
numkeys = 0
dnskeys = []
for rr in rrset:
for rdata in rr:
rdclass = dns.rdataclass.to_text(rr.rdclass)
rdtype = dns.rdatatype.to_text(rr.rdtype)
dnskey = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
dnskeys.append(dnskey)
numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey)
if not cdnskey:
numkeys += _check_dnskeys(dnskeys, zsks)
assert numkeys == len(dnskeys)
def check_cds(rrset, keys):
# Check if the correct CDS records are published. If the current time
# is between the timing metadata 'publish' and 'delete', the key must have
# a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY
# records instead.
now = KeyTimingMetadata.now()
numcds = 0
cdss = []
for rr in rrset:
for rdata in rr:
rdclass = dns.rdataclass.to_text(rr.rdclass)
rdtype = dns.rdatatype.to_text(rr.rdtype)
cds = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
cdss.append(cds)
for key in keys:
assert key.is_ksk()
publish = key.get_timing("SyncPublish")
delete = key.get_timing("SyncDelete", must_exist=False)
published = now >= publish
removed = delete is not None and delete <= now
if not published or removed:
for cds in cdss:
assert not key.cds_equals(cds, "SHA-256")
continue
has_cds = False
for cds in cdss:
if key.cds_equals(cds, "SHA-256"):
has_cds = True
break
assert has_cds
numcds += 1
assert numcds == len(cdss)
def _query_rrset(server, fqdn, qtype, tsig=None):
response = _query(server, fqdn, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
rrs = []
rrsigs = []
for rrset in response.answer:
if rrset.match(
dns.name.from_text(fqdn), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype
):
rrsigs.append(rrset)
elif rrset.match(
dns.name.from_text(fqdn), dns.rdataclass.IN, qtype, dns.rdatatype.NONE
):
rrs.append(rrset)
else:
assert False
return rrs, rrsigs
def check_apex(
server, zone, ksks, zsks, offline_ksk=False, zsk_missing=False, tsig=None
):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
check_dnskeys(dnskeys, ksks, zsks)
check_signatures(
rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
)
# test soa query
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
check_signatures(
rrsigs,
dns.rdatatype.SOA,
fqdn,
ksks,
zsks,
offline_ksk=offline_ksk,
zsk_missing=zsk_missing,
)
# test cdnskey query
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
if len(cdnskeys) > 0:
assert len(rrsigs) > 0
check_signatures(
rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
)
# test cds query
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
check_cds(cds, ksks)
if len(cds) > 0:
assert len(rrsigs) > 0
check_signatures(
rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, offline_ksk=offline_ksk
)
def check_subdomain(server, zone, ksks, zsks, offline_ksk=False, tsig=None):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
qtype = dns.rdatatype.A
response = _query(server, qname, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1"
rrsigs = []
for rrset in response.answer:
if rrset.match(
dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype
):
rrsigs.append(rrset)
else:
assert match in rrset.to_text()
check_signatures(rrsigs, qtype, fqdn, ksks, zsks, offline_ksk=offline_ksk)
def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None):
"""
Test an RRset below the apex and verify it is updated and signed correctly.
"""
response = _query(server, qname, qtype, tsig=tsig)
if response.rcode() != dns.rcode.NOERROR:
return False
rrtype = dns.rdatatype.to_text(qtype)
match = f"{qname} {DEFAULT_TTL} IN {rrtype} {rdata}"
rrsigs = []
for rrset in response.answer:
if rrset.match(
dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype
):
rrsigs.append(rrset)
elif not match in rrset.to_text():
return False
if len(rrsigs) == 0:
return False
# Zone is updated, ready to verify the signatures.
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
return True
def verify_rrsig_is_refreshed(
server, fqdn, zonefile, qname, qtype, ksks, zsks, tsig=None
):
"""
Verify signature for RRset has been refreshed.
"""
response = _query(server, qname, qtype, tsig=tsig)
if response.rcode() != dns.rcode.NOERROR:
return False
rrtype = dns.rdatatype.to_text(qtype)
match = f"{qname}. {DEFAULT_TTL} IN {rrtype}"
rrsigs = []
for rrset in response.answer:
if rrset.match(
dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype
):
rrsigs.append(rrset)
elif not match in rrset.to_text():
return False
if len(rrsigs) == 0:
return False
tmp_zonefile = f"{zonefile}.tmp"
isctest.run.cmd(
[
os.environ["CHECKZONE"],
"-D",
"-q",
"-o",
tmp_zonefile,
"-f",
"raw",
fqdn,
zonefile,
],
)
zone = dns.zone.from_file(tmp_zonefile, fqdn)
for rrsig in rrsigs:
if isctest.util.zone_contains(zone, rrsig):
return False
# Zone is updated, ready to verify the signatures.
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
return True
def verify_rrsig_is_reused(server, fqdn, zonefile, qname, qtype, ksks, zsks, tsig=None):
"""
Verify signature for RRset has been reused.
"""
response = _query(server, qname, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
rrtype = dns.rdatatype.to_text(qtype)
match = f"{qname}. {DEFAULT_TTL} IN {rrtype}"
rrsigs = []
for rrset in response.answer:
if rrset.match(
dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype
):
rrsigs.append(rrset)
else:
assert match in rrset.to_text()
tmp_zonefile = f"{zonefile}.tmp"
isctest.run.cmd(
[
os.environ["CHECKZONE"],
"-D",
"-q",
"-o",
tmp_zonefile,
"-f",
"raw",
fqdn,
zonefile,
],
)
zone = dns.zone.from_file(tmp_zonefile, dns.name.from_text(fqdn), relativize=False)
for rrsig in rrsigs:
assert isctest.util.zone_contains(zone, rrsig)
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
def next_key_event_equals(server, zone, next_event):
if next_event is None:
# No next key event check.
return True
val = int(next_event.total_seconds())
if val == 3600:
waitfor = rf".*zone {zone}.*: next key event in (.*) seconds"
else:
# Don't want default loadkeys interval.
waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
with server.watch_log_from_start() as watcher:
watcher.wait_for_line(re.compile(waitfor))
# WMM: The with code below is extracting the line the watcher was
# waiting for. If WatchLog.wait_for_line()` returned the matched string,
# we can use it directly on `re.match`.
next_found = False
minval = val - NEXT_KEY_EVENT_THRESHOLD
maxval = val + NEXT_KEY_EVENT_THRESHOLD
with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp:
for line in fp:
match = re.match(waitfor, line)
if match is not None:
nextval = int(match.group(1))
if minval <= nextval <= maxval:
next_found = True
break
isctest.log.debug(
f"check next key event: expected {val} in: {line.strip()}"
)
return next_found
def keydir_to_keylist(
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
) -> List[Key]:
"""
Retrieve all keys from the key files in a directory. If 'zone' is None,
retrieve all keys in the directory, otherwise only those matching the
zone name. If 'keydir' is None, search the current directory.
"""
if zone is None:
zone = ""
all_keys = []
if keydir is None:
regex = rf"(K{zone}\.\+.*\+.*)\.key"
for filename in glob.glob(f"K{zone}.+*+*.key"):
match = re.match(regex, filename)
if match is not None:
all_keys.append(Key(match.group(1)))
else:
regex = rf"{keydir}/(K{zone}\.\+.*\+.*)\.key"
for filename in glob.glob(f"{keydir}/K{zone}.+*+*.key"):
match = re.match(regex, filename)
if match is not None:
all_keys.append(Key(match.group(1), keydir))
states = ["GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState"]
def used(kk):
if not in_use:
return True
for state in states:
val = kk.get_metadata(state, must_exist=False)
if val not in ["undefined", "hidden"]:
isctest.log.debug(f"key {kk} in use")
return True
return False
return [k for k in all_keys if used(k)]
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
return [Key(name, keydir) for name in keystr.split()]
def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]:
"""
Get the policies from a list of specially formatted strings.
The splitted line should result in the following items:
line[0]: Role
line[1]: Lifetime
line[2]: Algorithm
line[3]: Length
Then, optional data for specific tests may follow:
- "goal", "dnskey", "krrsig", "zrrsig", "ds", followed by a value,
sets the given state to the specific value
- "missing", set if the private key file for this key is not available.
- "offset", an offset for testing key rollover timings
"""
proplist = []
count = 0
for key in keys:
count += 1
line = key.split()
keyprop = KeyProperties(f"KEY{count}", {}, {}, {})
keyprop.properties["expect"] = True
keyprop.properties["private"] = True
keyprop.properties["legacy"] = False
keyprop.properties["offset"] = timedelta(0)
keyprop.properties["role"] = line[0]
if line[0] == "zsk":
keyprop.properties["role_full"] = "zone-signing"
keyprop.properties["flags"] = 256
keyprop.metadata["ZSK"] = "yes"
keyprop.metadata["KSK"] = "no"
else:
keyprop.properties["role_full"] = "key-signing"
keyprop.properties["flags"] = 257
keyprop.metadata["ZSK"] = "yes" if line[0] == "csk" else "no"
keyprop.metadata["KSK"] = "yes"
keyprop.properties["dnskey_ttl"] = ttl
keyprop.metadata["Algorithm"] = line[2]
keyprop.metadata["Length"] = line[3]
keyprop.metadata["Lifetime"] = 0
if line[1] != "unlimited":
keyprop.metadata["Lifetime"] = int(line[1])
for i in range(4, len(line)):
if line[i].startswith("goal:"):
keyval = line[i].split(":")
keyprop.metadata["GoalState"] = keyval[1]
elif line[i].startswith("dnskey:"):
keyval = line[i].split(":")
keyprop.metadata["DNSKEYState"] = keyval[1]
elif line[i].startswith("krrsig:"):
keyval = line[i].split(":")
keyprop.metadata["KRRSIGState"] = keyval[1]
elif line[i].startswith("zrrsig:"):
keyval = line[i].split(":")
keyprop.metadata["ZRRSIGState"] = keyval[1]
elif line[i].startswith("ds:"):
keyval = line[i].split(":")
keyprop.metadata["DSState"] = keyval[1]
elif line[i].startswith("offset:"):
keyval = line[i].split(":")
keyprop.properties["offset"] = timedelta(seconds=int(keyval[1]))
elif line[i] == "missing":
keyprop.properties["private"] = False
else:
assert False, f"undefined optional data {line[i]}"
proplist.append(keyprop)
return proplist