diff options
Diffstat (limited to 'tests/deckard/sets/resolver/rfc5011/dns2rpl.py')
-rwxr-xr-x | tests/deckard/sets/resolver/rfc5011/dns2rpl.py | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/tests/deckard/sets/resolver/rfc5011/dns2rpl.py b/tests/deckard/sets/resolver/rfc5011/dns2rpl.py new file mode 100755 index 0000000..5a12743 --- /dev/null +++ b/tests/deckard/sets/resolver/rfc5011/dns2rpl.py @@ -0,0 +1,182 @@ +#!/usr/bin/python3 +""" +Generate RFC 5011 test simulating succesfull KSK roll-over in 2017. + +Depedencies: Knot DNS server + Deckard library. +Environment: Set PYTHONPATH variable so "import pydnstest" will use module from Deckard. +Input: Root zone files, presumably created by genkeyszones.sh. +Output: RPL file for Deckard on standard output. +""" + +import copy +import datetime +import os.path +import subprocess +import time + +import dns.resolver + +import pydnstest.scenario + + +def store_answer(qname, qtype, template): + answ = dns.resolver.query(qname, qtype, raise_on_no_answer=False) + entr = copy.copy(template) + entr.message = answ.response + return entr + + +def resolver_init(): + """ + Configure dns.resolver to ask ::1@5353 with EDNS0 DO set. + """ + dns.resolver.reset_default_resolver() + dns.resolver.default_resolver.use_edns(0, dns.flags.DO, 4096) + dns.resolver.default_resolver.nameservers = ['::1'] + dns.resolver.default_resolver.nameserver_ports = {'::1': 5353} + dns.resolver.default_resolver.flags = 0 + + +def get_templates(): + """ + Return empty objects for RANGE and ENTRY suitable as object templates. + """ + empty_case, _ = pydnstest.scenario.parse_file(os.path.realpath('empty.rpl')) + + rng = copy.copy(empty_case.ranges[0]) + + entry = copy.copy(rng.stored[0]) + entry.adjust_fields = ['copy_id'] + entry.match_fields = ['opcode', 'question'] + + rng.addresses = {'198.41.0.4', '2001:503:ba3e::2:30'} + rng.stored = [] + + return rng, entry + + +def generate_range(filename, rng_templ, entry_templ): + """ + Run Knot DNS server with specified zone file and generate RANGE object. + """ + assert filename.startswith('20') + assert filename.endswith('.db') + try: + os.unlink('root.db') + except FileNotFoundError: + pass + os.link(filename, 'root.db') + + # run server + knotd = subprocess.Popen(['/usr/local/sbin/knotd', '-c', 'knot.root.conf']) + time.sleep(0.1) # give kresd time to start so we do not wait for first timeout + + # query data + rng = copy.copy(rng_templ) + rng.stored = [] + rng.stored.append(store_answer('.', 'SOA', entry_templ)) + rng.stored.append(store_answer('.', 'DNSKEY', entry_templ)) + rng.stored.append(store_answer('.', 'NS', entry_templ)) + rng.stored.append(store_answer('rootns.', 'NS', entry_templ)) + rng.stored.append(store_answer('rootns.', 'A', entry_templ)) + rng.stored.append(store_answer('rootns.', 'AAAA', entry_templ)) + rng.stored.append(store_answer('test.', 'TXT', entry_templ)) + rng.a = int(filename[:-len('.db')]) + + # kill server + knotd.kill() + + return rng + + +def generate_step_query(tcurr, id_prefix): + out = '; {0}'.format(tcurr.isoformat()) + out += ''' +STEP {0}000000 QUERY +ENTRY_BEGIN +REPLY RD AD +SECTION QUESTION +test. IN TXT +ENTRY_END +'''.format(id_prefix) + return out + + +def generate_step_check(id_prefix): + return '''STEP {0}000001 CHECK_ANSWER +ENTRY_BEGIN +REPLY QR RD RA AD +MATCH opcode rcode flags question answer +SECTION QUESTION +test. IN TXT +SECTION ANSWER +test. IN TXT "it works" +ENTRY_END +'''.format(id_prefix) + + +def generate_step_elapse(tstep, id_prefix): + out = '; move time by {0}\n'.format(tstep) + out += '''STEP {0}000099 TIME_PASSES ELAPSE {1}\n\n'''.format( + id_prefix, int(tstep.total_seconds())) + return out + + +def main(): + resolver_init() + rng_templ, entry_templ = get_templates() + ranges = [] + + # transform data in zones files into RANGEs + files = os.listdir() + files.sort() + for fn in files: + if not fn.endswith('.db') or not fn.startswith('20'): + continue + ranges.append(generate_range(fn, rng_templ, entry_templ)) + + # connect ranges + for i in range(1, len(ranges)): + ranges[i - 1].b = ranges[i].a - 1 + ranges[-1].b = 99999999999999 + + # steps + steps = [] + tstart = datetime.datetime(year=2017, month=7, day=1) + tend = datetime.datetime(year=2017, month=12, day=31, hour=23, minute=59, second=59) + tstep = datetime.timedelta(days=1) + tcurr = tstart + while tcurr < tend: + id_prefix = tcurr.strftime('%Y%m%d') + steps.append(generate_step_query(tcurr, id_prefix)) + steps.append(generate_step_check(id_prefix)) + steps.append(generate_step_elapse(tstep, id_prefix)) + tcurr += tstep + + # generate output + with open('keys/ds') as dsfile: + ta = dsfile.read().strip() + + # constant RPL file header + print("""stub-addr: 2001:503:ba3e::2:30 + trust-anchor: {ta} + val-override-date: 20170701000000 + query-minimization: off + CONFIG_END + + SCENARIO_BEGIN Simulation of successfull RFC 5011 KSK roll-over during 2017 + """.format(ta=ta)) + for rng in ranges: + print(rng) + + for step in steps: + print(step) + + # constant RPL file footer + print(''' + SCENARIO_END + ''') + + +if __name__ == '__main__': + main() |