diff options
Diffstat (limited to '')
-rwxr-xr-x | tests/integration/deckard/rplint.py | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/tests/integration/deckard/rplint.py b/tests/integration/deckard/rplint.py new file mode 100755 index 0000000..558045e --- /dev/null +++ b/tests/integration/deckard/rplint.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 + +import itertools +import os +import sys +from typing import Any, Callable, Iterable, Iterator, Optional, List, Union, Set # noqa + +import pydnstest.augwrap +import pydnstest.matchpart +import pydnstest.scenario + +Element = Union["Entry", "Step", pydnstest.scenario.Range] + +RCODES = {"NOERROR", "FORMERR", "SERVFAIL", "NXDOMAIN", "NOTIMP", "REFUSED", "YXDOMAIN", "YXRRSET", + "NXRRSET", "NOTAUTH", "NOTZONE", "BADVERS", "BADSIG", "BADKEY", "BADTIME", "BADMODE", + "BADNAME", "BADALG", "BADTRUNC", "BADCOOKIE"} +FLAGS = {"QR", "AA", "TC", "RD", "RA", "AD", "CD"} +SECTIONS = {"question", "answer", "authority", "additional"} + + +class RplintError(ValueError): + def __init__(self, fails): + msg = "" + for fail in fails: + msg += str(fail) + "\n" + super().__init__(msg) + + +def get_line_number(file: str, char_number: int) -> int: + pos = 0 + with open(file) as f: + for number, line in enumerate(f): + pos += len(line) + if pos >= char_number: + return number + 2 + return 0 + + +def is_empty(iterable: Iterator[Any]) -> bool: + try: + next(iterable) + except StopIteration: + return True + return False + + +class Entry: + def __init__(self, node: pydnstest.augwrap.AugeasNode) -> None: + self.match = {m.value for m in node.match("/match")} + self.adjust = {a.value for a in node.match("/adjust")} + self.answer = list(node.match("/section/answer/record")) + self.authority = list(node.match("/section/authority/record")) + self.additional = list(node.match("/section/additional/record")) + self.reply = {r.value for r in node.match("/reply")} + self.records = list(node.match("/section/*/record")) + self.node = node + + +class Step: + def __init__(self, node: pydnstest.augwrap.AugeasNode) -> None: + self.node = node + self.type = node["/type"].value + try: + self.entry = Entry(node["/entry"]) # type: Optional[Entry] + except KeyError: + self.entry = None + + +class RplintFail: + def __init__(self, test: "RplintTest", + element: Optional[Element] = None, + etc: str = "") -> None: + self.path = test.path + self.element = element # type: Optional[Element] + self.line = get_line_number(self.path, element.node.char if element is not None else 0) + self.etc = etc + self.check = None # type: Optional[Callable[[RplintTest], List[RplintFail]]] + + def __str__(self): + if self.etc: + return "{}:{} {}: {} ({})".format(os.path.basename(self.path), self.line, + self.check.__name__, self.check.__doc__, self.etc) + return "{}:{} {}: {}".format(os.path.basename(self.path), self.line, self.check.__name__, + self.check.__doc__) + + +class RplintTest: + def __init__(self, path: str) -> None: + aug = pydnstest.augwrap.AugeasWrapper(confpath=os.path.realpath(path), + lens='Deckard', + loadpath=os.path.join(os.path.dirname(__file__), + 'pydnstest')) + self.node = aug.tree + self.name = os.path.basename(path) + self.path = path + + _, self.config = pydnstest.scenario.parse_file(os.path.realpath(path)) + self.range_entries = [Entry(node) for node in self.node.match("/scenario/range/entry")] + self.steps = [Step(node) for node in self.node.match("/scenario/step")] + self.step_entries = [step.entry for step in self.steps if step.entry is not None] + self.entries = self.range_entries + self.step_entries + + self.ranges = [pydnstest.scenario.Range(n) for n in self.node.match("/scenario/range")] + + self.fails = None # type: Optional[List[RplintFail]] + self.checks = [ + entry_more_than_one_rcode, + entry_no_qname_qtype_copy_query, + # Commented out for now until we implement selective turning off of checks + # entry_ns_in_authority, + range_overlapping_ips, + range_shadowing_match_rules, + step_check_answer_no_match, + step_query_match, + step_query_sections, + step_query_qr, + step_section_unchecked, + step_unchecked_match, + step_unchecked_rcode, + scenario_ad_or_rrsig_no_ta, + scenario_timestamp, + config_trust_anchor_trailing_period_missing, + step_duplicate_id, + ] + + def run_checks(self) -> bool: + """returns True iff all tests passed""" + self.fails = [] + for check in self.checks: + fails = check(self) + for fail in fails: + fail.check = check + self.fails += fails + + if self.fails == []: + return True + return False + + def print_fails(self) -> None: + if self.fails is None: + raise RuntimeError("Maybe you should run some test first…") + for fail in self.fails: + print(fail) + + +def config_trust_anchor_trailing_period_missing(test: RplintTest) -> List[RplintFail]: + """Trust-anchor option in configuration contains domain without trailing period""" + for conf in test.config: + if conf[0] == "trust-anchor": + if conf[1].split()[0][-1] != ".": + return [RplintFail(test, etc=conf[1])] + return [] + + +def scenario_timestamp(test: RplintTest) -> List[RplintFail]: + """RRSSIG record present in test but no val-override-date or val-override-timestamp in config""" + rrsigs = [] + for entry in test.entries: + for record in entry.records: + if record["/type"].value == "RRSIG": + rrsigs.append(RplintFail(test, entry)) + if rrsigs: + for k in test.config: + if k[0] == "val-override-date" or k[0] == "val-override-timestamp": + return [] + return rrsigs + + +def entry_no_qname_qtype_copy_query(test: RplintTest) -> List[RplintFail]: + """ENTRY without qname and qtype in MATCH and without copy_query in ADJUST""" + fails = [] + for entry in test.range_entries: + if "question" not in entry.match and ("qname" not in entry.match or + "qtype" not in entry.match): + if "copy_query" not in entry.adjust: + fails.append(RplintFail(test, entry)) + return fails + + +def entry_ns_in_authority(test: RplintTest) -> List[RplintFail]: + """ENTRY has authority section with NS records, consider using MATCH subdomain""" + fails = [] + for entry in test.range_entries: + if entry.authority and "subdomain" not in entry.match: + for record in entry.authority: + if record["/type"].value == "NS": + fails.append(RplintFail(test, entry)) + return fails + + +def entry_more_than_one_rcode(test: RplintTest) -> List[RplintFail]: + """ENTRY has more than one rcode in MATCH""" + fails = [] + for entry in test.entries: + if len(RCODES & entry.reply) > 1: + fails.append(RplintFail(test, entry)) + return fails + + +def scenario_ad_or_rrsig_no_ta(test: RplintTest) -> List[RplintFail]: + """AD or RRSIG present in test but no trust-anchor present in config""" + dnssec = [] + for entry in test.entries: + if "AD" in entry.reply or "AD" in entry.match: + dnssec.append(RplintFail(test, entry)) + else: + for record in entry.records: + if record["/type"].value == "RRSIG": + dnssec.append(RplintFail(test, entry)) + + if dnssec: + for k in test.config: + if k[0] == "trust-anchor": + return [] + return dnssec + + +def step_query_match(test: RplintTest) -> List[RplintFail]: + """STEP QUERY has a MATCH rule""" + return [RplintFail(test, step) for step in test.steps if step.type == "QUERY" and + step.entry and step.entry.match] + + +def step_query_sections(test: RplintTest) -> List[RplintFail]: + """STEP QUERY has some records in sections other than QUESTION""" + return [RplintFail(test, step) for step in test.steps if step.type == "QUERY" and + step.entry and (step.entry.answer or step.entry.authority or step.entry.additional)] + + +def step_query_qr(test: RplintTest) -> List[RplintFail]: + """STEP QUERY specified QR=1 flag (i.e. message is an answer)""" + return [RplintFail(test, step) for step in test.steps if step.type == "QUERY" and + step.entry and step.entry.reply and 'QR' in step.entry.reply] + + +def step_check_answer_no_match(test: RplintTest) -> List[RplintFail]: + """ENTRY in STEP CHECK_ANSWER has no MATCH rule""" + return [RplintFail(test, step) for step in test.steps if step.type == "CHECK_ANSWER" and + step.entry and not step.entry.match] + + +def step_unchecked_rcode(test: RplintTest) -> List[RplintFail]: + """ENTRY specifies rcode but STEP MATCH does not check for it.""" + fails = [] + for step in test.steps: + if step.type == "CHECK_ANSWER" and step.entry and "all" not in step.entry.match: + if step.entry.reply & RCODES and "rcode" not in step.entry.match: + fails.append(RplintFail(test, step.entry)) + return fails + + +def step_unchecked_match(test: RplintTest) -> List[RplintFail]: + """ENTRY specifies flags but MATCH does not check for them""" + fails = [] + for step in test.steps: + if step.type == "CHECK_ANSWER": + entry = step.entry + if entry and "all" not in entry.match and entry.reply - RCODES and \ + "flags" not in entry.match: + fails.append(RplintFail(test, entry, str(entry.reply - RCODES))) + return fails + + +def step_section_unchecked(test: RplintTest) -> List[RplintFail]: + """ENTRY has non-empty sections but MATCH does not check for all of them""" + fails = [] + for step in test.steps: + if step.type == "CHECK_ANSWER" and step.entry and "all" not in step.entry.match: + for section in SECTIONS: + if not is_empty(step.node.match("/entry/section/" + section + "/*")): + if section not in step.entry.match: + fails.append(RplintFail(test, step.entry, section)) + return fails + + +def range_overlapping_ips(test: RplintTest) -> List[RplintFail]: + """RANGE has common IPs with some previous overlapping RANGE""" + fails = [] + for r1, r2 in itertools.combinations(test.ranges, 2): + # If the ranges overlap + if min(r1.b, r2.b) >= max(r1.a, r2.a): + if r1.addresses & r2.addresses: + info = "previous range on line %d" % get_line_number(test.path, r1.node.char) + fails.append(RplintFail(test, r2, info)) + return fails + + +def range_shadowing_match_rules(test: RplintTest) -> List[RplintFail]: + """ENTRY has no effect since one of previous entries has the same or broader match rules""" + fails = [] + for r in test.ranges: + for e1, e2 in itertools.combinations(r.stored, 2): + try: + e1.match(e2.message) + except ValueError: + pass + else: + info = "previous entry on line %d" % get_line_number(test.path, e1.node.char) + if e1.match_fields > e2.match_fields: + continue + if "subdomain" not in e1.match_fields and "subdomain" in e2.match_fields: + continue + fails.append(RplintFail(test, e2, info)) + return fails + + +def step_duplicate_id(test: RplintTest) -> List[RplintFail]: + """STEP has the same ID as one of previous ones""" + fails = [] + step_numbers = set() # type: Set[int] + for step in test.steps: + if step.node.value in step_numbers: + fails.append(RplintFail(test, step)) + else: + step_numbers.add(step.node.value) + return fails + + +# TODO: This will make sense after we fix how we handle defaults in deckard.aug and scenario.py +# We might just not use defaults altogether as testbound does +# if "copy_id" not in adjust: +# entry_error(test, entry, "copy_id should be in ADJUST") + +def test_run_rplint(rpl_path: str) -> None: + t = RplintTest(rpl_path) + passed = t.run_checks() + if not passed: + raise RplintError(t.fails) + + +def main(): + try: + test_path = sys.argv[1] + except IndexError: + print("usage: %s <path to rpl file>" % sys.argv[0]) + sys.exit(2) + if not os.path.isfile(test_path): + print("rplint.py works on single file only.") + print("Use rplint.sh with --scenarios=<directory with rpls> to run on rpls.") + sys.exit(2) + print("Linting %s" % test_path) + t = RplintTest(test_path) + passed = t.run_checks() + t.print_fails() + + if passed: + sys.exit(0) + sys.exit(1) + + +if __name__ == '__main__': + main() |