diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
commit | 8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch) | |
tree | 4099e8021376c7d8c05bdf8503093d80e9c7bad0 /python/samba/tests/dns.py | |
parent | Initial commit. (diff) | |
download | samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip |
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python/samba/tests/dns.py')
-rw-r--r-- | python/samba/tests/dns.py | 2247 |
1 files changed, 2247 insertions, 0 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py new file mode 100644 index 0000000..a331e26 --- /dev/null +++ b/python/samba/tests/dns.py @@ -0,0 +1,2247 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Kai Blin <kai@samba.org> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from samba import dsdb +from samba.ndr import ndr_unpack, ndr_pack +from samba.samdb import SamDB +from samba.auth import system_session +import ldb +from ldb import ERR_OPERATIONS_ERROR +import os +import sys +import struct +import socket +import samba.ndr as ndr +from samba import credentials +from samba.dcerpc import dns, dnsp, dnsserver +from samba.dnsserver import TXTRecord +from samba.dnsserver import record_from_string, dns_record_match +from samba.tests.subunitrun import SubunitOptions, TestProgram +from samba import werror, WERRORError +from samba.tests.dns_base import DNSTest +import samba.getopt as options +import optparse + + +parser = optparse.OptionParser("dns.py <server name> <server ip> [options]") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) + +# This timeout only has relevance when testing against Windows +# Format errors tend to return patchy responses, so a timeout is needed. +parser.add_option("--timeout", type="int", dest="timeout", + help="Specify timeout for DNS requests") + +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +subunitopts = SubunitOptions(parser) +parser.add_option_group(subunitopts) + +opts, args = parser.parse_args() + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) + +timeout = opts.timeout + +if len(args) < 2: + parser.print_usage() + sys.exit(1) + +server_name = args[0] +server_ip = args[1] +creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE) + + +class TestSimpleQueries(DNSTest): + def setUp(self): + super().setUp() + global server, server_ip, lp, creds, timeout + self.server = server_name + self.server_ip = server_ip + self.lp = lp + self.creds = creds + self.timeout = timeout + + def test_one_a_query(self): + "create a query packet containing one query record" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 1) + self.assertEqual(response.answers[0].rdata, + self.server_ip) + + def test_one_SOA_query(self): + "create a query packet containing one query record for the SOA" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s" % (self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 1) + self.assertEqual( + response.answers[0].rdata.mname.upper(), + ("%s.%s" % (self.server, self.get_dns_domain())).upper()) + + def test_one_a_query_tcp(self): + "create a query packet containing one query record via TCP" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_tcp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 1) + self.assertEqual(response.answers[0].rdata, + self.server_ip) + + def test_one_mx_query(self): + "create a query packet causing an empty RCODE_OK answer" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 0) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "invalid-%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 0) + + def test_two_queries(self): + "create a query packet containing two query records" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + questions.append(q) + + name = "%s.%s" % ('bogusname', self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + try: + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) + except socket.timeout: + # Windows chooses not to respond to incorrectly formatted queries. + # Although this appears to be non-deterministic even for the same + # request twice, it also appears to be based on a how poorly the + # request is formatted. + pass + + def test_qtype_all_query(self): + "create a QTYPE_ALL query" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + + num_answers = 1 + dc_ipv6 = os.getenv('SERVER_IPV6') + if dc_ipv6 is not None: + num_answers += 1 + + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, num_answers) + self.assertEqual(response.answers[0].rdata, + self.server_ip) + if dc_ipv6 is not None: + self.assertEqual(response.answers[1].rdata, dc_ipv6) + + def test_qclass_none_query(self): + "create a QCLASS_NONE query" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question( + name, + dns.DNS_QTYPE_ALL, + dns.DNS_QCLASS_NONE) + questions.append(q) + + self.finish_name_packet(p, questions) + try: + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP) + except socket.timeout: + # Windows chooses not to respond to incorrectly formatted queries. + # Although this appears to be non-deterministic even for the same + # request twice, it also appears to be based on a how poorly the + # request is formatted. + pass + + def test_soa_hostname_query(self): + "create a SOA query for a hostname" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + # We don't get SOA records for single hosts + self.assertEqual(response.ancount, 0) + # But we do respond with an authority section + self.assertEqual(response.nscount, 1) + + def test_soa_unknown_hostname_query(self): + "create a SOA query for an unknown hostname" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "foobar.%s" % (self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + # We don't get SOA records for single hosts + self.assertEqual(response.ancount, 0) + # But we do respond with an authority section + self.assertEqual(response.nscount, 1) + + def test_soa_domain_query(self): + "create a SOA query for a domain" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = self.get_dns_domain() + q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 1) + self.assertEqual(response.answers[0].rdata.minimum, 3600) + + +class TestDNSUpdates(DNSTest): + def setUp(self): + super().setUp() + global server, server_ip, lp, creds, timeout + self.server = server_name + self.server_ip = server_ip + self.lp = lp + self.creds = creds + self.timeout = timeout + + def test_two_updates(self): + "create two update requests" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + updates.append(u) + + name = self.get_dns_domain() + u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + updates.append(u) + + self.finish_name_packet(p, updates) + try: + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) + except socket.timeout: + # Windows chooses not to respond to incorrectly formatted queries. + # Although this appears to be non-deterministic even for the same + # request twice, it also appears to be based on a how poorly the + # request is formatted. + pass + + def test_update_wrong_qclass(self): + "create update with DNS_QCLASS_NONE" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE) + updates.append(u) + + self.finish_name_packet(p, updates) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP) + + def test_update_prereq_with_non_null_ttl(self): + "test update with a non-null TTL" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + prereqs = [] + r = dns.res_rec() + r.name = "%s.%s" % (self.server, self.get_dns_domain()) + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 1 + r.length = 0 + prereqs.append(r) + + p.ancount = len(prereqs) + p.answers = prereqs + + try: + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) + except socket.timeout: + # Windows chooses not to respond to incorrectly formatted queries. + # Although this appears to be non-deterministic even for the same + # request twice, it also appears to be based on a how poorly the + # request is formatted. + pass + + def test_update_prereq_with_non_null_length(self): + "test update with a non-null length" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + prereqs = [] + r = dns.res_rec() + r.name = "%s.%s" % (self.server, self.get_dns_domain()) + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_ANY + r.ttl = 0 + r.length = 1 + prereqs.append(r) + + p.ancount = len(prereqs) + p.answers = prereqs + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET) + + def test_update_prereq_nonexisting_name(self): + "test update with a nonexisting name" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + prereqs = [] + r = dns.res_rec() + r.name = "idontexist.%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_ANY + r.ttl = 0 + r.length = 0 + prereqs.append(r) + + p.ancount = len(prereqs) + p.answers = prereqs + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET) + + def test_update_add_txt_record(self): + "test adding records works" + prefix, txt = 'textrec', ['"This is a test"'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + + def test_delete_record(self): + "Test if deleting records works" + + NAME = "deleterec.%s" % self.get_dns_domain() + + # First, create a record to make sure we have a record to delete. + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = NAME + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + rdata = self.make_txt_record(['"This is a test"']) + r.rdata = rdata + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # Now check the record is around + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # Now delete the record + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = NAME + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + rdata = self.make_txt_record(['"This is a test"']) + r.rdata = rdata + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # And finally check it's gone + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + + def test_readd_record(self): + "Test if adding, deleting and then re-adding a records works" + + NAME = "readdrec.%s" % self.get_dns_domain() + + # Create the record + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = NAME + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + rdata = self.make_txt_record(['"This is a test"']) + r.rdata = rdata + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # Now check the record is around + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # Now delete the record + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = NAME + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + rdata = self.make_txt_record(['"This is a test"']) + r.rdata = rdata + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # check it's gone + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + + # recreate the record + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = NAME + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + rdata = self.make_txt_record(['"This is a test"']) + r.rdata = rdata + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # Now check the record is around + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + def test_update_add_mx_record(self): + "test adding MX records works" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = "%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_MX + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + rdata = dns.mx_record() + rdata.preference = 10 + rdata.exchange = 'mail.%s' % self.get_dns_domain() + r.rdata = rdata + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s" % self.get_dns_domain() + q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assertEqual(response.ancount, 1) + ans = response.answers[0] + self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX) + self.assertEqual(ans.rdata.preference, 10) + self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain()) + + +class TestComplexQueries(DNSTest): + def make_dns_update(self, key, value, qtype): + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + + name = self.get_dns_domain() + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + self.finish_name_packet(p, [u]) + + r = dns.res_rec() + r.name = key + r.rr_type = qtype + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + r.rdata = value + p.nscount = 1 + p.nsrecs = [r] + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + def setUp(self): + super().setUp() + + global server, server_ip, lp, creds, timeout + self.server = server_name + self.server_ip = server_ip + self.lp = lp + self.creds = creds + self.timeout = timeout + + def test_one_a_query(self): + "create a query packet containing one query record" + + try: + + # Create the record + name = "cname_test.%s" % self.get_dns_domain() + rdata = "%s.%s" % (self.server, self.get_dns_domain()) + self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + # Check the record + name = "cname_test.%s" % self.get_dns_domain() + q = self.make_name_question(name, + dns.DNS_QTYPE_A, + dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 2) + self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) + self.assertEqual(response.answers[0].rdata, "%s.%s" % + (self.server, self.get_dns_domain())) + self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A) + self.assertEqual(response.answers[1].rdata, + self.server_ip) + + finally: + # Delete the record + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, + dns.DNS_QTYPE_SOA, + dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = "cname_test.%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_CNAME + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + r.rdata = "%s.%s" % (self.server, self.get_dns_domain()) + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + def test_cname_two_chain(self): + name0 = "cnamechain0.%s" % self.get_dns_domain() + name1 = "cnamechain1.%s" % self.get_dns_domain() + name2 = "cnamechain2.%s" % self.get_dns_domain() + self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME) + self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME) + self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + q = self.make_name_question(name1, dns.DNS_QTYPE_A, + dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 3) + + self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) + self.assertEqual(response.answers[0].name, name1) + self.assertEqual(response.answers[0].rdata, name2) + + self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME) + self.assertEqual(response.answers[1].name, name2) + self.assertEqual(response.answers[1].rdata, name0) + + self.assertEqual(response.answers[2].rr_type, dns.DNS_QTYPE_A) + self.assertEqual(response.answers[2].rdata, + self.server_ip) + + def test_invalid_empty_cname(self): + name0 = "cnamedotprefix0.%s" % self.get_dns_domain() + try: + self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME) + except AssertionError: + pass + else: + self.fail("Successfully added empty CNAME, which is invalid.") + + def test_cname_two_chain_not_matching_qtype(self): + name0 = "cnamechain0.%s" % self.get_dns_domain() + name1 = "cnamechain1.%s" % self.get_dns_domain() + name2 = "cnamechain2.%s" % self.get_dns_domain() + self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME) + self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME) + self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + q = self.make_name_question(name1, dns.DNS_QTYPE_TXT, + dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + + # CNAME should return all intermediate results! + # Only the A records exists, not the TXT. + self.assertEqual(response.ancount, 2) + + self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) + self.assertEqual(response.answers[0].name, name1) + self.assertEqual(response.answers[0].rdata, name2) + + self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME) + self.assertEqual(response.answers[1].name, name2) + self.assertEqual(response.answers[1].rdata, name0) + + def test_cname_loop(self): + cname1 = "cnamelooptestrec." + self.get_dns_domain() + cname2 = "cnamelooptestrec2." + self.get_dns_domain() + cname3 = "cnamelooptestrec3." + self.get_dns_domain() + self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME) + self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME) + self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(cname1, + dns.DNS_QTYPE_A, + dns.DNS_QCLASS_IN) + questions.append(q) + self.finish_name_packet(p, questions) + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=self.server_ip) + + max_recursion_depth = 20 + self.assertEqual(len(response.answers), max_recursion_depth) + + # Make sure cname limit doesn't count other records. This is a generic + # test called in tests below + def max_rec_test(self, rtype, rec_gen): + name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain()) + limit = 20 + num_recs_to_enter = limit + 5 + + for i in range(1, num_recs_to_enter+1): + ip = rec_gen(i) + self.make_dns_update(name, ip, rtype) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(name, + rtype, + dns.DNS_QCLASS_IN) + questions.append(q) + self.finish_name_packet(p, questions) + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=self.server_ip) + + self.assertEqual(len(response.answers), num_recs_to_enter) + + def test_record_limit_A(self): + def ip4_gen(i): + return "127.0.0." + str(i) + self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen) + + def test_record_limit_AAAA(self): + def ip6_gen(i): + return "AAAA:0:0:0:0:0:0:" + str(i) + self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen) + + def test_record_limit_SRV(self): + def srv_gen(i): + rec = dns.srv_record() + rec.priority = 1 + rec.weight = 1 + rec.port = 92 + rec.target = "srvtestrec" + str(i) + return rec + self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen) + + # Same as test_record_limit_A but with a preceding CNAME follow + def test_cname_limit(self): + cname1 = "cnamelimittestrec." + self.get_dns_domain() + cname2 = "cnamelimittestrec2." + self.get_dns_domain() + cname3 = "cnamelimittestrec3." + self.get_dns_domain() + ip_prefix = '127.0.0.' + limit = 20 + num_recs_to_enter = limit + 5 + + self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME) + self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME) + num_arecs_to_enter = num_recs_to_enter - 2 + for i in range(1, num_arecs_to_enter+1): + ip = ip_prefix + str(i) + self.make_dns_update(cname3, ip, dns.DNS_QTYPE_A) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(cname1, + dns.DNS_QTYPE_A, + dns.DNS_QCLASS_IN) + questions.append(q) + self.finish_name_packet(p, questions) + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=self.server_ip) + + self.assertEqual(len(response.answers), num_recs_to_enter) + + # ANY query on cname record shouldn't follow the link + def test_cname_any_query(self): + cname1 = "cnameanytestrec." + self.get_dns_domain() + cname2 = "cnameanytestrec2." + self.get_dns_domain() + cname3 = "cnameanytestrec3." + self.get_dns_domain() + + self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME) + self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(cname1, + dns.DNS_QTYPE_ALL, + dns.DNS_QCLASS_IN) + questions.append(q) + self.finish_name_packet(p, questions) + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=self.server_ip) + + self.assertEqual(len(response.answers), 1) + self.assertEqual(response.answers[0].name, cname1) + self.assertEqual(response.answers[0].rdata, cname2) + + +class TestInvalidQueries(DNSTest): + def setUp(self): + super().setUp() + global server, server_ip, lp, creds, timeout + self.server = server_name + self.server_ip = server_ip + self.lp = lp + self.creds = creds + self.timeout = timeout + + def test_one_a_query(self): + """send 0 bytes follows by create a query packet + containing one query record""" + + s = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + s.connect((self.server_ip, 53)) + s.send(b"", 0) + finally: + if s is not None: + s.close() + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (self.server, self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 1) + self.assertEqual(response.answers[0].rdata, + self.server_ip) + + def test_one_a_reply(self): + "send a reply instead of a query" + global timeout + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % ('fakefakefake', self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print("asking for ", q.name) + questions.append(q) + + self.finish_name_packet(p, questions) + p.operation |= dns.DNS_FLAG_REPLY + s = None + try: + send_packet = ndr.ndr_pack(p) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + s.settimeout(timeout) + host = self.server_ip + s.connect((host, 53)) + tcp_packet = struct.pack('!H', len(send_packet)) + tcp_packet += send_packet + s.send(tcp_packet, 0) + recv_packet = s.recv(0xffff + 2, 0) + self.assertEqual(0, len(recv_packet)) + except socket.timeout: + # Windows chooses not to respond to incorrectly formatted queries. + # Although this appears to be non-deterministic even for the same + # request twice, it also appears to be based on a how poorly the + # request is formatted. + pass + finally: + if s is not None: + s.close() + + +class TestZones(DNSTest): + def setUp(self): + super().setUp() + global server, server_ip, lp, creds, timeout + self.server = server_name + self.server_ip = server_ip + self.lp = lp + self.creds = creds + self.timeout = timeout + + self.zone = "test.lan" + self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % + (self.server_ip), + self.lp, self.creds) + + self.samdb = SamDB(url="ldap://" + self.server_ip, + lp=self.get_loadparm(), + session_info=system_session(), + credentials=self.creds) + self.zone_dn = "DC=" + self.zone +\ + ",CN=MicrosoftDNS,DC=DomainDNSZones," +\ + str(self.samdb.get_default_basedn()) + + def tearDown(self): + super().tearDown() + + try: + self.delete_zone(self.zone) + except RuntimeError as e: + (num, string) = e.args + if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST: + raise + + def make_zone_obj(self, zone, aging_enabled=False): + zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create.pszZoneName = zone + zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create.fAging = int(aging_enabled) + zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + zone_create.fDsIntegrated = 1 + zone_create.fLoadExisting = 1 + zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE + return zone_create + + def create_zone(self, zone, aging_enabled=False): + zone_create = self.make_zone_obj(zone, aging_enabled) + try: + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + self.rpc_conn.DnssrvOperation2(client_version, + 0, + self.server_ip, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create) + except WERRORError as e: + self.fail(e) + + def set_params(self, **kwargs): + for key, val in kwargs.items(): + name_param = dnsserver.DNS_RPC_NAME_AND_PARAM() + name_param.dwParam = val + name_param.pszNodeName = key + + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM + try: + self.rpc_conn.DnssrvOperation2(client_version, + 0, + self.server, + self.zone, + 0, + 'ResetDwordProperty', + nap_type, + name_param) + except WERRORError as e: + self.fail(str(e)) + + def ldap_modify_dnsrecs(self, name, func): + dn = 'DC={0},{1}'.format(name, self.zone_dn) + dns_recs = self.ldap_get_dns_records(name) + for rec in dns_recs: + func(rec) + update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]} + self.samdb.modify(ldb.Message.from_dict(self.samdb, + update_dict, + ldb.FLAG_MOD_REPLACE)) + + def dns_update_record(self, prefix, txt): + p = self.make_txt_update(prefix, txt, self.zone) + (code, response) = self.dns_transaction_udp(p, host=self.server_ip) + self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK) + recs = self.ldap_get_dns_records(prefix) + recs = [r for r in recs if r.data.str == txt] + self.assertEqual(len(recs), 1) + return recs[0] + + def dns_tombstone(self, prefix, txt, zone): + name = prefix + "." + zone + + to = dnsp.DnssrvRpcRecord() + to.dwTimeStamp = 1000 + to.wType = dnsp.DNS_TYPE_TOMBSTONE + + self.samdb.dns_replace(name, [to]) + + def ldap_get_records(self, name): + # The use of SCOPE_SUBTREE here avoids raising an exception in the + # 0 results case for a test below. + + expr = "(&(objectClass=dnsNode)(name={0}))".format(name) + return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + + def ldap_get_dns_records(self, name): + records = self.ldap_get_records(name) + return [ndr_unpack(dnsp.DnssrvRpcRecord, r) + for r in records[0].get('dnsRecord')] + + def ldap_get_zone_settings(self): + records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE, + expression="(&(objectClass=dnsZone)" + + "(name={0}))".format(self.zone), + attrs=["dNSProperty"]) + self.assertEqual(len(records), 1) + props = [ndr_unpack(dnsp.DnsProperty, r) + for r in records[0].get('dNSProperty')] + + # We have no choice but to repeat these here. + zone_prop_ids = {0x00: "EMPTY", + 0x01: "TYPE", + 0x02: "ALLOW_UPDATE", + 0x08: "SECURE_TIME", + 0x10: "NOREFRESH_INTERVAL", + 0x11: "SCAVENGING_SERVERS", + 0x12: "AGING_ENABLED_TIME", + 0x20: "REFRESH_INTERVAL", + 0x40: "AGING_STATE", + 0x80: "DELETED_FROM_HOSTNAME", + 0x81: "MASTER_SERVERS", + 0x82: "AUTO_NS_SERVERS", + 0x83: "DCPROMO_CONVERT", + 0x90: "SCAVENGING_SERVERS_DA", + 0x91: "MASTER_SERVERS_DA", + 0x92: "NS_SERVERS_DA", + 0x100: "NODE_DBFLAGS"} + return {zone_prop_ids[p.id].lower(): p.data for p in props} + + def set_aging(self, enable=False): + self.create_zone(self.zone, aging_enabled=enable) + self.set_params(NoRefreshInterval=1, + RefreshInterval=1, + Aging=int(bool(enable)), + AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) + + def test_set_aging(self, enable=True, name='agingtest', txt=None): + if txt is None: + txt = ['test txt'] + self.set_aging(enable=True) + settings = self.ldap_get_zone_settings() + self.assertTrue(settings['aging_state'] is not None) + self.assertTrue(settings['aging_state']) + + rec = self.dns_update_record('agingtest', ['test txt']) + self.assertNotEqual(rec.dwTimeStamp, 0) + + def test_set_aging_disabled(self): + self.set_aging(enable=False) + settings = self.ldap_get_zone_settings() + self.assertTrue(settings['aging_state'] is not None) + self.assertFalse(settings['aging_state']) + + rec = self.dns_update_record('agingtest', ['test txt']) + self.assertNotEqual(rec.dwTimeStamp, 0) + + def test_aging_update(self, enable=True): + name, txt = 'agingtest', ['test txt'] + self.set_aging(enable=True) + before_mod = self.dns_update_record(name, txt) + if not enable: + self.set_params(Aging=0) + dec = 2 + + def mod_ts(rec): + self.assertTrue(rec.dwTimeStamp > 0) + rec.dwTimeStamp -= dec + self.ldap_modify_dnsrecs(name, mod_ts) + after_mod = self.ldap_get_dns_records(name) + self.assertEqual(len(after_mod), 1) + after_mod = after_mod[0] + self.assertEqual(after_mod.dwTimeStamp, + before_mod.dwTimeStamp - dec) + after_update = self.dns_update_record(name, txt) + after_should_equal = before_mod if enable else after_mod + self.assertEqual(after_should_equal.dwTimeStamp, + after_update.dwTimeStamp) + + def test_aging_update_disabled(self): + self.test_aging_update(enable=False) + + def test_aging_refresh(self): + name, txt = 'agingtest', ['test txt'] + self.create_zone(self.zone, aging_enabled=True) + interval = 10 + self.set_params(NoRefreshInterval=interval, + RefreshInterval=interval, + Aging=1, + AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) + before_mod = self.dns_update_record(name, txt) + + def mod_ts(rec): + self.assertTrue(rec.dwTimeStamp > 0) + rec.dwTimeStamp -= interval // 2 + self.ldap_modify_dnsrecs(name, mod_ts) + update_during_norefresh = self.dns_update_record(name, txt) + + def mod_ts(rec): + self.assertTrue(rec.dwTimeStamp > 0) + rec.dwTimeStamp -= interval + interval // 2 + self.ldap_modify_dnsrecs(name, mod_ts) + update_during_refresh = self.dns_update_record(name, txt) + self.assertEqual(update_during_norefresh.dwTimeStamp, + before_mod.dwTimeStamp - interval / 2) + self.assertEqual(update_during_refresh.dwTimeStamp, + before_mod.dwTimeStamp) + + def test_rpc_add_no_timestamp(self): + name, txt = 'agingtest', ['test txt'] + self.set_aging(enable=True) + rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + rec_buf.rec = TXTRecord(txt) + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.zone, + name, + rec_buf, + None) + recs = self.ldap_get_dns_records(name) + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].dwTimeStamp, 0) + + def test_static_record_dynamic_update(self): + name, txt = 'agingtest', ['test txt'] + txt2 = ['test txt2'] + self.set_aging(enable=True) + rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + rec_buf.rec = TXTRecord(txt) + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.zone, + name, + rec_buf, + None) + + rec2 = self.dns_update_record(name, txt2) + self.assertEqual(rec2.dwTimeStamp, 0) + + def test_dynamic_record_static_update(self): + name, txt = 'agingtest', ['test txt'] + txt2 = ['test txt2'] + txt3 = ['test txt3'] + self.set_aging(enable=True) + + self.dns_update_record(name, txt) + + rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + rec_buf.rec = TXTRecord(txt2) + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.zone, + name, + rec_buf, + None) + + self.dns_update_record(name, txt3) + + recs = self.ldap_get_dns_records(name) + # Put in dict because ldap recs might be out of order + recs = {str(r.data.str): r for r in recs} + self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0) + self.assertEqual(recs[str(txt2)].dwTimeStamp, 0) + self.assertEqual(recs[str(txt3)].dwTimeStamp, 0) + + def test_dns_tombstone_custom_match_rule(self): + lp = self.get_loadparm() + self.samdb = SamDB(url=lp.samdb_url(), lp=lp, + session_info=system_session(), + credentials=self.creds) + + name, txt = 'agingtest', ['test txt'] + name2, txt2 = 'agingtest2', ['test txt2'] + name3, txt3 = 'agingtest3', ['test txt3'] + name4, txt4 = 'agingtest4', ['test txt4'] + name5, txt5 = 'agingtest5', ['test txt5'] + + self.create_zone(self.zone, aging_enabled=True) + interval = 10 + self.set_params(NoRefreshInterval=interval, + RefreshInterval=interval, + Aging=1, + AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) + + self.dns_update_record(name, txt) + + self.dns_update_record(name2, txt) + self.dns_update_record(name2, txt2) + + self.dns_update_record(name3, txt) + self.dns_update_record(name3, txt2) + last_update = self.dns_update_record(name3, txt3) + + # Modify txt1 of the first 2 names + def mod_ts(rec): + if rec.data.str == txt: + rec.dwTimeStamp -= 2 + self.ldap_modify_dnsrecs(name, mod_ts) + self.ldap_modify_dnsrecs(name2, mod_ts) + + # create a static dns record. + rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + rec_buf.rec = TXTRecord(txt4) + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.zone, + name4, + rec_buf, + None) + + # Create a tomb stoned record. + self.dns_update_record(name5, txt5) + self.dns_tombstone(name5, txt5, self.zone) + + self.ldap_get_dns_records(name3) + expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})" + expr = expr.format(int(last_update.dwTimeStamp) - 1) + try: + res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + except ldb.LdbError as e: + self.fail(str(e)) + updated_names = {str(r.get('name')) for r in res} + self.assertEqual(updated_names, set([name, name2])) + + def test_dns_tombstone_custom_match_rule_no_records(self): + lp = self.get_loadparm() + self.samdb = SamDB(url=lp.samdb_url(), lp=lp, + session_info=system_session(), + credentials=self.creds) + + self.create_zone(self.zone, aging_enabled=True) + interval = 10 + self.set_params(NoRefreshInterval=interval, + RefreshInterval=interval, + Aging=1, + AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) + + expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})" + expr = expr.format(1) + + try: + res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + except ldb.LdbError as e: + self.fail(str(e)) + self.assertEqual(0, len(res)) + + def test_dns_tombstone_custom_match_rule_fail(self): + self.create_zone(self.zone, aging_enabled=True) + samdb = SamDB(url=lp.samdb_url(), + lp=lp, + session_info=system_session(), + credentials=self.creds) + + # Property name in not dnsRecord + expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)" + res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + self.assertEqual(len(res), 0) + + # No value for tombstone time + try: + expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)" + res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + self.assertEqual(len(res), 0) + self.fail("Exception: ldb.ldbError not generated") + except ldb.LdbError as e: + (num, msg) = e.args + self.assertEqual(num, ERR_OPERATIONS_ERROR) + + # Tombstone time = - + try: + expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)" + res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + self.assertEqual(len(res), 0) + self.fail("Exception: ldb.ldbError not generated") + except ldb.LdbError as e: + (num, _) = e.args + self.assertEqual(num, ERR_OPERATIONS_ERROR) + + # Tombstone time longer than 64 characters + try: + expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})" + expr = expr.format("1" * 65) + res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + self.assertEqual(len(res), 0) + self.fail("Exception: ldb.ldbError not generated") + except ldb.LdbError as e: + (num, _) = e.args + self.assertEqual(num, ERR_OPERATIONS_ERROR) + + # Non numeric Tombstone time + try: + expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)" + res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + self.assertEqual(len(res), 0) + self.fail("Exception: ldb.ldbError not generated") + except ldb.LdbError as e: + (num, _) = e.args + self.assertEqual(num, ERR_OPERATIONS_ERROR) + + # Non system session + try: + db = SamDB(url="ldap://" + self.server_ip, + lp=self.get_loadparm(), + credentials=self.creds) + + expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)" + res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, + expression=expr, attrs=["*"]) + self.assertEqual(len(res), 0) + self.fail("Exception: ldb.ldbError not generated") + except ldb.LdbError as e: + (num, _) = e.args + self.assertEqual(num, ERR_OPERATIONS_ERROR) + + def test_basic_scavenging(self): + lp = self.get_loadparm() + self.samdb = SamDB(url=lp.samdb_url(), lp=lp, + session_info=system_session(), + credentials=self.creds) + + self.create_zone(self.zone, aging_enabled=True) + interval = 1 + self.set_params(NoRefreshInterval=interval, + RefreshInterval=interval, + Aging=1, + AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) + name, txt = 'agingtest', ['test txt'] + name2, txt2 = 'agingtest2', ['test txt2'] + name3, txt3 = 'agingtest3', ['test txt3'] + name4, txt4 = 'agingtest4', ['test txt4'] + name5, txt5 = 'agingtest5', ['test txt5'] + self.dns_update_record(name, txt) + self.dns_update_record(name2, txt) + self.dns_update_record(name2, txt2) + self.dns_update_record(name3, txt) + self.dns_update_record(name3, txt2) + + # Create a tombstoned record. + self.dns_update_record(name4, txt4) + self.dns_tombstone(name4, txt4, self.zone) + records = self.ldap_get_records(name4) + self.assertIn("dNSTombstoned", records[0]) + self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") + + # Create an un-tombstoned record, with dnsTombstoned: FALSE + self.dns_update_record(name5, txt5) + self.dns_tombstone(name5, txt5, self.zone) + self.dns_update_record(name5, txt5) + records = self.ldap_get_records(name5) + self.assertIn("dNSTombstoned", records[0]) + self.assertEqual(records[0]["dNSTombstoned"][0], b"FALSE") + + last_add = self.dns_update_record(name3, txt3) + + def mod_ts(rec): + self.assertTrue(rec.dwTimeStamp > 0) + if rec.data.str == txt: + rec.dwTimeStamp -= interval * 5 + + def mod_ts_all(rec): + rec.dwTimeStamp -= interval * 5 + self.ldap_modify_dnsrecs(name, mod_ts) + self.ldap_modify_dnsrecs(name2, mod_ts) + self.ldap_modify_dnsrecs(name3, mod_ts) + self.ldap_modify_dnsrecs(name5, mod_ts_all) + self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None))) + dsdb._scavenge_dns_records(self.samdb) + + recs = self.ldap_get_dns_records(name) + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) + records = self.ldap_get_records(name) + self.assertIn("dNSTombstoned", records[0]) + self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") + + recs = self.ldap_get_dns_records(name2) + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT) + self.assertEqual(recs[0].data.str, txt2) + + recs = self.ldap_get_dns_records(name3) + self.assertEqual(len(recs), 2) + txts = {str(r.data.str) for r in recs} + self.assertEqual(txts, {str(txt2), str(txt3)}) + self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT) + self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT) + + recs = self.ldap_get_dns_records(name4) + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) + records = self.ldap_get_records(name4) + self.assertIn("dNSTombstoned", records[0]) + self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") + + recs = self.ldap_get_dns_records(name5) + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) + records = self.ldap_get_records(name5) + self.assertIn("dNSTombstoned", records[0]) + self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") + + for make_it_work in [False, True]: + inc = -1 if make_it_work else 1 + + def mod_ts(rec): + rec.data = (last_add.dwTimeStamp - 24 * 14) + inc + self.ldap_modify_dnsrecs(name, mod_ts) + dsdb._dns_delete_tombstones(self.samdb) + recs = self.ldap_get_records(name) + if make_it_work: + self.assertEqual(len(recs), 0) + else: + self.assertEqual(len(recs), 1) + + def test_fully_qualified_zone(self): + + def create_zone_expect_exists(zone): + try: + zone_create = self.make_zone_obj(zone) + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + zc_type = dnsserver.DNSSRV_TYPEID_ZONE_CREATE + self.rpc_conn.DnssrvOperation2(client_version, + 0, + self.server_ip, + None, + 0, + 'ZoneCreate', + zc_type, + zone_create) + except WERRORError as e: + enum, _ = e.args + if enum != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS: + self.fail(e) + return + self.fail("Zone {} should already exist".format(zone)) + + # Create unqualified, then check creating qualified fails. + self.create_zone(self.zone) + create_zone_expect_exists(self.zone + '.') + + # Same again, but the other way around. + self.create_zone(self.zone + '2.') + create_zone_expect_exists(self.zone + '2') + + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY + tid = dnsserver.DNSSRV_TYPEID_DWORD + typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version, + 0, + self.server_ip, + None, + 'EnumZones', + tid, + request_filter) + + self.delete_zone(self.zone) + self.delete_zone(self.zone + '2') + + # Two zones should've been created, neither of them fully qualified. + zones_we_just_made = [] + zones = [str(z.pszZoneName) for z in res.ZoneArray] + for zone in zones: + if zone.startswith(self.zone): + zones_we_just_made.append(zone) + self.assertEqual(len(zones_we_just_made), 2) + self.assertEqual(set(zones_we_just_made), {self.zone + '2', self.zone}) + + def delete_zone(self, zone): + self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + + def test_soa_query(self): + zone = "test.lan" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + questions.append(q) + self.finish_name_packet(p, questions) + + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + # Windows returns OK while BIND logically seems to return NXDOMAIN + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 0) + + self.create_zone(zone) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 1) + self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA) + + self.delete_zone(zone) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEqual(response.ancount, 0) + + def set_dnsProperty_zero_length(self, dnsproperty_id): + records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE, + expression="(&(objectClass=dnsZone)" + + "(name={0}))".format(self.zone), + attrs=["dNSProperty"]) + self.assertEqual(len(records), 1) + props = [ndr_unpack(dnsp.DnsProperty, r) + for r in records[0].get('dNSProperty')] + new_props = [ndr.ndr_pack(p) for p in props if p.id == dnsproperty_id] + + zero_length_p = dnsp.DnsProperty_short() + zero_length_p.id = dnsproperty_id + zero_length_p.namelength = 1 + zero_length_p.name = 1 + new_props += [ndr.ndr_pack(zero_length_p)] + + dn = records[0].dn + update_dict = {'dn': dn, 'dnsProperty': new_props} + self.samdb.modify(ldb.Message.from_dict(self.samdb, + update_dict, + ldb.FLAG_MOD_REPLACE)) + + def test_update_while_dnsProperty_zero_length(self): + self.create_zone(self.zone) + self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE) + rec = self.dns_update_record('dnspropertytest', ['test txt']) + self.assertNotEqual(rec.dwTimeStamp, 0) + + def test_enum_zones_while_dnsProperty_zero_length(self): + self.create_zone(self.zone) + self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE) + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY + tid = dnsserver.DNSSRV_TYPEID_DWORD + typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version, + 0, + self.server_ip, + None, + 'EnumZones', + tid, + request_filter) + + def test_rpc_zone_update_while_dnsProperty_zero_length(self): + self.create_zone(self.zone) + self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE) + self.set_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_SECURE) + + def test_rpc_zone_update_while_other_dnsProperty_zero_length(self): + self.create_zone(self.zone) + self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_MASTER_SERVERS_DA) + self.set_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_SECURE) + +class TestRPCRoundtrip(DNSTest): + def setUp(self): + super().setUp() + global server, server_ip, lp, creds + self.server = server_name + self.server_ip = server_ip + self.lp = lp + self.creds = creds + self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % + (self.server_ip), + self.lp, + self.creds) + + def rpc_update(self, fqn=None, data=None, wType=None, delete=False): + fqn = fqn or ("rpctestrec." + self.get_dns_domain()) + + rec = record_from_string(wType, data) + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + + add_arg = add_rec_buf + del_arg = None + if delete: + add_arg = None + del_arg = add_rec_buf + + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + fqn, + add_arg, + del_arg) + + def test_rpc_self_referencing_cname(self): + cname = "cnametest2_unqual_rec_loop" + cname_fqn = "%s.%s" % (cname, self.get_dns_domain()) + + try: + self.rpc_update(fqn=cname, data=cname_fqn, + wType=dnsp.DNS_TYPE_CNAME, delete=True) + except WERRORError as e: + if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST: + self.fail("RPC DNS gaven wrong error on pre-test cleanup " + "for self referencing CNAME: %s" % e.args[0]) + + try: + self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn) + except WERRORError as e: + if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP: + self.fail("RPC DNS gaven wrong error on insertion of " + "self referencing CNAME: %s" % e.args[0]) + return + + self.fail("RPC DNS allowed insertion of self referencing CNAME") + + def test_update_add_txt_rpc_to_dns(self): + prefix, txt = 'rpctextrec', ['"This is a test"'] + + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + def test_update_add_null_padded_txt_record(self): + "test adding records works" + prefix, txt = 'pad1textrec', ['"This is a test"', '', ''] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + self.assertIsNotNone( + dns_record_match(self.rpc_conn, + self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, + '"\\"This is a test\\"" "" ""')) + + prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + self.assertIsNotNone( + dns_record_match( + self.rpc_conn, + self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, + '"\\"This is a test\\"" "" "" "more text"')) + + prefix, txt = 'pad3textrec', ['', '', '"This is a test"'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + self.assertIsNotNone( + dns_record_match( + self.rpc_conn, + self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, + '"" "" "\\"This is a test\\""')) + + def test_update_add_padding_rpc_to_dns(self): + prefix, txt = 'pad1textrec', ['"This is a test"', '', ''] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, + '"\\"This is a test\\"" "" ""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, + '"\\"This is a test\\"" "" "" "more text"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + prefix, txt = 'pad3textrec', ['', '', '"This is a test"'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, + '"" "" "\\"This is a test\\""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + # Test is incomplete due to strlen against txt records + def test_update_add_null_char_txt_record(self): + "test adding records works" + prefix, txt = 'nulltextrec', ['NULL\x00BYTE'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, ['NULL']) + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, '"NULL"')) + + prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, ['NULL', 'NULL']) + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, '"NULL" "NULL"')) + + def test_update_add_null_char_rpc_to_dns(self): + prefix = 'rpcnulltextrec' + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, ['NULL']) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + def test_update_add_hex_char_txt_record(self): + "test adding records works" + prefix, txt = 'hextextrec', ['HIGH\xFFBYTE'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')) + + def test_update_add_hex_rpc_to_dns(self): + prefix, txt = 'hextextrec', ['HIGH\xFFBYTE'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + def test_update_add_slash_txt_record(self): + "test adding records works" + prefix, txt = 'slashtextrec', ['Th\\=is=is a test'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')) + + # This test fails against Windows as it eliminates slashes in RPC + # One typical use for a slash is in records like 'var=value' to + # escape '=' characters. + def test_update_add_slash_rpc_to_dns(self): + prefix, txt = 'slashtextrec', ['Th\\=is=is a test'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + def test_update_add_two_txt_records(self): + "test adding two txt records works" + prefix, txt = 'textrec2', ['"This is a test"', + '"and this is a test, too"'] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' + + ' "\\"and this is a test, too\\""')) + + def test_update_add_two_rpc_to_dns(self): + prefix, txt = 'textrec2', ['"This is a test"', + '"and this is a test, too"'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, + '"\\"This is a test\\""' + + ' "\\"and this is a test, too\\""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + def test_update_add_empty_txt_records(self): + "test adding two txt records works" + prefix, txt = 'emptytextrec', [] + p = self.make_txt_update(prefix, txt) + (response, response_packet) =\ + self.dns_transaction_udp(p, host=server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.check_query_txt(prefix, txt) + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, + self.get_dns_domain(), + "%s.%s" % (prefix, self.get_dns_domain()), + dnsp.DNS_TYPE_TXT, '')) + + def test_update_add_empty_rpc_to_dns(self): + prefix, txt = 'rpcemptytextrec', [] + + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = record_from_string(dnsp.DNS_TYPE_TXT, '') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + add_rec_buf, + None) + except WERRORError as e: + self.fail(str(e)) + + try: + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + name, + None, + add_rec_buf) + + +TestProgram(module=__name__, opts=subunitopts) |