From 5f0615a601e014ed2da5c8117a9bc6df0bc6aad8 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 20 Jun 2024 06:07:29 +0200 Subject: Merging upstream version 2:4.20.2+dfsg. Signed-off-by: Daniel Baumann --- python/samba/gp/gpclass.py | 4 +- python/samba/tests/blackbox/misc_dfs_widelink.py | 86 ++++++ python/samba/tests/dns_base.py | 213 ++++++++++----- python/samba/tests/dns_tkey.py | 325 ++++++++++++++++++++--- python/samba/tests/join.py | 2 +- python/samba/tests/ntacls.py | 2 +- 6 files changed, 531 insertions(+), 101 deletions(-) create mode 100644 python/samba/tests/blackbox/misc_dfs_widelink.py (limited to 'python') diff --git a/python/samba/gp/gpclass.py b/python/samba/gp/gpclass.py index 08be472..d86aace 100644 --- a/python/samba/gp/gpclass.py +++ b/python/samba/gp/gpclass.py @@ -805,9 +805,7 @@ def site_dn_for_machine(samdb, dc_hostname, lp, creds, hostname): samlogon_response = ndr_unpack(nbt.netlogon_samlogon_response, bytes(res.msgs[0]['Netlogon'][0])) - if samlogon_response.ntver not in [nbt.NETLOGON_NT_VERSION_5EX, - (nbt.NETLOGON_NT_VERSION_1 - | nbt.NETLOGON_NT_VERSION_5EX)]: + if not (samlogon_response.ntver & nbt.NETLOGON_NT_VERSION_5EX): raise RuntimeError('site_dn_for_machine: Invalid NtVer in ' + 'netlogon_samlogon_response') diff --git a/python/samba/tests/blackbox/misc_dfs_widelink.py b/python/samba/tests/blackbox/misc_dfs_widelink.py new file mode 100644 index 0000000..7948590 --- /dev/null +++ b/python/samba/tests/blackbox/misc_dfs_widelink.py @@ -0,0 +1,86 @@ +# Blackbox tests for DFS (widelink) +# +# Copyright (C) Noel Power noel.power@suse.com +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +from samba.tests import BlackboxTestCase, BlackboxProcessError +from samba.samba3 import param as s3param + +from samba.credentials import Credentials + +import os + +class DfsWidelinkBlockboxTestBase(BlackboxTestCase): + + def setUp(self): + super().setUp() + self.lp = s3param.get_context() + self.server = os.environ["SERVER"] + self.user = os.environ["USER"] + self.passwd = os.environ["PASSWORD"] + self.creds = Credentials() + self.creds.guess(self.lp) + self.creds.set_username(self.user) + self.creds.set_password(self.passwd) + self.testdir = os.getenv("TESTDIR", "msdfs-share-wl") + self.share = os.getenv("SHARE", "msdfs-share-wl") + self.dirpath = os.path.join(os.environ["LOCAL_PATH"],self.testdir) + # allow a custom teardown function to be defined + self.cleanup = None + self.cleanup_args = [] + + def tearDown(self): + try: + if (self.cleanup): + self.cleanup(self.cleanup_args) + except Exception as e: + print("remote remove failed: %s" % str(e)) + + def build_test_cmd(self, cmd, args): + cmd = [cmd, "-U%s%%%s" % (self.user, self.passwd)] + cmd.extend(args) + return cmd + + def test_ci_chdir(self): + parent_dir = "msdfs-src1" + dirs = [parent_dir, parent_dir.upper()] + # try as named dir first then try upper-cased version + for adir in dirs: + smbclient_args = self.build_test_cmd("smbclient", ["//%s/%s" % (self.server, self.share), "-c", "cd %s" % (adir)]) + try: + out_str = self.check_output(smbclient_args) + except BlackboxProcessError as e: + print(str(e)) + self.fail(str(e)) + + def test_nested_chdir(self): + parent_dir = "dfshop1" + child_dir = "dfshop2" + smbclient_args = self.build_test_cmd("smbclient", ["//%s/%s" % (self.server, self.share), "-c", "cd %s/%s" % (parent_dir,child_dir)]) + try: + out_str = self.check_output(smbclient_args) + except BlackboxProcessError as e: + print(str(e)) + self.fail(str(e)) + + def test_enumerate_dfs_link(self): + smbclient_args = self.build_test_cmd("smbclient", ["//%s/%s" % (self.server, self.share), "-c", "dir"]) + try: + out_str = self.check_output(smbclient_args) + except BlackboxProcessError as e: + print(str(e)) + self.fail(str(e)) + out_str = out_str.decode() + self.assertIn("msdfs-src1", out_str) diff --git a/python/samba/tests/dns_base.py b/python/samba/tests/dns_base.py index d320a0e..43a62b1 100644 --- a/python/samba/tests/dns_base.py +++ b/python/samba/tests/dns_base.py @@ -20,6 +20,7 @@ from samba.tests import TestCaseInTempDir from samba.dcerpc import dns, dnsp from samba import gensec, tests from samba import credentials +from samba import NTSTATUSError import struct import samba.ndr as ndr import random @@ -76,6 +77,24 @@ class DNSTest(TestCaseInTempDir): self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" % (opcode, p_opcode)) + def assert_dns_flags_equals(self, packet, flags): + "Helper function to check opcode" + p_flags = packet.operation & (~(dns.DNS_OPCODE|dns.DNS_RCODE)) + self.assertEqual(p_flags, flags, "Expected FLAGS %02x, got %02x" % + (flags, p_flags)) + + def assert_echoed_dns_error(self, request, response, response_p, rcode): + + request_p = ndr.ndr_pack(request) + + self.assertEqual(response.id, request.id) + self.assert_dns_rcode_equals(response, rcode) + self.assert_dns_opcode_equals(response, request.operation & dns.DNS_OPCODE) + self.assert_dns_flags_equals(response, + (request.operation | dns.DNS_FLAG_REPLY) & (~(dns.DNS_OPCODE|dns.DNS_RCODE))) + self.assertEqual(len(response_p), len(request_p)) + self.assertEqual(response_p[4:], request_p[4:]) + def make_name_packet(self, opcode, qid=None): "Helper creating a dns.name_packet" p = dns.name_packet() @@ -112,6 +131,8 @@ class DNSTest(TestCaseInTempDir): return self.creds.get_realm().lower() def dns_transaction_udp(self, packet, host, + allow_remaining=False, + allow_truncated=False, dump=False, timeout=None): "send a DNS query and read the reply" s = None @@ -128,8 +149,22 @@ class DNSTest(TestCaseInTempDir): recv_packet = s.recv(2048, 0) if dump: print(self.hexdump(recv_packet)) - response = ndr.ndr_unpack(dns.name_packet, recv_packet) + if allow_truncated: + # with allow_remaining + # we add some zero bytes + # in order to also parse truncated + # responses + recv_packet_p = recv_packet + 32*b"\x00" + allow_remaining = True + else: + recv_packet_p = recv_packet + response = ndr.ndr_unpack(dns.name_packet, recv_packet_p, + allow_remaining=allow_remaining) return (response, recv_packet) + except RuntimeError as re: + if s is not None: + s.close() + raise AssertionError(re) finally: if s is not None: s.close() @@ -151,11 +186,26 @@ class DNSTest(TestCaseInTempDir): tcp_packet += send_packet s.sendall(tcp_packet) - recv_packet = s.recv(0xffff + 2, 0) + recv_packet = b'' + length = None + for i in range(0, 2 + 0xffff): + if len(recv_packet) >= 2: + length, = struct.unpack('!H', recv_packet[0:2]) + remaining = 2 + length + else: + remaining = 2 + 12 + remaining -= len(recv_packet) + if remaining == 0: + break + recv_packet += s.recv(remaining, 0) if dump: print(self.hexdump(recv_packet)) response = ndr.ndr_unpack(dns.name_packet, recv_packet[2:]) + except RuntimeError as re: + if s is not None: + s.close() + raise AssertionError(re) finally: if s is not None: s.close() @@ -217,18 +267,41 @@ class DNSTKeyTest(DNSTest): self.creds.set_username(tests.env_get_var_value('USERNAME')) self.creds.set_password(tests.env_get_var_value('PASSWORD')) self.creds.set_kerberos_state(credentials.MUST_USE_KERBEROS) + + self.unpriv_creds = None + self.newrecname = "tkeytsig.%s" % self.get_dns_domain() - def tkey_trans(self, creds=None): + def get_unpriv_creds(self): + if self.unpriv_creds is not None: + return self.unpriv_creds + + self.unpriv_creds = credentials.Credentials() + self.unpriv_creds.guess(self.lp_ctx) + self.unpriv_creds.set_username(tests.env_get_var_value('USERNAME_UNPRIV')) + self.unpriv_creds.set_password(tests.env_get_var_value('PASSWORD_UNPRIV')) + self.unpriv_creds.set_kerberos_state(credentials.MUST_USE_KERBEROS) + + return self.unpriv_creds + + def tkey_trans(self, creds=None, algorithm_name="gss-tsig", + tkey_req_in_answers=False, + expected_rcode=dns.DNS_RCODE_OK): "Do a TKEY transaction and establish a gensec context" if creds is None: creds = self.creds - self.key_name = "%s.%s" % (uuid.uuid4(), self.get_dns_domain()) + mech = 'spnego' + + tkey = {} + tkey['name'] = "%s.%s" % (uuid.uuid4(), self.get_dns_domain()) + tkey['creds'] = creds + tkey['mech'] = mech + tkey['algorithm'] = algorithm_name p = self.make_name_packet(dns.DNS_OPCODE_QUERY) - q = self.make_name_question(self.key_name, + q = self.make_name_question(tkey['name'], dns.DNS_QTYPE_TKEY, dns.DNS_QCLASS_IN) questions = [] @@ -236,30 +309,30 @@ class DNSTKeyTest(DNSTest): self.finish_name_packet(p, questions) r = dns.res_rec() - r.name = self.key_name + r.name = tkey['name'] r.rr_type = dns.DNS_QTYPE_TKEY r.rr_class = dns.DNS_QCLASS_IN r.ttl = 0 r.length = 0xffff rdata = dns.tkey_record() - rdata.algorithm = "gss-tsig" + rdata.algorithm = algorithm_name rdata.inception = int(time.time()) rdata.expiration = int(time.time()) + 60 * 60 rdata.mode = dns.DNS_TKEY_MODE_GSSAPI rdata.error = 0 rdata.other_size = 0 - self.g = gensec.Security.start_client(self.settings) - self.g.set_credentials(creds) - self.g.set_target_service("dns") - self.g.set_target_hostname(self.server) - self.g.want_feature(gensec.FEATURE_SIGN) - self.g.start_mech_by_name("spnego") + tkey['gensec'] = gensec.Security.start_client(self.settings) + tkey['gensec'].set_credentials(creds) + tkey['gensec'].set_target_service("dns") + tkey['gensec'].set_target_hostname(self.server) + tkey['gensec'].want_feature(gensec.FEATURE_SIGN) + tkey['gensec'].start_mech_by_name(tkey['mech']) finished = False client_to_server = b"" - (finished, server_to_client) = self.g.update(client_to_server) + (finished, server_to_client) = tkey['gensec'].update(client_to_server) self.assertFalse(finished) data = [x if isinstance(x, int) else ord(x) for x in list(server_to_client)] @@ -268,56 +341,76 @@ class DNSTKeyTest(DNSTest): r.rdata = rdata additional = [r] - p.arcount = 1 - p.additional = additional + if tkey_req_in_answers: + p.ancount = 1 + p.answers = additional + else: + p.arcount = 1 + p.additional = additional (response, response_packet) =\ self.dns_transaction_tcp(p, self.server_ip) + if expected_rcode != dns.DNS_RCODE_OK: + self.assert_echoed_dns_error(p, response, response_packet, expected_rcode) + return self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) tkey_record = response.answers[0].rdata server_to_client = bytes(tkey_record.key_data) - (finished, client_to_server) = self.g.update(server_to_client) + (finished, client_to_server) = tkey['gensec'].update(server_to_client) self.assertTrue(finished) + self.tkey = tkey + self.verify_packet(response, response_packet) def verify_packet(self, response, response_packet, request_mac=b""): + self.assertEqual(response.arcount, 1) self.assertEqual(response.additional[0].rr_type, dns.DNS_QTYPE_TSIG) + if self.tkey['algorithm'] == "gss-tsig": + gss_tsig = True + else: + gss_tsig = False + + request_mac_len = b"" + if len(request_mac) > 0 and gss_tsig: + request_mac_len = struct.pack('!H', len(request_mac)) + tsig_record = response.additional[0].rdata mac = bytes(tsig_record.mac) + self.assertEqual(tsig_record.original_id, response.id) + self.assertEqual(tsig_record.mac_size, len(mac)) + # Cut off tsig record from dns response packet for MAC verification # and reset additional record count. - key_name_len = len(self.key_name) + 2 - tsig_record_len = len(ndr.ndr_pack(tsig_record)) + key_name_len + 10 - - # convert str/bytes to a list (of string char or int) - # so it can be modified - response_packet_list = [x if isinstance(x, int) else ord(x) for x in response_packet] - del response_packet_list[-tsig_record_len:] - response_packet_list[11] = 0 - - # convert modified list (of string char or int) to str/bytes - response_packet_wo_tsig = bytes(response_packet_list) + response_copy = ndr.ndr_deepcopy(response) + response_copy.arcount = 0 + response_packet_wo_tsig = ndr.ndr_pack(response_copy) fake_tsig = dns.fake_tsig_rec() - fake_tsig.name = self.key_name + fake_tsig.name = self.tkey['name'] fake_tsig.rr_class = dns.DNS_QCLASS_ANY fake_tsig.ttl = 0 fake_tsig.time_prefix = tsig_record.time_prefix fake_tsig.time = tsig_record.time fake_tsig.algorithm_name = tsig_record.algorithm_name fake_tsig.fudge = tsig_record.fudge - fake_tsig.error = 0 - fake_tsig.other_size = 0 + fake_tsig.error = tsig_record.error + fake_tsig.other_size = tsig_record.other_size + fake_tsig.other_data = tsig_record.other_data fake_tsig_packet = ndr.ndr_pack(fake_tsig) - data = request_mac + response_packet_wo_tsig + fake_tsig_packet - self.g.check_packet(data, data, mac) + data = request_mac_len + request_mac + response_packet_wo_tsig + fake_tsig_packet + try: + self.tkey['gensec'].check_packet(data, data, mac) + except NTSTATUSError as nt: + raise AssertionError(nt) - def sign_packet(self, packet, key_name): + def sign_packet(self, packet, key_name, + algorithm_name="gss-tsig", + bad_sig=False): "Sign a packet, calculate a MAC and add TSIG record" packet_data = ndr.ndr_pack(packet) @@ -327,18 +420,35 @@ class DNSTKeyTest(DNSTest): fake_tsig.ttl = 0 fake_tsig.time_prefix = 0 fake_tsig.time = int(time.time()) - fake_tsig.algorithm_name = "gss-tsig" + fake_tsig.algorithm_name = algorithm_name fake_tsig.fudge = 300 fake_tsig.error = 0 fake_tsig.other_size = 0 fake_tsig_packet = ndr.ndr_pack(fake_tsig) data = packet_data + fake_tsig_packet - mac = self.g.sign_packet(data, data) + mac = self.tkey['gensec'].sign_packet(data, data) mac_list = [x if isinstance(x, int) else ord(x) for x in list(mac)] + if bad_sig: + if len(mac) > 8: + mac_list[-8] = mac_list[-8] ^ 0xff + if len(mac) > 7: + mac_list[-7] = ord('b') + if len(mac) > 6: + mac_list[-6] = ord('a') + if len(mac) > 5: + mac_list[-5] = ord('d') + if len(mac) > 4: + mac_list[-4] = ord('m') + if len(mac) > 3: + mac_list[-3] = ord('a') + if len(mac) > 2: + mac_list[-2] = ord('c') + if len(mac) > 1: + mac_list[-1] = mac_list[-1] ^ 0xff rdata = dns.tsig_record() - rdata.algorithm_name = "gss-tsig" + rdata.algorithm_name = algorithm_name rdata.time_prefix = 0 rdata.time = fake_tsig.time rdata.fudge = 300 @@ -363,33 +473,10 @@ class DNSTKeyTest(DNSTest): return mac def bad_sign_packet(self, packet, key_name): - """Add bad signature for a packet by bitflipping - the final byte in the MAC""" - - mac_list = [x if isinstance(x, int) else ord(x) for x in list("badmac")] - - rdata = dns.tsig_record() - rdata.algorithm_name = "gss-tsig" - rdata.time_prefix = 0 - rdata.time = int(time.time()) - rdata.fudge = 300 - rdata.original_id = packet.id - rdata.error = 0 - rdata.other_size = 0 - rdata.mac = mac_list - rdata.mac_size = len(mac_list) + """Add bad signature for a packet by + bitflipping and hardcoding bytes at the end of the MAC""" - r = dns.res_rec() - r.name = key_name - r.rr_type = dns.DNS_QTYPE_TSIG - r.rr_class = dns.DNS_QCLASS_ANY - r.ttl = 0 - r.length = 0xffff - r.rdata = rdata - - additional = [r] - packet.additional = additional - packet.arcount = 1 + return self.sign_packet(packet, key_name, bad_sig=True) def search_record(self, name): p = self.make_name_packet(dns.DNS_OPCODE_QUERY) diff --git a/python/samba/tests/dns_tkey.py b/python/samba/tests/dns_tkey.py index 69af14d..f8417ea 100644 --- a/python/samba/tests/dns_tkey.py +++ b/python/samba/tests/dns_tkey.py @@ -19,6 +19,7 @@ import sys import optparse import samba.getopt as options +import samba.ndr as ndr from samba.dcerpc import dns from samba.tests.subunitrun import SubunitOptions, TestProgram from samba.tests.dns_base import DNSTKeyTest @@ -55,17 +56,34 @@ class TestDNSUpdates(DNSTKeyTest): self.server_ip = server_ip super().setUp() - def test_tkey(self): - "test DNS TKEY handshake" + def test_tkey_gss_tsig(self): + "test DNS TKEY handshake with gss-tsig" self.tkey_trans() + def test_tkey_gss_microsoft_com(self): + "test DNS TKEY handshake with gss.microsoft.com" + + self.tkey_trans(algorithm_name="gss.microsoft.com") + + def test_tkey_invalid_gss_TSIG(self): + "test DNS TKEY handshake with invalid gss-TSIG" + + self.tkey_trans(algorithm_name="gss-TSIG", + expected_rcode=dns.DNS_RCODE_REFUSED) + + def test_tkey_invalid_gss_MICROSOFT_com(self): + "test DNS TKEY handshake with invalid gss.MICROSOFT.com" + + self.tkey_trans(algorithm_name="gss.MICROSOFT.com", + expected_rcode=dns.DNS_RCODE_REFUSED) + def test_update_wo_tsig(self): "test DNS update without TSIG record" p = self.make_update_request() (response, response_p) = self.dns_transaction_udp(p, self.server_ip) - self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED) + self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED) rcode = self.search_record(self.newrecname) self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) @@ -78,10 +96,7 @@ class TestDNSUpdates(DNSTKeyTest): p = self.make_update_request() self.sign_packet(p, "badkey") (response, response_p) = self.dns_transaction_udp(p, self.server_ip) - self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH) - tsig_record = response.additional[0].rdata - self.assertEqual(tsig_record.error, dns.DNS_RCODE_BADKEY) - self.assertEqual(tsig_record.mac_size, 0) + self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED) rcode = self.search_record(self.newrecname) self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) @@ -92,23 +107,149 @@ class TestDNSUpdates(DNSTKeyTest): self.tkey_trans() p = self.make_update_request() - self.bad_sign_packet(p, self.key_name) + self.bad_sign_packet(p, self.tkey['name']) (response, response_p) = self.dns_transaction_udp(p, self.server_ip) - self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH) - tsig_record = response.additional[0].rdata - self.assertEqual(tsig_record.error, dns.DNS_RCODE_BADSIG) - self.assertEqual(tsig_record.mac_size, 0) + self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED) rcode = self.search_record(self.newrecname) self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) - def test_update_tsig(self): - "test DNS update with correct TSIG record" + def test_update_tsig_bad_algorithm(self): + "test DNS update with a TSIG record with a bad algorithm" self.tkey_trans() + algorithm_name = "gss-TSIG" p = self.make_update_request() - mac = self.sign_packet(p, self.key_name) + mac = self.sign_packet(p, self.tkey['name'], + algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED) + + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) + + def test_update_tsig_changed_algorithm1(self): + "test DNS update with a TSIG record with a changed algorithm" + + algorithm_name = "gss-tsig" + self.tkey_trans(algorithm_name=algorithm_name) + + # Now delete the record, it's most likely + # a no-op as it should not be there if the test + # runs the first time + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Now do an update with the algorithm_name + # changed in the requests TSIG message. + p = self.make_update_request() + algorithm_name = "gss.microsoft.com" + mac = self.sign_packet(p, self.tkey['name'], + algorithm_name=algorithm_name) + algorithm_name = "gss-tsig" + (response, response_p) = self.dns_transaction_udp(p, self.server_ip, + allow_remaining=True) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Check the record is around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now delete the record, with the original + # algorithm_name used in the tkey exchange + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) + + def test_update_tsig_changed_algorithm2(self): + "test DNS update with a TSIG record with a changed algorithm" + + algorithm_name = "gss.microsoft.com" + self.tkey_trans(algorithm_name=algorithm_name) + + # Now delete the record, it's most likely + # a no-op as it should not be there if the test + # runs the first time + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Now do an update with the algorithm_name + # changed in the requests TSIG message. + p = self.make_update_request() + algorithm_name = "gss-tsig" + mac = self.sign_packet(p, self.tkey['name'], + algorithm_name=algorithm_name) + algorithm_name = "gss.microsoft.com" + (response, response_p) = self.dns_transaction_udp(p, self.server_ip, + allow_truncated=True) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + response_p_pack = ndr.ndr_pack(response) + if len(response_p_pack) == len(response_p): + self.verify_packet(response, response_p, mac) + else: + pass # Windows bug + + # Check the record is around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now delete the record, with the original + # algorithm_name used in the tkey exchange + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) + + def test_update_gss_tsig_tkey_req_additional(self): + "test DNS update with correct gss-tsig record tkey req in additional" + + self.tkey_trans() + + p = self.make_update_request() + mac = self.sign_packet(p, self.tkey['name']) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Check the record is around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now delete the record + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name']) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # check it's gone + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) + + def test_update_gss_tsig_tkey_req_answers(self): + "test DNS update with correct gss-tsig record tsig req in answers" + + self.tkey_trans(tkey_req_in_answers=True) + + p = self.make_update_request() + mac = self.sign_packet(p, self.tkey['name']) (response, response_p) = self.dns_transaction_udp(p, self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.verify_packet(response, response_p, mac) @@ -119,7 +260,66 @@ class TestDNSUpdates(DNSTKeyTest): # Now delete the record p = self.make_update_request(delete=True) - mac = self.sign_packet(p, self.key_name) + mac = self.sign_packet(p, self.tkey['name']) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # check it's gone + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) + + def test_update_gss_microsoft_com_tkey_req_additional(self): + "test DNS update with correct gss.microsoft.com record tsig req in additional" + + algorithm_name = "gss.microsoft.com" + self.tkey_trans(algorithm_name=algorithm_name) + + p = self.make_update_request() + mac = self.sign_packet(p, self.tkey['name'], + algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Check the record is around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now delete the record + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name'], + algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # check it's gone + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) + + def test_update_gss_microsoft_com_tkey_req_answers(self): + "test DNS update with correct gss.microsoft.com record tsig req in answers" + + algorithm_name = "gss.microsoft.com" + self.tkey_trans(algorithm_name=algorithm_name, + tkey_req_in_answers=True) + + p = self.make_update_request() + mac = self.sign_packet(p, self.tkey['name'], + algorithm_name=algorithm_name) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Check the record is around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now delete the record + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name'], + algorithm_name=algorithm_name) (response, response_p) = self.dns_transaction_udp(p, self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.verify_packet(response, response_p, mac) @@ -131,35 +331,28 @@ class TestDNSUpdates(DNSTKeyTest): def test_update_tsig_windows(self): "test DNS update with correct TSIG record (follow Windows pattern)" - newrecname = "win" + self.newrecname + p = self.make_update_request() + rr_class = dns.DNS_QCLASS_IN ttl = 1200 - p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) - q = self.make_name_question(self.get_dns_domain(), - dns.DNS_QTYPE_SOA, - dns.DNS_QCLASS_IN) - questions = [] - questions.append(q) - self.finish_name_packet(p, questions) - updates = [] r = dns.res_rec() - r.name = newrecname + r.name = self.newrecname r.rr_type = dns.DNS_QTYPE_A r.rr_class = dns.DNS_QCLASS_ANY r.ttl = 0 r.length = 0 updates.append(r) r = dns.res_rec() - r.name = newrecname + r.name = self.newrecname r.rr_type = dns.DNS_QTYPE_AAAA r.rr_class = dns.DNS_QCLASS_ANY r.ttl = 0 r.length = 0 updates.append(r) r = dns.res_rec() - r.name = newrecname + r.name = self.newrecname r.rr_type = dns.DNS_QTYPE_A r.rr_class = rr_class r.ttl = ttl @@ -171,7 +364,7 @@ class TestDNSUpdates(DNSTKeyTest): prereqs = [] r = dns.res_rec() - r.name = newrecname + r.name = self.newrecname r.rr_type = dns.DNS_QTYPE_CNAME r.rr_class = dns.DNS_QCLASS_NONE r.ttl = 0 @@ -181,21 +374,87 @@ class TestDNSUpdates(DNSTKeyTest): p.answers = prereqs (response, response_p) = self.dns_transaction_udp(p, self.server_ip) - self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED) + self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED) self.tkey_trans() - mac = self.sign_packet(p, self.key_name) + mac = self.sign_packet(p, self.tkey['name']) (response, response_p) = self.dns_transaction_udp(p, self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.verify_packet(response, response_p, mac) # Check the record is around - rcode = self.search_record(newrecname) + rcode = self.search_record(self.newrecname) self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) # Now delete the record + delete_updates = [] + r = dns.res_rec() + r.name = self.newrecname + r.rr_type = dns.DNS_QTYPE_A + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + r.rdata = "10.1.45.64" + delete_updates.append(r) + p = self.make_update_request(delete=True) + p.nscount = len(delete_updates) + p.nsrecs = delete_updates + mac = self.sign_packet(p, self.tkey['name']) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # check it's gone + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN) + + def test_update_tsig_record_access_denied(self): + """test DNS update with a TSIG record where the user does not have + permissions to change the record""" + + self.tkey_trans() + adm_tkey = self.tkey + + # First create the record as admin + p = self.make_update_request() + mac = self.sign_packet(p, self.tkey['name']) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Check the record is around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now update the same values as normal user + # should work without error + self.tkey_trans(creds=self.get_unpriv_creds()) + unpriv_tkey = self.tkey + + p = self.make_update_request() + mac = self.sign_packet(p, self.tkey['name']) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.verify_packet(response, response_p, mac) + + # Check the record is still around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now try to delete the record a normal user (should fail) + p = self.make_update_request(delete=True) + mac = self.sign_packet(p, self.tkey['name']) + (response, response_p) = self.dns_transaction_udp(p, self.server_ip) + self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED) + + # Check the record is still around + rcode = self.search_record(self.newrecname) + self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK) + + # Now delete the record as admin + self.tkey = adm_tkey p = self.make_update_request(delete=True) - mac = self.sign_packet(p, self.key_name) + mac = self.sign_packet(p, self.tkey['name']) (response, response_p) = self.dns_transaction_udp(p, self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.verify_packet(response, response_p, mac) diff --git a/python/samba/tests/join.py b/python/samba/tests/join.py index b47bc70..b04cb4a 100644 --- a/python/samba/tests/join.py +++ b/python/samba/tests/join.py @@ -156,7 +156,7 @@ class JoinTestCase(DNSTKeyTest): p.nscount = len(updates) p.nsrecs = updates - mac = self.sign_packet(p, self.key_name) + mac = self.sign_packet(p, self.tkey['name']) (response, response_p) = self.dns_transaction_udp(p, self.server_ip) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.verify_packet(response, response_p, mac) diff --git a/python/samba/tests/ntacls.py b/python/samba/tests/ntacls.py index 0b7963d..6e2adda 100644 --- a/python/samba/tests/ntacls.py +++ b/python/samba/tests/ntacls.py @@ -83,5 +83,5 @@ class NtaclsTests(TestCaseInTempDir): lp = LoadParm() open(self.tempf, 'w').write("empty") lp.set("posix:eadb", os.path.join(self.tempdir, "eadbtest.tdb")) - self.assertRaises(Exception, setntacl, lp, self.tempf, NTACL_SDDL, + self.assertRaises(PermissionError, setntacl, lp, self.tempf, NTACL_SDDL, DOMAIN_SID, self.session_info, "native") -- cgit v1.2.3