summaryrefslogtreecommitdiffstats
path: root/modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 15:26:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 15:26:00 +0000
commit830407e88f9d40d954356c3754f2647f91d5c06a (patch)
treed6a0ece6feea91f3c656166dbaa884ef8a29740e /modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py
parentInitial commit. (diff)
downloadknot-resolver-upstream/5.6.0.tar.xz
knot-resolver-upstream/5.6.0.zip
Adding upstream version 5.6.0.upstream/5.6.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py')
-rwxr-xr-xmodules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py222
1 files changed, 222 insertions, 0 deletions
diff --git a/modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py b/modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py
new file mode 100755
index 0000000..317d671
--- /dev/null
+++ b/modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py
@@ -0,0 +1,222 @@
+#!/usr/bin/python3
+"""
+Generate RFC 5011 test simulating successful KSK roll-over in 2017.
+
+Dependencies: Knot DNS server + Deckard library.
+Environment: Set PYTHONPATH variable so "import pydnstest" will use module from Deckard.
+Input: Root zone files, presumably created by genkeyszones.sh.
+Output: RPL file for Deckard on standard output.
+"""
+
+import copy
+import datetime
+import os.path
+import subprocess
+import time
+
+import dns.resolver
+
+import pydnstest.scenario
+
+try:
+ VARIANT = os.environ["VARIANT"]
+except KeyError:
+ VARIANT = ""
+
+def store_answer(qname, qtype, template):
+ answ = dns.resolver.query(qname, qtype, raise_on_no_answer=False)
+ entr = copy.copy(template)
+ entr.message = answ.response
+ return entr
+
+
+def resolver_init():
+ """
+ Configure dns.resolver to ask ::1@5353 with EDNS0 DO set.
+ """
+ dns.resolver.reset_default_resolver()
+ dns.resolver.default_resolver.use_edns(0, dns.flags.DO, 4096)
+ dns.resolver.default_resolver.nameservers = ['::1']
+ dns.resolver.default_resolver.nameserver_ports = {'::1': 5353}
+ dns.resolver.default_resolver.flags = 0
+
+
+def get_templates():
+ """
+ Return empty objects for RANGE and ENTRY suitable as object templates.
+ """
+ empty_case, _ = pydnstest.scenario.parse_file(os.path.realpath('empty.rpl'))
+
+ rng = copy.copy(empty_case.ranges[0])
+
+ entry = copy.copy(rng.stored[0])
+ entry.adjust_fields = ['copy_id']
+ entry.match_fields = ['opcode', 'question']
+
+ rng.addresses = {'198.41.0.4', '2001:503:ba3e::2:30'}
+ rng.stored = []
+
+ return rng, entry
+
+
+def generate_range(filename, rng_templ, entry_templ):
+ """
+ Run Knot DNS server with specified zone file and generate RANGE object.
+ """
+ assert filename.startswith('20')
+ assert filename.endswith('.db')
+ try:
+ os.unlink('root.db')
+ except FileNotFoundError:
+ pass
+ os.link(filename, 'root.db')
+
+ # run server
+ knotd = subprocess.Popen(['knotd', '-c', 'knot.root.conf', '-s', '/tmp/knot-dns2rpl.sock'])
+ time.sleep(0.1) # give kresd time to start so we do not wait for first timeout
+
+ # query data
+ rng = copy.copy(rng_templ)
+ rng.stored = []
+ rng.stored.append(store_answer('.', 'SOA', entry_templ))
+ rng.stored.append(store_answer('.', 'DNSKEY', entry_templ))
+ rng.stored.append(store_answer('.', 'NS', entry_templ))
+ rng.stored.append(store_answer('rootns.', 'NS', entry_templ))
+ rng.stored.append(store_answer('rootns.', 'A', entry_templ))
+ rng.stored.append(store_answer('rootns.', 'AAAA', entry_templ))
+ rng.stored.append(store_answer('test.', 'TXT', entry_templ))
+ rng.a = int(filename[:-len('.db')])
+
+ # kill server
+ knotd.kill()
+
+ return rng
+
+
+def generate_step_query(tcurr, id_prefix):
+ out = '; {0}'.format(tcurr.isoformat())
+ out += '''
+STEP {0}000000 QUERY
+ENTRY_BEGIN
+REPLY RD AD
+SECTION QUESTION
+test. IN TXT
+ENTRY_END
+'''.format(id_prefix)
+ return out
+
+
+def generate_step_check(id_prefix):
+ return '''STEP {0}000001 CHECK_ANSWER
+ENTRY_BEGIN
+REPLY QR RD RA AD
+MATCH opcode rcode flags question answer
+SECTION QUESTION
+test. IN TXT
+SECTION ANSWER
+test. IN TXT "it works"
+ENTRY_END
+'''.format(id_prefix)
+
+def generate_step_nocheck(id_prefix):
+ return '''STEP {0}000001 CHECK_ANSWER
+ENTRY_BEGIN
+REPLY QR RD RA AD
+MATCH opcode qname question
+SECTION QUESTION
+test. IN TXT
+SECTION ANSWER
+test. IN TXT "it works"
+ENTRY_END
+'''.format(id_prefix)
+
+def generate_step_finish_msg(id_prefix):
+ return '''STEP {0}000001 CHECK_ANSWER
+ENTRY_BEGIN
+REPLY QR RD RA AA NXDOMAIN
+MATCH opcode rcode flags question answer
+SECTION QUESTION
+test. IN TXT
+SECTION AUTHORITY
+test. 10800 IN SOA test. nobody.invalid. 1 3600 1200 604800 10800
+SECTION ADDITIONAL
+explanation.invalid. 10800 IN TXT "check last answer"
+ENTRY_END
+'''.format(id_prefix)
+
+def generate_step_elapse(tstep, id_prefix):
+ out = '; move time by {0}\n'.format(tstep)
+ out += '''STEP {0}000099 TIME_PASSES ELAPSE {1}\n\n'''.format(
+ id_prefix, int(tstep.total_seconds()))
+ return out
+
+
+def main():
+ resolver_init()
+ rng_templ, entry_templ = get_templates()
+ ranges = []
+ check_last_msg = False
+
+ # transform data in zones files into RANGEs
+ files = os.listdir()
+ files.sort()
+ for fn in files:
+ if not fn.endswith('.db') or not fn.startswith('20'):
+ continue
+ ranges.append(generate_range(fn, rng_templ, entry_templ))
+
+ # connect ranges
+ for i in range(1, len(ranges)):
+ ranges[i - 1].b = ranges[i].a - 1
+ ranges[-1].b = 99999999999999
+
+ # steps
+ steps = []
+ tstart = datetime.datetime(year=2017, month=7, day=1)
+ if VARIANT == "unmanaged_key":
+ tend = datetime.datetime(year=2017, month=7, day=21, hour=23, minute=59, second=59)
+ check_last_msg = True
+ else:
+ tend = datetime.datetime(year=2017, month=12, day=31, hour=23, minute=59, second=59)
+ tstep = datetime.timedelta(days=1)
+ tcurr = tstart
+ while tcurr < tend:
+ id_prefix = tcurr.strftime('%Y%m%d')
+ steps.append(generate_step_query(tcurr, id_prefix))
+ if (check_last_msg is True and tcurr + tstep > tend):
+ steps.append(generate_step_finish_msg(id_prefix))
+ elif VARIANT == "unmanaged_key":
+ steps.append(generate_step_nocheck(id_prefix))
+ else:
+ steps.append(generate_step_check(id_prefix))
+ steps.append(generate_step_elapse(tstep, id_prefix))
+ tcurr += tstep
+
+ # generate output
+ with open('keys/ds') as dsfile:
+ tas = dsfile.read().strip()
+
+ # constant RPL file header
+ print("stub-addr: 2001:503:ba3e::2:30")
+ for ta in tas.split('\n'):
+ print ("trust-anchor: " + ta)
+ print("""val-override-date: 20170701000000
+query-minimization: off
+CONFIG_END
+
+SCENARIO_BEGIN Simulation of successful RFC 5011 KSK roll-over during 2017
+ """.format(ta=ta))
+ for rng in ranges:
+ print(rng)
+
+ for step in steps:
+ print(step)
+
+ # constant RPL file footer
+ print('''
+SCENARIO_END
+ ''')
+
+
+if __name__ == '__main__':
+ main()