diff options
Diffstat (limited to 'modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py')
-rwxr-xr-x | modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py b/modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py new file mode 100755 index 0000000..317d671 --- /dev/null +++ b/modules/ta_update/ta_update.test.integr/rfc5011/dns2rpl.py @@ -0,0 +1,222 @@ +#!/usr/bin/python3 +""" +Generate RFC 5011 test simulating successful KSK roll-over in 2017. + +Dependencies: Knot DNS server + Deckard library. +Environment: Set PYTHONPATH variable so "import pydnstest" will use module from Deckard. +Input: Root zone files, presumably created by genkeyszones.sh. +Output: RPL file for Deckard on standard output. +""" + +import copy +import datetime +import os.path +import subprocess +import time + +import dns.resolver + +import pydnstest.scenario + +try: + VARIANT = os.environ["VARIANT"] +except KeyError: + VARIANT = "" + +def store_answer(qname, qtype, template): + answ = dns.resolver.query(qname, qtype, raise_on_no_answer=False) + entr = copy.copy(template) + entr.message = answ.response + return entr + + +def resolver_init(): + """ + Configure dns.resolver to ask ::1@5353 with EDNS0 DO set. + """ + dns.resolver.reset_default_resolver() + dns.resolver.default_resolver.use_edns(0, dns.flags.DO, 4096) + dns.resolver.default_resolver.nameservers = ['::1'] + dns.resolver.default_resolver.nameserver_ports = {'::1': 5353} + dns.resolver.default_resolver.flags = 0 + + +def get_templates(): + """ + Return empty objects for RANGE and ENTRY suitable as object templates. + """ + empty_case, _ = pydnstest.scenario.parse_file(os.path.realpath('empty.rpl')) + + rng = copy.copy(empty_case.ranges[0]) + + entry = copy.copy(rng.stored[0]) + entry.adjust_fields = ['copy_id'] + entry.match_fields = ['opcode', 'question'] + + rng.addresses = {'198.41.0.4', '2001:503:ba3e::2:30'} + rng.stored = [] + + return rng, entry + + +def generate_range(filename, rng_templ, entry_templ): + """ + Run Knot DNS server with specified zone file and generate RANGE object. + """ + assert filename.startswith('20') + assert filename.endswith('.db') + try: + os.unlink('root.db') + except FileNotFoundError: + pass + os.link(filename, 'root.db') + + # run server + knotd = subprocess.Popen(['knotd', '-c', 'knot.root.conf', '-s', '/tmp/knot-dns2rpl.sock']) + time.sleep(0.1) # give kresd time to start so we do not wait for first timeout + + # query data + rng = copy.copy(rng_templ) + rng.stored = [] + rng.stored.append(store_answer('.', 'SOA', entry_templ)) + rng.stored.append(store_answer('.', 'DNSKEY', entry_templ)) + rng.stored.append(store_answer('.', 'NS', entry_templ)) + rng.stored.append(store_answer('rootns.', 'NS', entry_templ)) + rng.stored.append(store_answer('rootns.', 'A', entry_templ)) + rng.stored.append(store_answer('rootns.', 'AAAA', entry_templ)) + rng.stored.append(store_answer('test.', 'TXT', entry_templ)) + rng.a = int(filename[:-len('.db')]) + + # kill server + knotd.kill() + + return rng + + +def generate_step_query(tcurr, id_prefix): + out = '; {0}'.format(tcurr.isoformat()) + out += ''' +STEP {0}000000 QUERY +ENTRY_BEGIN +REPLY RD AD +SECTION QUESTION +test. IN TXT +ENTRY_END +'''.format(id_prefix) + return out + + +def generate_step_check(id_prefix): + return '''STEP {0}000001 CHECK_ANSWER +ENTRY_BEGIN +REPLY QR RD RA AD +MATCH opcode rcode flags question answer +SECTION QUESTION +test. IN TXT +SECTION ANSWER +test. IN TXT "it works" +ENTRY_END +'''.format(id_prefix) + +def generate_step_nocheck(id_prefix): + return '''STEP {0}000001 CHECK_ANSWER +ENTRY_BEGIN +REPLY QR RD RA AD +MATCH opcode qname question +SECTION QUESTION +test. IN TXT +SECTION ANSWER +test. IN TXT "it works" +ENTRY_END +'''.format(id_prefix) + +def generate_step_finish_msg(id_prefix): + return '''STEP {0}000001 CHECK_ANSWER +ENTRY_BEGIN +REPLY QR RD RA AA NXDOMAIN +MATCH opcode rcode flags question answer +SECTION QUESTION +test. IN TXT +SECTION AUTHORITY +test. 10800 IN SOA test. nobody.invalid. 1 3600 1200 604800 10800 +SECTION ADDITIONAL +explanation.invalid. 10800 IN TXT "check last answer" +ENTRY_END +'''.format(id_prefix) + +def generate_step_elapse(tstep, id_prefix): + out = '; move time by {0}\n'.format(tstep) + out += '''STEP {0}000099 TIME_PASSES ELAPSE {1}\n\n'''.format( + id_prefix, int(tstep.total_seconds())) + return out + + +def main(): + resolver_init() + rng_templ, entry_templ = get_templates() + ranges = [] + check_last_msg = False + + # transform data in zones files into RANGEs + files = os.listdir() + files.sort() + for fn in files: + if not fn.endswith('.db') or not fn.startswith('20'): + continue + ranges.append(generate_range(fn, rng_templ, entry_templ)) + + # connect ranges + for i in range(1, len(ranges)): + ranges[i - 1].b = ranges[i].a - 1 + ranges[-1].b = 99999999999999 + + # steps + steps = [] + tstart = datetime.datetime(year=2017, month=7, day=1) + if VARIANT == "unmanaged_key": + tend = datetime.datetime(year=2017, month=7, day=21, hour=23, minute=59, second=59) + check_last_msg = True + else: + tend = datetime.datetime(year=2017, month=12, day=31, hour=23, minute=59, second=59) + tstep = datetime.timedelta(days=1) + tcurr = tstart + while tcurr < tend: + id_prefix = tcurr.strftime('%Y%m%d') + steps.append(generate_step_query(tcurr, id_prefix)) + if (check_last_msg is True and tcurr + tstep > tend): + steps.append(generate_step_finish_msg(id_prefix)) + elif VARIANT == "unmanaged_key": + steps.append(generate_step_nocheck(id_prefix)) + else: + steps.append(generate_step_check(id_prefix)) + steps.append(generate_step_elapse(tstep, id_prefix)) + tcurr += tstep + + # generate output + with open('keys/ds') as dsfile: + tas = dsfile.read().strip() + + # constant RPL file header + print("stub-addr: 2001:503:ba3e::2:30") + for ta in tas.split('\n'): + print ("trust-anchor: " + ta) + print("""val-override-date: 20170701000000 +query-minimization: off +CONFIG_END + +SCENARIO_BEGIN Simulation of successful RFC 5011 KSK roll-over during 2017 + """.format(ta=ta)) + for rng in ranges: + print(rng) + + for step in steps: + print(step) + + # constant RPL file footer + print(''' +SCENARIO_END + ''') + + +if __name__ == '__main__': + main() |