summaryrefslogtreecommitdiffstats
path: root/tests/integration/deckard/tools/invalid_dsa.py
blob: fd1c845ea856dfbca6f9c72dd18ab12ac7c4fff3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Returns 1 if there is a DNSSEC DSA signature which is not 41 bytes long.\
0 otherwise.
"""

import os
import sys
import argparse
import dns
import pydnstest
import pydnstest.scenario
import pydnstest.augwrap


def parse(test):
    """ Parse the test"""
    _, config = pydnstest.scenario.parse_file(os.path.realpath(test))
    aug = pydnstest.augwrap.AugeasWrapper(
        confpath=os.path.realpath(test),
        lens='Deckard', loadpath="../pydnstest")
    node = aug.tree
    return config, node


def get_dsakeys(config, node):
    """ Make list of all DSA keys in the test"""
    dsakeys = []
    for conf in config:
        if conf[0] == "trust-anchor":
            conf[1] = conf[1][1:-1]
            trust_anchor = conf[1].split()
            for i, word in enumerate(trust_anchor):
                if word == "DS":
                    algorithm = trust_anchor[i + 2]
                    if algorithm in ("3", "DSA"):
                        dsakeys.append(trust_anchor[i + 1])

    for entry in node.match("/scenario/range/entry"):
        records = list(entry.match("/section/answer/record"))
        records.extend(list(entry.match("/section/authority/record")))
        records.extend(list(entry.match("/section/additional/record")))

        for record in records:
            if record["/type"].value == "DS":
                if record["/data"].value[1] in ["3", "DSA"]:
                    dsakeys.append(record["/data"].value[2])
    return dsakeys


def check_rrsig(node, dsakeys):
    """ Find records with wrong lenght of rrsig"""
    for key in dsakeys:  # pylint: disable=too-many-nested-blocks
        for entry in node.match("/scenario/range/entry"):
            records = list(entry.match("/section/answer/record"))
            records.extend(list(entry.match("/section/authority/record")))
            records.extend(list(entry.match("/section/additional/record")))

            for record in records:
                if record["/type"].value == "RRSIG":
                    rrset = dns.rrset.from_text(record["/domain"].value, 300,
                                                1, dns.rdatatype.RRSIG,
                                                record["/data"].value)
                    if rrset.items[0].key_tag == int(key):
                        if len(rrset.items[0].signature) != 41:
                            return True
    return False


def main():
    """Returns 1 if there is a DNSSEC DSA signature which is not 41 bytes long. \
    0 otherwise."""
    argparser = argparse.ArgumentParser()
    argparser.add_argument("file")
    args = argparser.parse_args()
    config, node = parse(args.file)
    dsakeys = get_dsakeys(config, node)
    bad_rrsig = check_rrsig(node, dsakeys)
    if bad_rrsig:
        sys.exit(1)
    else:
        sys.exit(0)


main()