# Unix SMB/CIFS implementation. # Copyright (C) Kai Blin 2011 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # from samba import dsdb from samba.ndr import ndr_unpack, ndr_pack from samba.samdb import SamDB from samba.auth import system_session import ldb from ldb import ERR_OPERATIONS_ERROR import os import sys import struct import socket import samba.ndr as ndr from samba import credentials from samba.dcerpc import dns, dnsp, dnsserver from samba.dnsserver import TXTRecord from samba.dnsserver import record_from_string, dns_record_match from samba.tests.subunitrun import SubunitOptions, TestProgram from samba import werror, WERRORError from samba.tests.dns_base import DNSTest import samba.getopt as options import optparse parser = optparse.OptionParser("dns.py [options]") sambaopts = options.SambaOptions(parser) parser.add_option_group(sambaopts) # This timeout only has relevance when testing against Windows # Format errors tend to return patchy responses, so a timeout is needed. parser.add_option("--timeout", type="int", dest="timeout", help="Specify timeout for DNS requests") # use command line creds if available credopts = options.CredentialsOptions(parser) parser.add_option_group(credopts) subunitopts = SubunitOptions(parser) parser.add_option_group(subunitopts) opts, args = parser.parse_args() lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) timeout = opts.timeout if len(args) < 2: parser.print_usage() sys.exit(1) server_name = args[0] server_ip = args[1] creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE) class TestSimpleQueries(DNSTest): def setUp(self): super().setUp() global server, server_ip, lp, creds, timeout self.server = server_name self.server_ip = server_ip self.lp = lp self.creds = creds self.timeout = timeout def test_one_a_query(self): "create a query packet containing one query record" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 1) self.assertEqual(response.answers[0].rdata, self.server_ip) def test_one_SOA_query(self): "create a query packet containing one query record for the SOA" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s" % (self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 1) self.assertEqual( response.answers[0].rdata.mname.upper(), ("%s.%s" % (self.server, self.get_dns_domain())).upper()) def test_one_a_query_tcp(self): "create a query packet containing one query record via TCP" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_tcp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 1) self.assertEqual(response.answers[0].rdata, self.server_ip) def test_one_mx_query(self): "create a query packet causing an empty RCODE_OK answer" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 0) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "invalid-%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 0) def test_two_queries(self): "create a query packet containing two query records" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) questions.append(q) name = "%s.%s" % ('bogusname', self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) try: (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) except socket.timeout: # Windows chooses not to respond to incorrectly formatted queries. # Although this appears to be non-deterministic even for the same # request twice, it also appears to be based on a how poorly the # request is formatted. pass def test_qtype_all_query(self): "create a QTYPE_ALL query" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) num_answers = 1 dc_ipv6 = os.getenv('SERVER_IPV6') if dc_ipv6 is not None: num_answers += 1 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, num_answers) self.assertEqual(response.answers[0].rdata, self.server_ip) if dc_ipv6 is not None: self.assertEqual(response.answers[1].rdata, dc_ipv6) def test_qclass_none_query(self): "create a QCLASS_NONE query" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question( name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE) questions.append(q) self.finish_name_packet(p, questions) try: (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP) except socket.timeout: # Windows chooses not to respond to incorrectly formatted queries. # Although this appears to be non-deterministic even for the same # request twice, it also appears to be based on a how poorly the # request is formatted. pass def test_soa_hostname_query(self): "create a SOA query for a hostname" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) # We don't get SOA records for single hosts self.assertEqual(response.ancount, 0) # But we do respond with an authority section self.assertEqual(response.nscount, 1) def test_soa_unknown_hostname_query(self): "create a SOA query for an unknown hostname" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "foobar.%s" % (self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) # We don't get SOA records for single hosts self.assertEqual(response.ancount, 0) # But we do respond with an authority section self.assertEqual(response.nscount, 1) def test_soa_domain_query(self): "create a SOA query for a domain" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = self.get_dns_domain() q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 1) self.assertEqual(response.answers[0].rdata.minimum, 3600) class TestDNSUpdates(DNSTest): def setUp(self): super().setUp() global server, server_ip, lp, creds, timeout self.server = server_name self.server_ip = server_ip self.lp = lp self.creds = creds self.timeout = timeout def test_two_updates(self): "create two update requests" p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = "%s.%s" % (self.server, self.get_dns_domain()) u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) updates.append(u) name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) try: (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) except socket.timeout: # Windows chooses not to respond to incorrectly formatted queries. # Although this appears to be non-deterministic even for the same # request twice, it also appears to be based on a how poorly the # request is formatted. pass def test_update_wrong_qclass(self): "create update with DNS_QCLASS_NONE" p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE) updates.append(u) self.finish_name_packet(p, updates) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP) def test_update_prereq_with_non_null_ttl(self): "test update with a non-null TTL" p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) prereqs = [] r = dns.res_rec() r.name = "%s.%s" % (self.server, self.get_dns_domain()) r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_NONE r.ttl = 1 r.length = 0 prereqs.append(r) p.ancount = len(prereqs) p.answers = prereqs try: (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) except socket.timeout: # Windows chooses not to respond to incorrectly formatted queries. # Although this appears to be non-deterministic even for the same # request twice, it also appears to be based on a how poorly the # request is formatted. pass def test_update_prereq_with_non_null_length(self): "test update with a non-null length" p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) prereqs = [] r = dns.res_rec() r.name = "%s.%s" % (self.server, self.get_dns_domain()) r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_ANY r.ttl = 0 r.length = 1 prereqs.append(r) p.ancount = len(prereqs) p.answers = prereqs (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET) def test_update_prereq_nonexisting_name(self): "test update with a nonexisting name" p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) prereqs = [] r = dns.res_rec() r.name = "idontexist.%s" % self.get_dns_domain() r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_ANY r.ttl = 0 r.length = 0 prereqs.append(r) p.ancount = len(prereqs) p.answers = prereqs (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET) def test_update_add_txt_record(self): "test adding records works" prefix, txt = 'textrec', ['"This is a test"'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) def test_delete_record(self): "Test if deleting records works" NAME = "deleterec.%s" % self.get_dns_domain() # First, create a record to make sure we have a record to delete. p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() r.name = NAME r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_IN r.ttl = 900 r.length = 0xffff rdata = self.make_txt_record(['"This is a test"']) r.rdata = rdata updates.append(r) p.nscount = len(updates) p.nsrecs = updates (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) # Now check the record is around p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) # Now delete the record p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() r.name = NAME r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_NONE r.ttl = 0 r.length = 0xffff rdata = self.make_txt_record(['"This is a test"']) r.rdata = rdata updates.append(r) p.nscount = len(updates) p.nsrecs = updates (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) # And finally check it's gone p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) def test_readd_record(self): "Test if adding, deleting and then re-adding a records works" NAME = "readdrec.%s" % self.get_dns_domain() # Create the record p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() r.name = NAME r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_IN r.ttl = 900 r.length = 0xffff rdata = self.make_txt_record(['"This is a test"']) r.rdata = rdata updates.append(r) p.nscount = len(updates) p.nsrecs = updates (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) # Now check the record is around p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) # Now delete the record p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() r.name = NAME r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_NONE r.ttl = 0 r.length = 0xffff rdata = self.make_txt_record(['"This is a test"']) r.rdata = rdata updates.append(r) p.nscount = len(updates) p.nsrecs = updates (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) # check it's gone p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) # recreate the record p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() r.name = NAME r.rr_type = dns.DNS_QTYPE_TXT r.rr_class = dns.DNS_QCLASS_IN r.ttl = 900 r.length = 0xffff rdata = self.make_txt_record(['"This is a test"']) r.rdata = rdata updates.append(r) p.nscount = len(updates) p.nsrecs = updates (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) # Now check the record is around p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) def test_update_add_mx_record(self): "test adding MX records works" p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() r.name = "%s" % self.get_dns_domain() r.rr_type = dns.DNS_QTYPE_MX r.rr_class = dns.DNS_QCLASS_IN r.ttl = 900 r.length = 0xffff rdata = dns.mx_record() rdata.preference = 10 rdata.exchange = 'mail.%s' % self.get_dns_domain() r.rdata = rdata updates.append(r) p.nscount = len(updates) p.nsrecs = updates (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s" % self.get_dns_domain() q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assertEqual(response.ancount, 1) ans = response.answers[0] self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX) self.assertEqual(ans.rdata.preference, 10) self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain()) class TestComplexQueries(DNSTest): def make_dns_update(self, key, value, qtype): p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) self.finish_name_packet(p, [u]) r = dns.res_rec() r.name = key r.rr_type = qtype r.rr_class = dns.DNS_QCLASS_IN r.ttl = 900 r.length = 0xffff r.rdata = value p.nscount = 1 p.nsrecs = [r] (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) def setUp(self): super().setUp() global server, server_ip, lp, creds, timeout self.server = server_name self.server_ip = server_ip self.lp = lp self.creds = creds self.timeout = timeout def test_one_a_query(self): "create a query packet containing one query record" try: # Create the record name = "cname_test.%s" % self.get_dns_domain() rdata = "%s.%s" % (self.server, self.get_dns_domain()) self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] # Check the record name = "cname_test.%s" % self.get_dns_domain() q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 2) self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) self.assertEqual(response.answers[0].rdata, "%s.%s" % (self.server, self.get_dns_domain())) self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A) self.assertEqual(response.answers[1].rdata, self.server_ip) finally: # Delete the record p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) updates = [] name = self.get_dns_domain() u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) updates.append(u) self.finish_name_packet(p, updates) updates = [] r = dns.res_rec() r.name = "cname_test.%s" % self.get_dns_domain() r.rr_type = dns.DNS_QTYPE_CNAME r.rr_class = dns.DNS_QCLASS_NONE r.ttl = 0 r.length = 0xffff r.rdata = "%s.%s" % (self.server, self.get_dns_domain()) updates.append(r) p.nscount = len(updates) p.nsrecs = updates (response, response_packet) =\ self.dns_transaction_udp(p, host=self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) def test_cname_two_chain(self): name0 = "cnamechain0.%s" % self.get_dns_domain() name1 = "cnamechain1.%s" % self.get_dns_domain() name2 = "cnamechain2.%s" % self.get_dns_domain() self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME) self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME) self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(name1, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 3) self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) self.assertEqual(response.answers[0].name, name1) self.assertEqual(response.answers[0].rdata, name2) self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME) self.assertEqual(response.answers[1].name, name2) self.assertEqual(response.answers[1].rdata, name0) self.assertEqual(response.answers[2].rr_type, dns.DNS_QTYPE_A) self.assertEqual(response.answers[2].rdata, self.server_ip) def test_invalid_empty_cname(self): name0 = "cnamedotprefix0.%s" % self.get_dns_domain() try: self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME) except AssertionError: pass else: self.fail("Successfully added empty CNAME, which is invalid.") def test_cname_two_chain_not_matching_qtype(self): name0 = "cnamechain0.%s" % self.get_dns_domain() name1 = "cnamechain1.%s" % self.get_dns_domain() name2 = "cnamechain2.%s" % self.get_dns_domain() self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME) self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME) self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(name1, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) # CNAME should return all intermediate results! # Only the A records exists, not the TXT. self.assertEqual(response.ancount, 2) self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) self.assertEqual(response.answers[0].name, name1) self.assertEqual(response.answers[0].rdata, name2) self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME) self.assertEqual(response.answers[1].name, name2) self.assertEqual(response.answers[1].rdata, name0) def test_cname_loop(self): cname1 = "cnamelooptestrec." + self.get_dns_domain() cname2 = "cnamelooptestrec2." + self.get_dns_domain() cname3 = "cnamelooptestrec3." + self.get_dns_domain() self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME) self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME) self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(cname1, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=self.server_ip) max_recursion_depth = 20 self.assertEqual(len(response.answers), max_recursion_depth) # Make sure cname limit doesn't count other records. This is a generic # test called in tests below def max_rec_test(self, rtype, rec_gen): name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain()) limit = 20 num_recs_to_enter = limit + 5 for i in range(1, num_recs_to_enter+1): ip = rec_gen(i) self.make_dns_update(name, ip, rtype) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(name, rtype, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=self.server_ip) self.assertEqual(len(response.answers), num_recs_to_enter) def test_record_limit_A(self): def ip4_gen(i): return "127.0.0." + str(i) self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen) def test_record_limit_AAAA(self): def ip6_gen(i): return "AAAA:0:0:0:0:0:0:" + str(i) self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen) def test_record_limit_SRV(self): def srv_gen(i): rec = dns.srv_record() rec.priority = 1 rec.weight = 1 rec.port = 92 rec.target = "srvtestrec" + str(i) return rec self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen) # Same as test_record_limit_A but with a preceding CNAME follow def test_cname_limit(self): cname1 = "cnamelimittestrec." + self.get_dns_domain() cname2 = "cnamelimittestrec2." + self.get_dns_domain() cname3 = "cnamelimittestrec3." + self.get_dns_domain() ip_prefix = '127.0.0.' limit = 20 num_recs_to_enter = limit + 5 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME) self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME) num_arecs_to_enter = num_recs_to_enter - 2 for i in range(1, num_arecs_to_enter+1): ip = ip_prefix + str(i) self.make_dns_update(cname3, ip, dns.DNS_QTYPE_A) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(cname1, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=self.server_ip) self.assertEqual(len(response.answers), num_recs_to_enter) # ANY query on cname record shouldn't follow the link def test_cname_any_query(self): cname1 = "cnameanytestrec." + self.get_dns_domain() cname2 = "cnameanytestrec2." + self.get_dns_domain() cname3 = "cnameanytestrec3." + self.get_dns_domain() self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME) self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME) p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(cname1, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=self.server_ip) self.assertEqual(len(response.answers), 1) self.assertEqual(response.answers[0].name, cname1) self.assertEqual(response.answers[0].rdata, cname2) class TestInvalidQueries(DNSTest): def setUp(self): super().setUp() global server, server_ip, lp, creds, timeout self.server = server_name self.server_ip = server_ip self.lp = lp self.creds = creds self.timeout = timeout def test_one_a_query(self): """send 0 bytes follows by create a query packet containing one query record""" s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) s.connect((self.server_ip, 53)) s.send(b"", 0) finally: if s is not None: s.close() p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % (self.server, self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 1) self.assertEqual(response.answers[0].rdata, self.server_ip) def test_one_a_reply(self): "send a reply instead of a query" global timeout p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] name = "%s.%s" % ('fakefakefake', self.get_dns_domain()) q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) print("asking for ", q.name) questions.append(q) self.finish_name_packet(p, questions) p.operation |= dns.DNS_FLAG_REPLY s = None try: send_packet = ndr.ndr_pack(p) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) s.settimeout(timeout) host = self.server_ip s.connect((host, 53)) tcp_packet = struct.pack('!H', len(send_packet)) tcp_packet += send_packet s.send(tcp_packet, 0) recv_packet = s.recv(0xffff + 2, 0) self.assertEqual(0, len(recv_packet)) except socket.timeout: # Windows chooses not to respond to incorrectly formatted queries. # Although this appears to be non-deterministic even for the same # request twice, it also appears to be based on a how poorly the # request is formatted. pass finally: if s is not None: s.close() class TestZones(DNSTest): def setUp(self): super().setUp() global server, server_ip, lp, creds, timeout self.server = server_name self.server_ip = server_ip self.lp = lp self.creds = creds self.timeout = timeout self.zone = "test.lan" self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip), self.lp, self.creds) self.samdb = SamDB(url="ldap://" + self.server_ip, lp=self.get_loadparm(), session_info=system_session(), credentials=self.creds) self.zone_dn = "DC=" + self.zone +\ ",CN=MicrosoftDNS,DC=DomainDNSZones," +\ str(self.samdb.get_default_basedn()) def tearDown(self): super().tearDown() try: self.delete_zone(self.zone) except RuntimeError as e: (num, string) = e.args if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST: raise def make_zone_obj(self, zone, aging_enabled=False): zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() zone_create.pszZoneName = zone zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY zone_create.fAging = int(aging_enabled) zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT zone_create.fDsIntegrated = 1 zone_create.fLoadExisting = 1 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE return zone_create def create_zone(self, zone, aging_enabled=False): zone_create = self.make_zone_obj(zone, aging_enabled) try: client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN self.rpc_conn.DnssrvOperation2(client_version, 0, self.server_ip, None, 0, 'ZoneCreate', dnsserver.DNSSRV_TYPEID_ZONE_CREATE, zone_create) except WERRORError as e: self.fail(e) def set_params(self, **kwargs): for key, val in kwargs.items(): name_param = dnsserver.DNS_RPC_NAME_AND_PARAM() name_param.dwParam = val name_param.pszNodeName = key client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM try: self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, self.zone, 0, 'ResetDwordProperty', nap_type, name_param) except WERRORError as e: self.fail(str(e)) def ldap_modify_dnsrecs(self, name, func): dn = 'DC={0},{1}'.format(name, self.zone_dn) dns_recs = self.ldap_get_dns_records(name) for rec in dns_recs: func(rec) update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]} self.samdb.modify(ldb.Message.from_dict(self.samdb, update_dict, ldb.FLAG_MOD_REPLACE)) def dns_update_record(self, prefix, txt): p = self.make_txt_update(prefix, txt, self.zone) (code, response) = self.dns_transaction_udp(p, host=self.server_ip) self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK) recs = self.ldap_get_dns_records(prefix) recs = [r for r in recs if r.data.str == txt] self.assertEqual(len(recs), 1) return recs[0] def dns_tombstone(self, prefix, txt, zone): name = prefix + "." + zone to = dnsp.DnssrvRpcRecord() to.dwTimeStamp = 1000 to.wType = dnsp.DNS_TYPE_TOMBSTONE self.samdb.dns_replace(name, [to]) def ldap_get_records(self, name): # The use of SCOPE_SUBTREE here avoids raising an exception in the # 0 results case for a test below. expr = "(&(objectClass=dnsNode)(name={0}))".format(name) return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) def ldap_get_dns_records(self, name): records = self.ldap_get_records(name) return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records[0].get('dnsRecord')] def ldap_get_zone_settings(self): records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE, expression="(&(objectClass=dnsZone)" + "(name={0}))".format(self.zone), attrs=["dNSProperty"]) self.assertEqual(len(records), 1) props = [ndr_unpack(dnsp.DnsProperty, r) for r in records[0].get('dNSProperty')] # We have no choice but to repeat these here. zone_prop_ids = {0x00: "EMPTY", 0x01: "TYPE", 0x02: "ALLOW_UPDATE", 0x08: "SECURE_TIME", 0x10: "NOREFRESH_INTERVAL", 0x11: "SCAVENGING_SERVERS", 0x12: "AGING_ENABLED_TIME", 0x20: "REFRESH_INTERVAL", 0x40: "AGING_STATE", 0x80: "DELETED_FROM_HOSTNAME", 0x81: "MASTER_SERVERS", 0x82: "AUTO_NS_SERVERS", 0x83: "DCPROMO_CONVERT", 0x90: "SCAVENGING_SERVERS_DA", 0x91: "MASTER_SERVERS_DA", 0x92: "NS_SERVERS_DA", 0x100: "NODE_DBFLAGS"} return {zone_prop_ids[p.id].lower(): p.data for p in props} def set_aging(self, enable=False): self.create_zone(self.zone, aging_enabled=enable) self.set_params(NoRefreshInterval=1, RefreshInterval=1, Aging=int(bool(enable)), AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) def test_set_aging(self, enable=True, name='agingtest', txt=None): if txt is None: txt = ['test txt'] self.set_aging(enable=True) settings = self.ldap_get_zone_settings() self.assertTrue(settings['aging_state'] is not None) self.assertTrue(settings['aging_state']) rec = self.dns_update_record('agingtest', ['test txt']) self.assertNotEqual(rec.dwTimeStamp, 0) def test_set_aging_disabled(self): self.set_aging(enable=False) settings = self.ldap_get_zone_settings() self.assertTrue(settings['aging_state'] is not None) self.assertFalse(settings['aging_state']) rec = self.dns_update_record('agingtest', ['test txt']) self.assertNotEqual(rec.dwTimeStamp, 0) def test_aging_update(self, enable=True): name, txt = 'agingtest', ['test txt'] self.set_aging(enable=True) before_mod = self.dns_update_record(name, txt) if not enable: self.set_params(Aging=0) dec = 2 def mod_ts(rec): self.assertTrue(rec.dwTimeStamp > 0) rec.dwTimeStamp -= dec self.ldap_modify_dnsrecs(name, mod_ts) after_mod = self.ldap_get_dns_records(name) self.assertEqual(len(after_mod), 1) after_mod = after_mod[0] self.assertEqual(after_mod.dwTimeStamp, before_mod.dwTimeStamp - dec) after_update = self.dns_update_record(name, txt) after_should_equal = before_mod if enable else after_mod self.assertEqual(after_should_equal.dwTimeStamp, after_update.dwTimeStamp) def test_aging_update_disabled(self): self.test_aging_update(enable=False) def test_aging_refresh(self): name, txt = 'agingtest', ['test txt'] self.create_zone(self.zone, aging_enabled=True) interval = 10 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval, Aging=1, AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) before_mod = self.dns_update_record(name, txt) def mod_ts(rec): self.assertTrue(rec.dwTimeStamp > 0) rec.dwTimeStamp -= interval // 2 self.ldap_modify_dnsrecs(name, mod_ts) update_during_norefresh = self.dns_update_record(name, txt) def mod_ts(rec): self.assertTrue(rec.dwTimeStamp > 0) rec.dwTimeStamp -= interval + interval // 2 self.ldap_modify_dnsrecs(name, mod_ts) update_during_refresh = self.dns_update_record(name, txt) self.assertEqual(update_during_norefresh.dwTimeStamp, before_mod.dwTimeStamp - interval / 2) self.assertEqual(update_during_refresh.dwTimeStamp, before_mod.dwTimeStamp) def test_rpc_add_no_timestamp(self): name, txt = 'agingtest', ['test txt'] self.set_aging(enable=True) rec_buf = dnsserver.DNS_RPC_RECORD_BUF() rec_buf.rec = TXTRecord(txt) self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.zone, name, rec_buf, None) recs = self.ldap_get_dns_records(name) self.assertEqual(len(recs), 1) self.assertEqual(recs[0].dwTimeStamp, 0) def test_static_record_dynamic_update(self): name, txt = 'agingtest', ['test txt'] txt2 = ['test txt2'] self.set_aging(enable=True) rec_buf = dnsserver.DNS_RPC_RECORD_BUF() rec_buf.rec = TXTRecord(txt) self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.zone, name, rec_buf, None) rec2 = self.dns_update_record(name, txt2) self.assertEqual(rec2.dwTimeStamp, 0) def test_dynamic_record_static_update(self): name, txt = 'agingtest', ['test txt'] txt2 = ['test txt2'] txt3 = ['test txt3'] self.set_aging(enable=True) self.dns_update_record(name, txt) rec_buf = dnsserver.DNS_RPC_RECORD_BUF() rec_buf.rec = TXTRecord(txt2) self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.zone, name, rec_buf, None) self.dns_update_record(name, txt3) recs = self.ldap_get_dns_records(name) # Put in dict because ldap recs might be out of order recs = {str(r.data.str): r for r in recs} self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0) self.assertEqual(recs[str(txt2)].dwTimeStamp, 0) self.assertEqual(recs[str(txt3)].dwTimeStamp, 0) def test_dns_tombstone_custom_match_rule(self): lp = self.get_loadparm() self.samdb = SamDB(url=lp.samdb_url(), lp=lp, session_info=system_session(), credentials=self.creds) name, txt = 'agingtest', ['test txt'] name2, txt2 = 'agingtest2', ['test txt2'] name3, txt3 = 'agingtest3', ['test txt3'] name4, txt4 = 'agingtest4', ['test txt4'] name5, txt5 = 'agingtest5', ['test txt5'] self.create_zone(self.zone, aging_enabled=True) interval = 10 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval, Aging=1, AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) self.dns_update_record(name, txt) self.dns_update_record(name2, txt) self.dns_update_record(name2, txt2) self.dns_update_record(name3, txt) self.dns_update_record(name3, txt2) last_update = self.dns_update_record(name3, txt3) # Modify txt1 of the first 2 names def mod_ts(rec): if rec.data.str == txt: rec.dwTimeStamp -= 2 self.ldap_modify_dnsrecs(name, mod_ts) self.ldap_modify_dnsrecs(name2, mod_ts) # create a static dns record. rec_buf = dnsserver.DNS_RPC_RECORD_BUF() rec_buf.rec = TXTRecord(txt4) self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.zone, name4, rec_buf, None) # Create a tomb stoned record. self.dns_update_record(name5, txt5) self.dns_tombstone(name5, txt5, self.zone) self.ldap_get_dns_records(name3) expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})" expr = expr.format(int(last_update.dwTimeStamp) - 1) try: res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) except ldb.LdbError as e: self.fail(str(e)) updated_names = {str(r.get('name')) for r in res} self.assertEqual(updated_names, set([name, name2])) def test_dns_tombstone_custom_match_rule_no_records(self): lp = self.get_loadparm() self.samdb = SamDB(url=lp.samdb_url(), lp=lp, session_info=system_session(), credentials=self.creds) self.create_zone(self.zone, aging_enabled=True) interval = 10 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval, Aging=1, AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})" expr = expr.format(1) try: res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) except ldb.LdbError as e: self.fail(str(e)) self.assertEqual(0, len(res)) def test_dns_tombstone_custom_match_rule_fail(self): self.create_zone(self.zone, aging_enabled=True) samdb = SamDB(url=lp.samdb_url(), lp=lp, session_info=system_session(), credentials=self.creds) # Property name in not dnsRecord expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)" res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) self.assertEqual(len(res), 0) # No value for tombstone time try: expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)" res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) self.assertEqual(len(res), 0) self.fail("Exception: ldb.ldbError not generated") except ldb.LdbError as e: (num, msg) = e.args self.assertEqual(num, ERR_OPERATIONS_ERROR) # Tombstone time = - try: expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)" res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) self.assertEqual(len(res), 0) self.fail("Exception: ldb.ldbError not generated") except ldb.LdbError as e: (num, _) = e.args self.assertEqual(num, ERR_OPERATIONS_ERROR) # Tombstone time longer than 64 characters try: expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})" expr = expr.format("1" * 65) res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) self.assertEqual(len(res), 0) self.fail("Exception: ldb.ldbError not generated") except ldb.LdbError as e: (num, _) = e.args self.assertEqual(num, ERR_OPERATIONS_ERROR) # Non numeric Tombstone time try: expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)" res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) self.assertEqual(len(res), 0) self.fail("Exception: ldb.ldbError not generated") except ldb.LdbError as e: (num, _) = e.args self.assertEqual(num, ERR_OPERATIONS_ERROR) # Non system session try: db = SamDB(url="ldap://" + self.server_ip, lp=self.get_loadparm(), credentials=self.creds) expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)" res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["*"]) self.assertEqual(len(res), 0) self.fail("Exception: ldb.ldbError not generated") except ldb.LdbError as e: (num, _) = e.args self.assertEqual(num, ERR_OPERATIONS_ERROR) def test_basic_scavenging(self): lp = self.get_loadparm() self.samdb = SamDB(url=lp.samdb_url(), lp=lp, session_info=system_session(), credentials=self.creds) self.create_zone(self.zone, aging_enabled=True) interval = 1 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval, Aging=1, AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) name, txt = 'agingtest', ['test txt'] name2, txt2 = 'agingtest2', ['test txt2'] name3, txt3 = 'agingtest3', ['test txt3'] name4, txt4 = 'agingtest4', ['test txt4'] name5, txt5 = 'agingtest5', ['test txt5'] self.dns_update_record(name, txt) self.dns_update_record(name2, txt) self.dns_update_record(name2, txt2) self.dns_update_record(name3, txt) self.dns_update_record(name3, txt2) # Create a tombstoned record. self.dns_update_record(name4, txt4) self.dns_tombstone(name4, txt4, self.zone) records = self.ldap_get_records(name4) self.assertIn("dNSTombstoned", records[0]) self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") # Create an un-tombstoned record, with dnsTombstoned: FALSE self.dns_update_record(name5, txt5) self.dns_tombstone(name5, txt5, self.zone) self.dns_update_record(name5, txt5) records = self.ldap_get_records(name5) self.assertIn("dNSTombstoned", records[0]) self.assertEqual(records[0]["dNSTombstoned"][0], b"FALSE") last_add = self.dns_update_record(name3, txt3) def mod_ts(rec): self.assertTrue(rec.dwTimeStamp > 0) if rec.data.str == txt: rec.dwTimeStamp -= interval * 5 def mod_ts_all(rec): rec.dwTimeStamp -= interval * 5 self.ldap_modify_dnsrecs(name, mod_ts) self.ldap_modify_dnsrecs(name2, mod_ts) self.ldap_modify_dnsrecs(name3, mod_ts) self.ldap_modify_dnsrecs(name5, mod_ts_all) self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None))) dsdb._scavenge_dns_records(self.samdb) recs = self.ldap_get_dns_records(name) self.assertEqual(len(recs), 1) self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) records = self.ldap_get_records(name) self.assertIn("dNSTombstoned", records[0]) self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") recs = self.ldap_get_dns_records(name2) self.assertEqual(len(recs), 1) self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT) self.assertEqual(recs[0].data.str, txt2) recs = self.ldap_get_dns_records(name3) self.assertEqual(len(recs), 2) txts = {str(r.data.str) for r in recs} self.assertEqual(txts, {str(txt2), str(txt3)}) self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT) self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT) recs = self.ldap_get_dns_records(name4) self.assertEqual(len(recs), 1) self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) records = self.ldap_get_records(name4) self.assertIn("dNSTombstoned", records[0]) self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") recs = self.ldap_get_dns_records(name5) self.assertEqual(len(recs), 1) self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) records = self.ldap_get_records(name5) self.assertIn("dNSTombstoned", records[0]) self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE") for make_it_work in [False, True]: inc = -1 if make_it_work else 1 def mod_ts(rec): rec.data = (last_add.dwTimeStamp - 24 * 14) + inc self.ldap_modify_dnsrecs(name, mod_ts) dsdb._dns_delete_tombstones(self.samdb) recs = self.ldap_get_records(name) if make_it_work: self.assertEqual(len(recs), 0) else: self.assertEqual(len(recs), 1) def test_fully_qualified_zone(self): def create_zone_expect_exists(zone): try: zone_create = self.make_zone_obj(zone) client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN zc_type = dnsserver.DNSSRV_TYPEID_ZONE_CREATE self.rpc_conn.DnssrvOperation2(client_version, 0, self.server_ip, None, 0, 'ZoneCreate', zc_type, zone_create) except WERRORError as e: enum, _ = e.args if enum != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS: self.fail(e) return self.fail("Zone {} should already exist".format(zone)) # Create unqualified, then check creating qualified fails. self.create_zone(self.zone) create_zone_expect_exists(self.zone + '.') # Same again, but the other way around. self.create_zone(self.zone + '2.') create_zone_expect_exists(self.zone + '2') client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY tid = dnsserver.DNSSRV_TYPEID_DWORD typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version, 0, self.server_ip, None, 'EnumZones', tid, request_filter) self.delete_zone(self.zone) self.delete_zone(self.zone + '2') # Two zones should've been created, neither of them fully qualified. zones_we_just_made = [] zones = [str(z.pszZoneName) for z in res.ZoneArray] for zone in zones: if zone.startswith(self.zone): zones_we_just_made.append(zone) self.assertEqual(len(zones_we_just_made), 2) self.assertEqual(set(zones_we_just_made), {self.zone + '2', self.zone}) def delete_zone(self, zone): self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, zone, 0, 'DeleteZoneFromDs', dnsserver.DNSSRV_TYPEID_NULL, None) def test_soa_query(self): zone = "test.lan" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) questions = [] q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) questions.append(q) self.finish_name_packet(p, questions) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) # Windows returns OK while BIND logically seems to return NXDOMAIN self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 0) self.create_zone(zone) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 1) self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA) self.delete_zone(zone) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) self.assertEqual(response.ancount, 0) def set_dnsProperty_zero_length(self, dnsproperty_id): records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE, expression="(&(objectClass=dnsZone)" + "(name={0}))".format(self.zone), attrs=["dNSProperty"]) self.assertEqual(len(records), 1) props = [ndr_unpack(dnsp.DnsProperty, r) for r in records[0].get('dNSProperty')] new_props = [ndr.ndr_pack(p) for p in props if p.id == dnsproperty_id] zero_length_p = dnsp.DnsProperty_short() zero_length_p.id = dnsproperty_id zero_length_p.namelength = 1 zero_length_p.name = 1 new_props += [ndr.ndr_pack(zero_length_p)] dn = records[0].dn update_dict = {'dn': dn, 'dnsProperty': new_props} self.samdb.modify(ldb.Message.from_dict(self.samdb, update_dict, ldb.FLAG_MOD_REPLACE)) def test_update_while_dnsProperty_zero_length(self): self.create_zone(self.zone) self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE) rec = self.dns_update_record('dnspropertytest', ['test txt']) self.assertNotEqual(rec.dwTimeStamp, 0) def test_enum_zones_while_dnsProperty_zero_length(self): self.create_zone(self.zone) self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE) client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY tid = dnsserver.DNSSRV_TYPEID_DWORD typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version, 0, self.server_ip, None, 'EnumZones', tid, request_filter) def test_rpc_zone_update_while_dnsProperty_zero_length(self): self.create_zone(self.zone) self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE) self.set_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_SECURE) def test_rpc_zone_update_while_other_dnsProperty_zero_length(self): self.create_zone(self.zone) self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_MASTER_SERVERS_DA) self.set_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_SECURE) class TestRPCRoundtrip(DNSTest): def setUp(self): super().setUp() global server, server_ip, lp, creds self.server = server_name self.server_ip = server_ip self.lp = lp self.creds = creds self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip), self.lp, self.creds) def rpc_update(self, fqn=None, data=None, wType=None, delete=False): fqn = fqn or ("rpctestrec." + self.get_dns_domain()) rec = record_from_string(wType, data) add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec add_arg = add_rec_buf del_arg = None if delete: add_arg = None del_arg = add_rec_buf self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), fqn, add_arg, del_arg) def test_rpc_self_referencing_cname(self): cname = "cnametest2_unqual_rec_loop" cname_fqn = "%s.%s" % (cname, self.get_dns_domain()) try: self.rpc_update(fqn=cname, data=cname_fqn, wType=dnsp.DNS_TYPE_CNAME, delete=True) except WERRORError as e: if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST: self.fail("RPC DNS gaven wrong error on pre-test cleanup " "for self referencing CNAME: %s" % e.args[0]) try: self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn) except WERRORError as e: if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP: self.fail("RPC DNS gaven wrong error on insertion of " "self referencing CNAME: %s" % e.args[0]) return self.fail("RPC DNS allowed insertion of self referencing CNAME") def test_update_add_txt_rpc_to_dns(self): prefix, txt = 'rpctextrec', ['"This is a test"'] name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) def test_update_add_null_padded_txt_record(self): "test adding records works" prefix, txt = 'pad1textrec', ['"This is a test"', '', ''] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) self.assertIsNotNone( dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')) prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) self.assertIsNotNone( dns_record_match( self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')) prefix, txt = 'pad3textrec', ['', '', '"This is a test"'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) self.assertIsNotNone( dns_record_match( self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')) def test_update_add_padding_rpc_to_dns(self): prefix, txt = 'pad1textrec', ['"This is a test"', '', ''] prefix = 'rpc' + prefix name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text'] prefix = 'rpc' + prefix name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) prefix, txt = 'pad3textrec', ['', '', '"This is a test"'] prefix = 'rpc' + prefix name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) # Test is incomplete due to strlen against txt records def test_update_add_null_char_txt_record(self): "test adding records works" prefix, txt = 'nulltextrec', ['NULL\x00BYTE'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, ['NULL']) self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"NULL"')) prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, ['NULL', 'NULL']) self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"NULL" "NULL"')) def test_update_add_null_char_rpc_to_dns(self): prefix = 'rpcnulltextrec' name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, ['NULL']) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) def test_update_add_hex_char_txt_record(self): "test adding records works" prefix, txt = 'hextextrec', ['HIGH\xFFBYTE'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')) def test_update_add_hex_rpc_to_dns(self): prefix, txt = 'hextextrec', ['HIGH\xFFBYTE'] prefix = 'rpc' + prefix name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) def test_update_add_slash_txt_record(self): "test adding records works" prefix, txt = 'slashtextrec', ['Th\\=is=is a test'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')) # This test fails against Windows as it eliminates slashes in RPC # One typical use for a slash is in records like 'var=value' to # escape '=' characters. def test_update_add_slash_rpc_to_dns(self): prefix, txt = 'slashtextrec', ['Th\\=is=is a test'] prefix = 'rpc' + prefix name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) def test_update_add_two_txt_records(self): "test adding two txt records works" prefix, txt = 'textrec2', ['"This is a test"', '"and this is a test, too"'] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' + ' "\\"and this is a test, too\\""')) def test_update_add_two_rpc_to_dns(self): prefix, txt = 'textrec2', ['"This is a test"', '"and this is a test, too"'] prefix = 'rpc' + prefix name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' + ' "\\"and this is a test, too\\""') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) def test_update_add_empty_txt_records(self): "test adding two txt records works" prefix, txt = 'emptytextrec', [] p = self.make_txt_update(prefix, txt) (response, response_packet) =\ self.dns_transaction_udp(p, host=server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '')) def test_update_add_empty_rpc_to_dns(self): prefix, txt = 'rpcemptytextrec', [] name = "%s.%s" % (prefix, self.get_dns_domain()) rec = record_from_string(dnsp.DNS_TYPE_TXT, '') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, add_rec_buf, None) except WERRORError as e: self.fail(str(e)) try: self.check_query_txt(prefix, txt) finally: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server_ip, self.get_dns_domain(), name, None, add_rec_buf) TestProgram(module=__name__, opts=subunitopts)