81 lines
2.7 KiB
Python
81 lines
2.7 KiB
Python
"""
|
|
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
|
|
SPDX-License-Identifier: MPL-2.0
|
|
|
|
This Source Code Form is subject to the terms of the Mozilla Public
|
|
License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
See the COPYRIGHT file distributed with this work for additional
|
|
information regarding copyright ownership.
|
|
"""
|
|
|
|
from typing import AsyncGenerator
|
|
|
|
import dns
|
|
|
|
from isctest.asyncserver import (
|
|
ControllableAsyncDnsServer,
|
|
DnsResponseSend,
|
|
QueryContext,
|
|
ResponseAction,
|
|
ResponseHandler,
|
|
ToggleResponsesCommand,
|
|
)
|
|
|
|
|
|
class ChaseDsHandler(ResponseHandler):
|
|
"""
|
|
Yield responses triggering DS chasing logic in `named`. These responses
|
|
cannot be served from a static zone file because most of them need to be
|
|
generated dynamically so that the owner name of the returned RRset is
|
|
copied from the QNAME sent by the client:
|
|
|
|
- A/AAAA queries for `ns1.sld.tld.` elicit responses with IP addresses,
|
|
- all NS queries below `sld.tld.` elicit a delegation to `ns1.sld.tld.`,
|
|
- all other queries elicit a negative response with a common SOA record.
|
|
"""
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[ResponseAction, None]:
|
|
ns1_sld_tld = dns.name.from_text("ns1.sld.tld.")
|
|
sld_tld = dns.name.from_text("sld.tld.")
|
|
|
|
if qctx.qname == ns1_sld_tld and qctx.qtype == dns.rdatatype.A:
|
|
response_type = dns.rdatatype.A
|
|
response_rdata = "10.53.0.2"
|
|
response_section = qctx.response.answer
|
|
elif qctx.qname == ns1_sld_tld and qctx.qtype == dns.rdatatype.AAAA:
|
|
response_type = dns.rdatatype.AAAA
|
|
response_rdata = "fd92:7065:b8e:ffff::2"
|
|
response_section = qctx.response.answer
|
|
elif qctx.qname.is_subdomain(sld_tld) and qctx.qtype == dns.rdatatype.NS:
|
|
response_type = dns.rdatatype.NS
|
|
response_rdata = "ns1.sld.tld."
|
|
response_section = qctx.response.answer
|
|
else:
|
|
response_type = dns.rdatatype.SOA
|
|
response_rdata = ". . 0 0 0 0 0"
|
|
response_section = qctx.response.authority
|
|
|
|
qctx.response.set_rcode(dns.rcode.NOERROR)
|
|
qctx.response.use_edns(None)
|
|
|
|
response_rrset = dns.rrset.from_text(
|
|
qctx.qname, 300, qctx.qclass, response_type, response_rdata
|
|
)
|
|
response_section.append(response_rrset)
|
|
|
|
yield DnsResponseSend(qctx.response, authoritative=True)
|
|
|
|
|
|
def main() -> None:
|
|
server = ControllableAsyncDnsServer([ToggleResponsesCommand])
|
|
server.install_response_handler(ChaseDsHandler())
|
|
server.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|