1
0
Fork 0
bind9/bin/tests/system/forward/ans6/ans.py
Daniel Baumann f66ff7eae6
Adding upstream version 1:9.20.9.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-21 13:32:37 +02:00

81 lines
2.7 KiB
Python

"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
import dns
from isctest.asyncserver import (
ControllableAsyncDnsServer,
DnsResponseSend,
QueryContext,
ResponseAction,
ResponseHandler,
ToggleResponsesCommand,
)
class ChaseDsHandler(ResponseHandler):
    """
    Synthesize the responses needed to exercise DS chasing in `named`.

    Every returned RRset is owned by the QNAME of the incoming query, so the
    data has to be generated per query rather than served from a static zone
    file.  The record placed in the response depends on the query:

      - A for `ns1.sld.tld.`: 10.53.0.2 in the ANSWER section,
      - AAAA for `ns1.sld.tld.`: fd92:7065:b8e:ffff::2 in the ANSWER section,
      - NS for `sld.tld.` or any name below it: `ns1.sld.tld.` in the ANSWER
        section,
      - anything else: an SOA record (root names, zeroed counters) in the
        AUTHORITY section, i.e. a negative response.
    """

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[ResponseAction, None]:
        """
        Yield one NOERROR response, with EDNS disabled, for the query in `qctx`.

        The single RRset added to the response uses the query's QNAME and
        QCLASS and a TTL of 300 seconds.
        """
        zone_apex = dns.name.from_text("sld.tld.")
        nameserver = dns.name.from_text("ns1.sld.tld.")
        asks_nameserver = qctx.qname == nameserver

        # Pick the record type, its textual RDATA and the target section.
        if asks_nameserver and qctx.qtype == dns.rdatatype.A:
            rdtype, rdata = dns.rdatatype.A, "10.53.0.2"
            section = qctx.response.answer
        elif asks_nameserver and qctx.qtype == dns.rdatatype.AAAA:
            rdtype, rdata = dns.rdatatype.AAAA, "fd92:7065:b8e:ffff::2"
            section = qctx.response.answer
        elif qctx.qtype == dns.rdatatype.NS and qctx.qname.is_subdomain(zone_apex):
            rdtype, rdata = dns.rdatatype.NS, "ns1.sld.tld."
            section = qctx.response.answer
        else:
            # Fallback: negative response carrying only an SOA record.
            rdtype, rdata = dns.rdatatype.SOA, ". . 0 0 0 0 0"
            section = qctx.response.authority

        qctx.response.set_rcode(dns.rcode.NOERROR)
        qctx.response.use_edns(None)  # None turns EDNS off for this message

        # The owner name is always copied from the query, whatever the branch.
        section.append(
            dns.rrset.from_text(qctx.qname, 300, qctx.qclass, rdtype, rdata)
        )

        yield DnsResponseSend(qctx.response, authoritative=True)
def main() -> None:
    """
    Build the controllable DNS server, attach `ChaseDsHandler`, and run it.

    `ToggleResponsesCommand` is the only control command registered with the
    server.
    """
    control_commands = [ToggleResponsesCommand]
    dns_server = ControllableAsyncDnsServer(control_commands)
    handler = ChaseDsHandler()
    dns_server.install_response_handler(handler)
    dns_server.run()
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()