summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-20 04:07:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-20 04:07:29 +0000
commit5f0615a601e014ed2da5c8117a9bc6df0bc6aad8 (patch)
treec271a7d4ed696305c1e34c72ce426f14f9ee6be4 /python
parentReleasing progress-linux version 2:4.20.1+dfsg-5~progress7.99u1. (diff)
downloadsamba-5f0615a601e014ed2da5c8117a9bc6df0bc6aad8.tar.xz
samba-5f0615a601e014ed2da5c8117a9bc6df0bc6aad8.zip
Merging upstream version 2:4.20.2+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/gp/gpclass.py4
-rw-r--r--python/samba/tests/blackbox/misc_dfs_widelink.py86
-rw-r--r--python/samba/tests/dns_base.py213
-rw-r--r--python/samba/tests/dns_tkey.py325
-rw-r--r--python/samba/tests/join.py2
-rw-r--r--python/samba/tests/ntacls.py2
6 files changed, 531 insertions, 101 deletions
diff --git a/python/samba/gp/gpclass.py b/python/samba/gp/gpclass.py
index 08be472..d86aace 100644
--- a/python/samba/gp/gpclass.py
+++ b/python/samba/gp/gpclass.py
@@ -805,9 +805,7 @@ def site_dn_for_machine(samdb, dc_hostname, lp, creds, hostname):
samlogon_response = ndr_unpack(nbt.netlogon_samlogon_response,
bytes(res.msgs[0]['Netlogon'][0]))
- if samlogon_response.ntver not in [nbt.NETLOGON_NT_VERSION_5EX,
- (nbt.NETLOGON_NT_VERSION_1
- | nbt.NETLOGON_NT_VERSION_5EX)]:
+ if not (samlogon_response.ntver & nbt.NETLOGON_NT_VERSION_5EX):
raise RuntimeError('site_dn_for_machine: Invalid NtVer in '
+ 'netlogon_samlogon_response')
diff --git a/python/samba/tests/blackbox/misc_dfs_widelink.py b/python/samba/tests/blackbox/misc_dfs_widelink.py
new file mode 100644
index 0000000..7948590
--- /dev/null
+++ b/python/samba/tests/blackbox/misc_dfs_widelink.py
@@ -0,0 +1,86 @@
+# Blackbox tests for DFS (widelink)
+#
+# Copyright (C) Noel Power noel.power@suse.com
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+from samba.tests import BlackboxTestCase, BlackboxProcessError
+from samba.samba3 import param as s3param
+
+from samba.credentials import Credentials
+
+import os
+
+class DfsWidelinkBlockboxTestBase(BlackboxTestCase):
+
+ def setUp(self):
+ super().setUp()
+ self.lp = s3param.get_context()
+ self.server = os.environ["SERVER"]
+ self.user = os.environ["USER"]
+ self.passwd = os.environ["PASSWORD"]
+ self.creds = Credentials()
+ self.creds.guess(self.lp)
+ self.creds.set_username(self.user)
+ self.creds.set_password(self.passwd)
+ self.testdir = os.getenv("TESTDIR", "msdfs-share-wl")
+ self.share = os.getenv("SHARE", "msdfs-share-wl")
+ self.dirpath = os.path.join(os.environ["LOCAL_PATH"],self.testdir)
+ # allow a custom teardown function to be defined
+ self.cleanup = None
+ self.cleanup_args = []
+
+ def tearDown(self):
+ try:
+ if (self.cleanup):
+ self.cleanup(self.cleanup_args)
+ except Exception as e:
+ print("remote remove failed: %s" % str(e))
+
+ def build_test_cmd(self, cmd, args):
+ cmd = [cmd, "-U%s%%%s" % (self.user, self.passwd)]
+ cmd.extend(args)
+ return cmd
+
+ def test_ci_chdir(self):
+ parent_dir = "msdfs-src1"
+ dirs = [parent_dir, parent_dir.upper()]
+ # try as named dir first then try upper-cased version
+ for adir in dirs:
+ smbclient_args = self.build_test_cmd("smbclient", ["//%s/%s" % (self.server, self.share), "-c", "cd %s" % (adir)])
+ try:
+ out_str = self.check_output(smbclient_args)
+ except BlackboxProcessError as e:
+ print(str(e))
+ self.fail(str(e))
+
+ def test_nested_chdir(self):
+ parent_dir = "dfshop1"
+ child_dir = "dfshop2"
+ smbclient_args = self.build_test_cmd("smbclient", ["//%s/%s" % (self.server, self.share), "-c", "cd %s/%s" % (parent_dir,child_dir)])
+ try:
+ out_str = self.check_output(smbclient_args)
+ except BlackboxProcessError as e:
+ print(str(e))
+ self.fail(str(e))
+
+ def test_enumerate_dfs_link(self):
+ smbclient_args = self.build_test_cmd("smbclient", ["//%s/%s" % (self.server, self.share), "-c", "dir"])
+ try:
+ out_str = self.check_output(smbclient_args)
+ except BlackboxProcessError as e:
+ print(str(e))
+ self.fail(str(e))
+ out_str = out_str.decode()
+ self.assertIn("msdfs-src1", out_str)
diff --git a/python/samba/tests/dns_base.py b/python/samba/tests/dns_base.py
index d320a0e..43a62b1 100644
--- a/python/samba/tests/dns_base.py
+++ b/python/samba/tests/dns_base.py
@@ -20,6 +20,7 @@ from samba.tests import TestCaseInTempDir
from samba.dcerpc import dns, dnsp
from samba import gensec, tests
from samba import credentials
+from samba import NTSTATUSError
import struct
import samba.ndr as ndr
import random
@@ -76,6 +77,24 @@ class DNSTest(TestCaseInTempDir):
self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
(opcode, p_opcode))
+ def assert_dns_flags_equals(self, packet, flags):
+ "Helper function to check opcode"
+ p_flags = packet.operation & (~(dns.DNS_OPCODE|dns.DNS_RCODE))
+ self.assertEqual(p_flags, flags, "Expected FLAGS %02x, got %02x" %
+ (flags, p_flags))
+
+ def assert_echoed_dns_error(self, request, response, response_p, rcode):
+
+ request_p = ndr.ndr_pack(request)
+
+ self.assertEqual(response.id, request.id)
+ self.assert_dns_rcode_equals(response, rcode)
+ self.assert_dns_opcode_equals(response, request.operation & dns.DNS_OPCODE)
+ self.assert_dns_flags_equals(response,
+ (request.operation | dns.DNS_FLAG_REPLY) & (~(dns.DNS_OPCODE|dns.DNS_RCODE)))
+ self.assertEqual(len(response_p), len(request_p))
+ self.assertEqual(response_p[4:], request_p[4:])
+
def make_name_packet(self, opcode, qid=None):
"Helper creating a dns.name_packet"
p = dns.name_packet()
@@ -112,6 +131,8 @@ class DNSTest(TestCaseInTempDir):
return self.creds.get_realm().lower()
def dns_transaction_udp(self, packet, host,
+ allow_remaining=False,
+ allow_truncated=False,
dump=False, timeout=None):
"send a DNS query and read the reply"
s = None
@@ -128,8 +149,22 @@ class DNSTest(TestCaseInTempDir):
recv_packet = s.recv(2048, 0)
if dump:
print(self.hexdump(recv_packet))
- response = ndr.ndr_unpack(dns.name_packet, recv_packet)
+ if allow_truncated:
+ # with allow_remaining
+ # we add some zero bytes
+ # in order to also parse truncated
+ # responses
+ recv_packet_p = recv_packet + 32*b"\x00"
+ allow_remaining = True
+ else:
+ recv_packet_p = recv_packet
+ response = ndr.ndr_unpack(dns.name_packet, recv_packet_p,
+ allow_remaining=allow_remaining)
return (response, recv_packet)
+ except RuntimeError as re:
+ if s is not None:
+ s.close()
+ raise AssertionError(re)
finally:
if s is not None:
s.close()
@@ -151,11 +186,26 @@ class DNSTest(TestCaseInTempDir):
tcp_packet += send_packet
s.sendall(tcp_packet)
- recv_packet = s.recv(0xffff + 2, 0)
+ recv_packet = b''
+ length = None
+ for i in range(0, 2 + 0xffff):
+ if len(recv_packet) >= 2:
+ length, = struct.unpack('!H', recv_packet[0:2])
+ remaining = 2 + length
+ else:
+ remaining = 2 + 12
+ remaining -= len(recv_packet)
+ if remaining == 0:
+ break
+ recv_packet += s.recv(remaining, 0)
if dump:
print(self.hexdump(recv_packet))
response = ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
+ except RuntimeError as re:
+ if s is not None:
+ s.close()
+ raise AssertionError(re)
finally:
if s is not None:
s.close()
@@ -217,18 +267,41 @@ class DNSTKeyTest(DNSTest):
self.creds.set_username(tests.env_get_var_value('USERNAME'))
self.creds.set_password(tests.env_get_var_value('PASSWORD'))
self.creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
+
+ self.unpriv_creds = None
+
self.newrecname = "tkeytsig.%s" % self.get_dns_domain()
- def tkey_trans(self, creds=None):
+ def get_unpriv_creds(self):
+ if self.unpriv_creds is not None:
+ return self.unpriv_creds
+
+ self.unpriv_creds = credentials.Credentials()
+ self.unpriv_creds.guess(self.lp_ctx)
+ self.unpriv_creds.set_username(tests.env_get_var_value('USERNAME_UNPRIV'))
+ self.unpriv_creds.set_password(tests.env_get_var_value('PASSWORD_UNPRIV'))
+ self.unpriv_creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
+
+ return self.unpriv_creds
+
+ def tkey_trans(self, creds=None, algorithm_name="gss-tsig",
+ tkey_req_in_answers=False,
+ expected_rcode=dns.DNS_RCODE_OK):
"Do a TKEY transaction and establish a gensec context"
if creds is None:
creds = self.creds
- self.key_name = "%s.%s" % (uuid.uuid4(), self.get_dns_domain())
+ mech = 'spnego'
+
+ tkey = {}
+ tkey['name'] = "%s.%s" % (uuid.uuid4(), self.get_dns_domain())
+ tkey['creds'] = creds
+ tkey['mech'] = mech
+ tkey['algorithm'] = algorithm_name
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
- q = self.make_name_question(self.key_name,
+ q = self.make_name_question(tkey['name'],
dns.DNS_QTYPE_TKEY,
dns.DNS_QCLASS_IN)
questions = []
@@ -236,30 +309,30 @@ class DNSTKeyTest(DNSTest):
self.finish_name_packet(p, questions)
r = dns.res_rec()
- r.name = self.key_name
+ r.name = tkey['name']
r.rr_type = dns.DNS_QTYPE_TKEY
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 0
r.length = 0xffff
rdata = dns.tkey_record()
- rdata.algorithm = "gss-tsig"
+ rdata.algorithm = algorithm_name
rdata.inception = int(time.time())
rdata.expiration = int(time.time()) + 60 * 60
rdata.mode = dns.DNS_TKEY_MODE_GSSAPI
rdata.error = 0
rdata.other_size = 0
- self.g = gensec.Security.start_client(self.settings)
- self.g.set_credentials(creds)
- self.g.set_target_service("dns")
- self.g.set_target_hostname(self.server)
- self.g.want_feature(gensec.FEATURE_SIGN)
- self.g.start_mech_by_name("spnego")
+ tkey['gensec'] = gensec.Security.start_client(self.settings)
+ tkey['gensec'].set_credentials(creds)
+ tkey['gensec'].set_target_service("dns")
+ tkey['gensec'].set_target_hostname(self.server)
+ tkey['gensec'].want_feature(gensec.FEATURE_SIGN)
+ tkey['gensec'].start_mech_by_name(tkey['mech'])
finished = False
client_to_server = b""
- (finished, server_to_client) = self.g.update(client_to_server)
+ (finished, server_to_client) = tkey['gensec'].update(client_to_server)
self.assertFalse(finished)
data = [x if isinstance(x, int) else ord(x) for x in list(server_to_client)]
@@ -268,56 +341,76 @@ class DNSTKeyTest(DNSTest):
r.rdata = rdata
additional = [r]
- p.arcount = 1
- p.additional = additional
+ if tkey_req_in_answers:
+ p.ancount = 1
+ p.answers = additional
+ else:
+ p.arcount = 1
+ p.additional = additional
(response, response_packet) =\
self.dns_transaction_tcp(p, self.server_ip)
+ if expected_rcode != dns.DNS_RCODE_OK:
+ self.assert_echoed_dns_error(p, response, response_packet, expected_rcode)
+ return
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
tkey_record = response.answers[0].rdata
server_to_client = bytes(tkey_record.key_data)
- (finished, client_to_server) = self.g.update(server_to_client)
+ (finished, client_to_server) = tkey['gensec'].update(server_to_client)
self.assertTrue(finished)
+ self.tkey = tkey
+
self.verify_packet(response, response_packet)
def verify_packet(self, response, response_packet, request_mac=b""):
+ self.assertEqual(response.arcount, 1)
self.assertEqual(response.additional[0].rr_type, dns.DNS_QTYPE_TSIG)
+ if self.tkey['algorithm'] == "gss-tsig":
+ gss_tsig = True
+ else:
+ gss_tsig = False
+
+ request_mac_len = b""
+ if len(request_mac) > 0 and gss_tsig:
+ request_mac_len = struct.pack('!H', len(request_mac))
+
tsig_record = response.additional[0].rdata
mac = bytes(tsig_record.mac)
+ self.assertEqual(tsig_record.original_id, response.id)
+ self.assertEqual(tsig_record.mac_size, len(mac))
+
# Cut off tsig record from dns response packet for MAC verification
# and reset additional record count.
- key_name_len = len(self.key_name) + 2
- tsig_record_len = len(ndr.ndr_pack(tsig_record)) + key_name_len + 10
-
- # convert str/bytes to a list (of string char or int)
- # so it can be modified
- response_packet_list = [x if isinstance(x, int) else ord(x) for x in response_packet]
- del response_packet_list[-tsig_record_len:]
- response_packet_list[11] = 0
-
- # convert modified list (of string char or int) to str/bytes
- response_packet_wo_tsig = bytes(response_packet_list)
+ response_copy = ndr.ndr_deepcopy(response)
+ response_copy.arcount = 0
+ response_packet_wo_tsig = ndr.ndr_pack(response_copy)
fake_tsig = dns.fake_tsig_rec()
- fake_tsig.name = self.key_name
+ fake_tsig.name = self.tkey['name']
fake_tsig.rr_class = dns.DNS_QCLASS_ANY
fake_tsig.ttl = 0
fake_tsig.time_prefix = tsig_record.time_prefix
fake_tsig.time = tsig_record.time
fake_tsig.algorithm_name = tsig_record.algorithm_name
fake_tsig.fudge = tsig_record.fudge
- fake_tsig.error = 0
- fake_tsig.other_size = 0
+ fake_tsig.error = tsig_record.error
+ fake_tsig.other_size = tsig_record.other_size
+ fake_tsig.other_data = tsig_record.other_data
fake_tsig_packet = ndr.ndr_pack(fake_tsig)
- data = request_mac + response_packet_wo_tsig + fake_tsig_packet
- self.g.check_packet(data, data, mac)
+ data = request_mac_len + request_mac + response_packet_wo_tsig + fake_tsig_packet
+ try:
+ self.tkey['gensec'].check_packet(data, data, mac)
+ except NTSTATUSError as nt:
+ raise AssertionError(nt)
- def sign_packet(self, packet, key_name):
+ def sign_packet(self, packet, key_name,
+ algorithm_name="gss-tsig",
+ bad_sig=False):
"Sign a packet, calculate a MAC and add TSIG record"
packet_data = ndr.ndr_pack(packet)
@@ -327,18 +420,35 @@ class DNSTKeyTest(DNSTest):
fake_tsig.ttl = 0
fake_tsig.time_prefix = 0
fake_tsig.time = int(time.time())
- fake_tsig.algorithm_name = "gss-tsig"
+ fake_tsig.algorithm_name = algorithm_name
fake_tsig.fudge = 300
fake_tsig.error = 0
fake_tsig.other_size = 0
fake_tsig_packet = ndr.ndr_pack(fake_tsig)
data = packet_data + fake_tsig_packet
- mac = self.g.sign_packet(data, data)
+ mac = self.tkey['gensec'].sign_packet(data, data)
mac_list = [x if isinstance(x, int) else ord(x) for x in list(mac)]
+ if bad_sig:
+ if len(mac) > 8:
+ mac_list[-8] = mac_list[-8] ^ 0xff
+ if len(mac) > 7:
+ mac_list[-7] = ord('b')
+ if len(mac) > 6:
+ mac_list[-6] = ord('a')
+ if len(mac) > 5:
+ mac_list[-5] = ord('d')
+ if len(mac) > 4:
+ mac_list[-4] = ord('m')
+ if len(mac) > 3:
+ mac_list[-3] = ord('a')
+ if len(mac) > 2:
+ mac_list[-2] = ord('c')
+ if len(mac) > 1:
+ mac_list[-1] = mac_list[-1] ^ 0xff
rdata = dns.tsig_record()
- rdata.algorithm_name = "gss-tsig"
+ rdata.algorithm_name = algorithm_name
rdata.time_prefix = 0
rdata.time = fake_tsig.time
rdata.fudge = 300
@@ -363,33 +473,10 @@ class DNSTKeyTest(DNSTest):
return mac
def bad_sign_packet(self, packet, key_name):
- """Add bad signature for a packet by bitflipping
- the final byte in the MAC"""
-
- mac_list = [x if isinstance(x, int) else ord(x) for x in list("badmac")]
-
- rdata = dns.tsig_record()
- rdata.algorithm_name = "gss-tsig"
- rdata.time_prefix = 0
- rdata.time = int(time.time())
- rdata.fudge = 300
- rdata.original_id = packet.id
- rdata.error = 0
- rdata.other_size = 0
- rdata.mac = mac_list
- rdata.mac_size = len(mac_list)
+ """Add bad signature for a packet by
+ bitflipping and hardcoding bytes at the end of the MAC"""
- r = dns.res_rec()
- r.name = key_name
- r.rr_type = dns.DNS_QTYPE_TSIG
- r.rr_class = dns.DNS_QCLASS_ANY
- r.ttl = 0
- r.length = 0xffff
- r.rdata = rdata
-
- additional = [r]
- packet.additional = additional
- packet.arcount = 1
+ return self.sign_packet(packet, key_name, bad_sig=True)
def search_record(self, name):
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
diff --git a/python/samba/tests/dns_tkey.py b/python/samba/tests/dns_tkey.py
index 69af14d..f8417ea 100644
--- a/python/samba/tests/dns_tkey.py
+++ b/python/samba/tests/dns_tkey.py
@@ -19,6 +19,7 @@
import sys
import optparse
import samba.getopt as options
+import samba.ndr as ndr
from samba.dcerpc import dns
from samba.tests.subunitrun import SubunitOptions, TestProgram
from samba.tests.dns_base import DNSTKeyTest
@@ -55,17 +56,34 @@ class TestDNSUpdates(DNSTKeyTest):
self.server_ip = server_ip
super().setUp()
- def test_tkey(self):
- "test DNS TKEY handshake"
+ def test_tkey_gss_tsig(self):
+ "test DNS TKEY handshake with gss-tsig"
self.tkey_trans()
+ def test_tkey_gss_microsoft_com(self):
+ "test DNS TKEY handshake with gss.microsoft.com"
+
+ self.tkey_trans(algorithm_name="gss.microsoft.com")
+
+ def test_tkey_invalid_gss_TSIG(self):
+ "test DNS TKEY handshake with invalid gss-TSIG"
+
+ self.tkey_trans(algorithm_name="gss-TSIG",
+ expected_rcode=dns.DNS_RCODE_REFUSED)
+
+ def test_tkey_invalid_gss_MICROSOFT_com(self):
+ "test DNS TKEY handshake with invalid gss.MICROSOFT.com"
+
+ self.tkey_trans(algorithm_name="gss.MICROSOFT.com",
+ expected_rcode=dns.DNS_RCODE_REFUSED)
+
def test_update_wo_tsig(self):
"test DNS update without TSIG record"
p = self.make_update_request()
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED)
+ self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
rcode = self.search_record(self.newrecname)
self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
@@ -78,10 +96,7 @@ class TestDNSUpdates(DNSTKeyTest):
p = self.make_update_request()
self.sign_packet(p, "badkey")
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH)
- tsig_record = response.additional[0].rdata
- self.assertEqual(tsig_record.error, dns.DNS_RCODE_BADKEY)
- self.assertEqual(tsig_record.mac_size, 0)
+ self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
rcode = self.search_record(self.newrecname)
self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
@@ -92,23 +107,149 @@ class TestDNSUpdates(DNSTKeyTest):
self.tkey_trans()
p = self.make_update_request()
- self.bad_sign_packet(p, self.key_name)
+ self.bad_sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH)
- tsig_record = response.additional[0].rdata
- self.assertEqual(tsig_record.error, dns.DNS_RCODE_BADSIG)
- self.assertEqual(tsig_record.mac_size, 0)
+ self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
rcode = self.search_record(self.newrecname)
self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
- def test_update_tsig(self):
- "test DNS update with correct TSIG record"
+ def test_update_tsig_bad_algorithm(self):
+ "test DNS update with a TSIG record with a bad algorithm"
self.tkey_trans()
+ algorithm_name = "gss-TSIG"
p = self.make_update_request()
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'],
+ algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
+
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
+
+ def test_update_tsig_changed_algorithm1(self):
+ "test DNS update with a TSIG record with a changed algorithm"
+
+ algorithm_name = "gss-tsig"
+ self.tkey_trans(algorithm_name=algorithm_name)
+
+ # Now delete the record, it's most likely
+ # a no-op as it should not be there if the test
+ # runs the first time
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Now do an update with the algorithm_name
+ # changed in the requests TSIG message.
+ p = self.make_update_request()
+ algorithm_name = "gss.microsoft.com"
+ mac = self.sign_packet(p, self.tkey['name'],
+ algorithm_name=algorithm_name)
+ algorithm_name = "gss-tsig"
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip,
+ allow_remaining=True)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Check the record is around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now delete the record, with the original
+ # algorithm_name used in the tkey exchange
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
+
+ def test_update_tsig_changed_algorithm2(self):
+ "test DNS update with a TSIG record with a changed algorithm"
+
+ algorithm_name = "gss.microsoft.com"
+ self.tkey_trans(algorithm_name=algorithm_name)
+
+ # Now delete the record, it's most likely
+ # a no-op as it should not be there if the test
+ # runs the first time
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Now do an update with the algorithm_name
+ # changed in the requests TSIG message.
+ p = self.make_update_request()
+ algorithm_name = "gss-tsig"
+ mac = self.sign_packet(p, self.tkey['name'],
+ algorithm_name=algorithm_name)
+ algorithm_name = "gss.microsoft.com"
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip,
+ allow_truncated=True)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ response_p_pack = ndr.ndr_pack(response)
+ if len(response_p_pack) == len(response_p):
+ self.verify_packet(response, response_p, mac)
+ else:
+ pass # Windows bug
+
+ # Check the record is around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now delete the record, with the original
+ # algorithm_name used in the tkey exchange
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'], algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
+
+ def test_update_gss_tsig_tkey_req_additional(self):
+ "test DNS update with correct gss-tsig record tkey req in additional"
+
+ self.tkey_trans()
+
+ p = self.make_update_request()
+ mac = self.sign_packet(p, self.tkey['name'])
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Check the record is around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now delete the record
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'])
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # check it's gone
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
+
+ def test_update_gss_tsig_tkey_req_answers(self):
+ "test DNS update with correct gss-tsig record tsig req in answers"
+
+ self.tkey_trans(tkey_req_in_answers=True)
+
+ p = self.make_update_request()
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
@@ -119,7 +260,66 @@ class TestDNSUpdates(DNSTKeyTest):
# Now delete the record
p = self.make_update_request(delete=True)
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # check it's gone
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
+
+ def test_update_gss_microsoft_com_tkey_req_additional(self):
+ "test DNS update with correct gss.microsoft.com record tsig req in additional"
+
+ algorithm_name = "gss.microsoft.com"
+ self.tkey_trans(algorithm_name=algorithm_name)
+
+ p = self.make_update_request()
+ mac = self.sign_packet(p, self.tkey['name'],
+ algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Check the record is around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now delete the record
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'],
+ algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # check it's gone
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
+
+ def test_update_gss_microsoft_com_tkey_req_answers(self):
+ "test DNS update with correct gss.microsoft.com record tsig req in answers"
+
+ algorithm_name = "gss.microsoft.com"
+ self.tkey_trans(algorithm_name=algorithm_name,
+ tkey_req_in_answers=True)
+
+ p = self.make_update_request()
+ mac = self.sign_packet(p, self.tkey['name'],
+ algorithm_name=algorithm_name)
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Check the record is around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now delete the record
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'],
+ algorithm_name=algorithm_name)
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
@@ -131,35 +331,28 @@ class TestDNSUpdates(DNSTKeyTest):
def test_update_tsig_windows(self):
"test DNS update with correct TSIG record (follow Windows pattern)"
- newrecname = "win" + self.newrecname
+ p = self.make_update_request()
+
rr_class = dns.DNS_QCLASS_IN
ttl = 1200
- p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
- q = self.make_name_question(self.get_dns_domain(),
- dns.DNS_QTYPE_SOA,
- dns.DNS_QCLASS_IN)
- questions = []
- questions.append(q)
- self.finish_name_packet(p, questions)
-
updates = []
r = dns.res_rec()
- r.name = newrecname
+ r.name = self.newrecname
r.rr_type = dns.DNS_QTYPE_A
r.rr_class = dns.DNS_QCLASS_ANY
r.ttl = 0
r.length = 0
updates.append(r)
r = dns.res_rec()
- r.name = newrecname
+ r.name = self.newrecname
r.rr_type = dns.DNS_QTYPE_AAAA
r.rr_class = dns.DNS_QCLASS_ANY
r.ttl = 0
r.length = 0
updates.append(r)
r = dns.res_rec()
- r.name = newrecname
+ r.name = self.newrecname
r.rr_type = dns.DNS_QTYPE_A
r.rr_class = rr_class
r.ttl = ttl
@@ -171,7 +364,7 @@ class TestDNSUpdates(DNSTKeyTest):
prereqs = []
r = dns.res_rec()
- r.name = newrecname
+ r.name = self.newrecname
r.rr_type = dns.DNS_QTYPE_CNAME
r.rr_class = dns.DNS_QCLASS_NONE
r.ttl = 0
@@ -181,21 +374,87 @@ class TestDNSUpdates(DNSTKeyTest):
p.answers = prereqs
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED)
+ self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
self.tkey_trans()
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
# Check the record is around
- rcode = self.search_record(newrecname)
+ rcode = self.search_record(self.newrecname)
self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
# Now delete the record
+ delete_updates = []
+ r = dns.res_rec()
+ r.name = self.newrecname
+ r.rr_type = dns.DNS_QTYPE_A
+ r.rr_class = dns.DNS_QCLASS_NONE
+ r.ttl = 0
+ r.length = 0xffff
+ r.rdata = "10.1.45.64"
+ delete_updates.append(r)
+ p = self.make_update_request(delete=True)
+ p.nscount = len(delete_updates)
+ p.nsrecs = delete_updates
+ mac = self.sign_packet(p, self.tkey['name'])
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # check it's gone
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
+
+ def test_update_tsig_record_access_denied(self):
+ """test DNS update with a TSIG record where the user does not have
+ permissions to change the record"""
+
+ self.tkey_trans()
+ adm_tkey = self.tkey
+
+ # First create the record as admin
+ p = self.make_update_request()
+ mac = self.sign_packet(p, self.tkey['name'])
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Check the record is around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now update the same values as normal user
+ # should work without error
+ self.tkey_trans(creds=self.get_unpriv_creds())
+ unpriv_tkey = self.tkey
+
+ p = self.make_update_request()
+ mac = self.sign_packet(p, self.tkey['name'])
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.verify_packet(response, response_p, mac)
+
+ # Check the record is still around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now try to delete the record a normal user (should fail)
+ p = self.make_update_request(delete=True)
+ mac = self.sign_packet(p, self.tkey['name'])
+ (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
+ self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
+
+ # Check the record is still around
+ rcode = self.search_record(self.newrecname)
+ self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
+
+ # Now delete the record as admin
+ self.tkey = adm_tkey
p = self.make_update_request(delete=True)
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
diff --git a/python/samba/tests/join.py b/python/samba/tests/join.py
index b47bc70..b04cb4a 100644
--- a/python/samba/tests/join.py
+++ b/python/samba/tests/join.py
@@ -156,7 +156,7 @@ class JoinTestCase(DNSTKeyTest):
p.nscount = len(updates)
p.nsrecs = updates
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
diff --git a/python/samba/tests/ntacls.py b/python/samba/tests/ntacls.py
index 0b7963d..6e2adda 100644
--- a/python/samba/tests/ntacls.py
+++ b/python/samba/tests/ntacls.py
@@ -83,5 +83,5 @@ class NtaclsTests(TestCaseInTempDir):
lp = LoadParm()
open(self.tempf, 'w').write("empty")
lp.set("posix:eadb", os.path.join(self.tempdir, "eadbtest.tdb"))
- self.assertRaises(Exception, setntacl, lp, self.tempf, NTACL_SDDL,
+ self.assertRaises(PermissionError, setntacl, lp, self.tempf, NTACL_SDDL,
DOMAIN_SID, self.session_info, "native")