# Unix SMB/CIFS implementation. # Copyright (C) Amitay Isaacs 2011 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # """Tests for samba.dcerpc.dnsserver""" import os import ldb from samba.auth import system_session from samba.samdb import SamDB from samba.ndr import ndr_unpack from samba.dcerpc import dnsp, dnsserver, security from samba.tests import RpcInterfaceTestCase, env_get_var_value from samba.dnsserver import record_from_string, flag_from_string, ARecord from samba import sd_utils, descriptor from samba import WERRORError, werror class DnsserverTests(RpcInterfaceTestCase): @classmethod def setUpClass(cls): super().setUpClass() good_dns = ["SAMDOM.EXAMPLE.COM", "1.EXAMPLE.COM", "%sEXAMPLE.COM" % ("1." * 100), "EXAMPLE", "\n.COM", "!@#$%^&*()_", "HIGH\xFFBYTE", "@.EXAMPLE.COM", "."] bad_dns = ["...", ".EXAMPLE.COM", ".EXAMPLE.", "", "SAMDOM..EXAMPLE.COM"] good_mx = ["SAMDOM.EXAMPLE.COM 65535"] bad_mx = [] good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"] bad_srv = [] for bad_dn in bad_dns: bad_mx.append("%s 1" % bad_dn) bad_srv.append("%s 0 0 0" % bad_dn) for good_dn in good_dns: good_mx.append("%s 1" % good_dn) good_srv.append("%s 0 0 0" % good_dn) cls.good_records = { "A": ["192.168.0.1", "255.255.255.255"], "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000", "0000:0000:0000:0000:0000:0000:0000:0000", "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0", "1234:1234:1234::", "1234:1234:1234:1234:1234::", "1234:5678:9ABC:DEF0::", "0000:0000::0000", "1234::5678:9ABC:0000:0000:0000:0000", "::1", "::", "1:1:1:1:1:1:1:1"], "PTR": good_dns, "CNAME": good_dns, "NS": good_dns, "MX": good_mx, "SRV": good_srv, "TXT": ["text", "", "@#!", "\n"] } cls.bad_records = { "A": ["192.168.0.500", "255.255.255.255/32"], "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000", "0000:0000:0000:0000:0000:0000:0000:0000/1", "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234", "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234", "1234:5678:9ABC:DEF0:1234:5678:9ABC", "1111::1111::1111"], "PTR": bad_dns, "CNAME": bad_dns, "NS": bad_dns, "MX": bad_mx, "SRV": bad_srv } # Because we use uint16_t for these numbers, we can't # actually create these records. invalid_mx = ["SAMDOM.EXAMPLE.COM -1", "SAMDOM.EXAMPLE.COM 65536", "%s 1" % ("A" * 256)] invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0", "SAMDOM.EXAMPLE.COM 0 0 65536", "SAMDOM.EXAMPLE.COM 65536 0 0"] cls.invalid_records = { "MX": invalid_mx, "SRV": invalid_srv } def setUp(self): super().setUp() self.server = os.environ["DC_SERVER"] self.zone = env_get_var_value("REALM").lower() self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server), self.get_loadparm(), self.get_credentials()) self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"], lp=self.get_loadparm(), session_info=system_session(), credentials=self.get_credentials()) self.custom_zone = "zone" zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() zone_create_info.pszZoneName = self.custom_zone zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY zone_create_info.fAging = 0 zone_create_info.fDsIntegrated = 1 zone_create_info.fLoadExisting = 1 zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, None, 0, 'ZoneCreate', dnsserver.DNSSRV_TYPEID_ZONE_CREATE, zone_create_info) def tearDown(self): self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, self.custom_zone, 0, 'DeleteZoneFromDs', dnsserver.DNSSRV_TYPEID_NULL, None) super().tearDown() def test_enum_is_sorted(self): """ Confirm the zone is sorted """ record_str = "192.168.50.50" record_type_str = "A" self.add_record(self.custom_zone, "atestrecord-1", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-2", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-3", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-4", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-0", record_type_str, record_str) # This becomes an extra A on the zone itself by server-side magic self.add_record(self.custom_zone, self.custom_zone, record_type_str, record_str) _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, self.custom_zone, "@", None, flag_from_string(record_type_str), dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, None, None) self.assertEqual(len(result.rec), 6) self.assertEqual(result.rec[0].dnsNodeName.str, "") self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0") self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1") self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2") self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3") self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4") def test_enum_is_sorted_with_zone_dup(self): """ Confirm the zone is sorted """ record_str = "192.168.50.50" record_type_str = "A" self.add_record(self.custom_zone, "atestrecord-1", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-2", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-3", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-4", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-0", record_type_str, record_str) # This triggers a bug in old Samba self.add_record(self.custom_zone, self.custom_zone + "1", record_type_str, record_str) dn, record = self.get_record_from_db(self.custom_zone, self.custom_zone + "1") new_dn = ldb.Dn(self.samdb, str(dn)) new_dn.set_component(0, "dc", self.custom_zone) self.samdb.rename(dn, new_dn) _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, self.custom_zone, "@", None, flag_from_string(record_type_str), dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, None, None) self.assertEqual(len(result.rec), 7) self.assertEqual(result.rec[0].dnsNodeName.str, "") self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0") self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1") self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2") self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3") self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4") # Windows doesn't reload the zone fast enough, but doesn't # have the bug anyway, it will sort last on both names (where # it should) if result.rec[6].dnsNodeName.str != (self.custom_zone + "1"): self.assertEqual(result.rec[6].dnsNodeName.str, self.custom_zone) def test_enum_is_sorted_children_prefix_first(self): """ Confirm the zone returns the selected prefix first but no more as Samba is flappy for the full sort """ record_str = "192.168.50.50" record_type_str = "A" self.add_record(self.custom_zone, "atestrecord-1.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-2.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-3.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-4.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-0.a.b", record_type_str, record_str) # Not expected to be returned self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str) _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, self.custom_zone, "a.b", None, flag_from_string(record_type_str), dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, None, None) self.assertEqual(len(result.rec), 6) self.assertEqual(result.rec[0].dnsNodeName.str, "") def test_enum_is_sorted_children(self): """ Confirm the zone is sorted """ record_str = "192.168.50.50" record_type_str = "A" self.add_record(self.custom_zone, "atestrecord-1.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-2.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-3.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-4.a.b", record_type_str, record_str) self.add_record(self.custom_zone, "atestrecord-0.a.b", record_type_str, record_str) # Not expected to be returned self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str) _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, self.custom_zone, "a.b", None, flag_from_string(record_type_str), dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, None, None) self.assertEqual(len(result.rec), 6) self.assertEqual(result.rec[0].dnsNodeName.str, "") self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0") self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1") self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2") self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3") self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4") # This test fails against Samba (but passes against Windows), # because Samba does not return the record when we enum records. # Records can be given DNS_RANK_NONE when the zone they are in # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be # deleted, however, we do not consider this urgent to fix and # so this test is a knownfail. def test_rank_none(self): """ See what happens when we set a record's rank to DNS_RANK_NONE. """ record_str = "192.168.50.50" record_type_str = "A" self.add_record(self.custom_zone, "testrecord", record_type_str, record_str) dn, record = self.get_record_from_db(self.custom_zone, "testrecord") record.rank = 0 # DNS_RANK_NONE res = self.samdb.dns_replace_by_dn(dn, [record]) if res is not None: self.fail("Unable to update dns record to have DNS_RANK_NONE.") self.assert_num_records(self.custom_zone, "testrecord", record_type_str) self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str) self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0) def test_dns_tombstoned_zero_timestamp(self): """What happens with a zero EntombedTime tombstone?""" # A zero-timestamp tombstone record has a special meaning for # dns_common_replace(), which is the function exposed by # samdb.dns_replace_by_dn(), and which is *NOT* a general # purpose record replacement function but a specialised part # of the dns update mechanism (for both DLZ and internal). # # In the earlier stages of handling updates, a record that # needs to be deleted is set to be a tombstone with a zero # timestamp. dns_common_replace() notices this specific # marker, and if there are no other records, marks the node as # tombstoned, in the process adding a "real" tombstone. # # If the tombstone has a non-zero timestamp, as you'll see in # the next test, dns_common_replace will decide that the node # is already tombstoned, and that no action needs to be taken. # # This test has worked historically, entirely by accident, as # changing the wType appears to record_str = "192.168.50.50" self.add_record(self.custom_zone, "testrecord", 'A', record_str) dn, record = self.get_record_from_db(self.custom_zone, "testrecord") record.wType = dnsp.DNS_TYPE_TOMBSTONE record.data = 0 self.samdb.dns_replace_by_dn(dn, [record]) # there should be no A record, and one TOMBSTONE record. self.assert_num_records(self.custom_zone, "testrecord", 'A', 0) # we can't make assertions about the tombstone count based on # RPC calls, as there are no tombstones in RPCs (there is # "DNS_TYPE_ZERO" instead). Nor do tombstones show up if we # use DNS_TYPE_ALL. self.assert_num_records(self.custom_zone, "testrecord", 'ALL', 0) # But we can use LDAP: records = self.ldap_get_records(self.custom_zone, "testrecord") self.assertEqual(len(records), 1) r = records[0] self.assertEqual(r.wType, dnsp.DNS_TYPE_TOMBSTONE) self.assertGreater(r.data, 1e17) # ~ October 1916 # this should fail, because no A records. self.delete_record(self.custom_zone, "testrecord", 'A', record_str, assertion=False) def test_dns_tombstoned_nonzero_timestamp(self): """See what happens when we set a record to be tombstoned with an EntombedTime timestamp. """ # Because this tombstone has a non-zero EntombedTime, # dns_common_replace() will decide the node was already # tombstoned and there is nothing to be done, leaving the A # record where it was. record_str = "192.168.50.50" self.add_record(self.custom_zone, "testrecord", 'A', record_str) dn, record = self.get_record_from_db(self.custom_zone, "testrecord") record.wType = dnsp.DNS_TYPE_TOMBSTONE record.data = 0x123456789A self.samdb.dns_replace_by_dn(dn, [record]) # there should be the A record and no TOMBSTONE self.assert_num_records(self.custom_zone, "testrecord", 'A', 1) self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0) # this should succeed self.delete_record(self.custom_zone, "testrecord", 'A', record_str, assertion=True) self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0) self.assert_num_records(self.custom_zone, "testrecord", 'A', 0) def get_record_from_db(self, zone_name, record_name): """ Returns (dn of record, record) """ zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, expression="(objectClass=dnsZone)", attrs=["cn"]) zone_dn = None for zone in zones: if "DC=%s," % zone_name in str(zone.dn): zone_dn = zone.dn break if zone_dn is None: raise AssertionError("Couldn't find zone '%s'." % zone_name) records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE, expression="(objectClass=dnsNode)", attrs=["dnsRecord"]) for old_packed_record in records: if record_name in str(old_packed_record.dn): rec = ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0]) return (old_packed_record.dn, rec) def ldap_get_records(self, zone, name): zone_dn = (f"DC={zone},CN=MicrosoftDNS,DC=DomainDNSZones," f"{self.samdb.get_default_basedn()}") expr = f"(&(objectClass=dnsNode)(name={name}))" nodes = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["dnsRecord"]) records = nodes[0].get('dnsRecord') return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records] def test_duplicate_matching(self): """ Make sure that records which should be distinct from each other or duplicate to each other behave as expected. """ distinct_dns = [("SAMDOM.EXAMPLE.COM", "SAMDOM.EXAMPLE.CO", "EXAMPLE.COM", "SAMDOM.EXAMPLE")] duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"), ("EXAMPLE.", "EXAMPLE")] # Every tuple has entries which should be considered duplicate to one another. duplicates = { "AAAA": [("AAAA::", "aaaa::"), ("AAAA::", "AAAA:0000::"), ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"), ("AAAA::", "AAAA:0:0:0:0:0:0:0"), ("0123::", "123::"), ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")], } # Every tuple has entries which should be considered distinct from one another. distinct = { "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")], "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"), ("1000::", "::1000"), ("::1", "::11", "::1111"), ("1234::", "0234::")], "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1", "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")], "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")], "TXT": [("A RECORD", "B RECORD", "a record")] } for record_type_str in ("PTR", "CNAME", "NS"): distinct[record_type_str] = distinct_dns duplicates[record_type_str] = duplicate_dns for record_type_str in duplicates: for duplicate_tuple in duplicates[record_type_str]: # Attempt to add duplicates and make sure that all after the first fails self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0]) for record in duplicate_tuple: self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False) self.assert_num_records(self.custom_zone, "testrecord", record_type_str) self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0]) # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds for record in duplicate_tuple: self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0]) self.delete_record(self.custom_zone, "testrecord", record_type_str, record) for record_type_str in distinct: for distinct_tuple in distinct[record_type_str]: # Attempt to add distinct and make sure that they all succeed within a tuple i = 0 for record in distinct_tuple: i = i + 1 try: self.add_record(self.custom_zone, "testrecord", record_type_str, record) # All records should have been added. self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i) except AssertionError as e: raise AssertionError("Failed to add %s, which should be distinct from all others in the set. " "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple)) for record in distinct_tuple: self.delete_record(self.custom_zone, "testrecord", record_type_str, record) # CNAMEs should not have been added, since they conflict. if record_type_str == 'CNAME': continue # Add the first distinct and attempt to remove all of the others, making sure this fails # Windows fails this test. This is probably due to weird tombstoning behavior. self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0]) for record in distinct_tuple: if record == distinct_tuple[0]: continue try: self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False) except AssertionError as e: raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s" % (distinct_tuple[0], record, e)) self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0]) def test_accept_valid_commands(self): """ Make sure that we can add, update and delete a variety of valid records. """ for record_type_str in self.good_records: for record_str in self.good_records[record_type_str]: self.add_record(self.custom_zone, "testrecord", record_type_str, record_str) self.assert_num_records(self.custom_zone, "testrecord", record_type_str) self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str) def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data, wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"): res = self.get_record_from_db(zone, rec_name) self.assertIsNotNone(res, "Expected record %s but was not found over LDAP." % data) (rec_dn, rec) = res self.assertEqual(wDataLength, rec.wDataLength, "Unexpected data length for record %s. Got %s, expected %s." % (data, rec.wDataLength, wDataLength)) self.assertEqual(rank, rec.rank, "Unexpected rank for record %s. Got %s, expected %s." % (data, rec.rank, rank)) self.assertEqual(flags, rec.flags, "Unexpected flags for record %s. Got %s, expected %s." % (data, rec.flags, flags)) self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds, "Unexpected time to live for record %s. Got %s, expected %s." % (data, rec.dwTtlSeconds, dwTtlSeconds)) self.assertEqual(dwReserved, rec.dwReserved, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data, rec.dwReserved, dwReserved)) self.assertEqual(data.lower(), rec.data.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data, rec.data.lower(), data.lower())) self.assertEqual(wType, rec.wType, "Unexpected wType for record %s. Got %s, expected %s." % (data, rec.wType, wType)) self.assertEqual(dwTimeStamp, rec.dwTimeStamp, "Unexpected timestamp for record %s. Got %s, expected %s." % (data, rec.dwTimeStamp, dwTimeStamp)) def test_record_params(self): """ Make sure that, when we add records to the database, they're added with reasonable parameters. """ self.add_record(self.custom_zone, "testrecord", "A", "192.168.50.50") self.check_params(4, 240, 0, 900, 0, "192.168.50.50", 1) self.delete_record(self.custom_zone, "testrecord", "A", "192.168.50.50") self.add_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::") self.check_params(16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28) self.delete_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::") self.add_record(self.custom_zone, "testrecord", "CNAME", "cnamedest") self.check_params(13, 240, 0, 900, 0, "cnamedest", 5) self.delete_record(self.custom_zone, "testrecord", "CNAME", "cnamedest") def test_reject_invalid_commands(self): """ Make sure that we can't add a variety of invalid records, and that we can't update valid records to invalid ones. """ num_failures = 0 for record_type_str in self.bad_records: for record_str in self.bad_records[record_type_str]: # Attempt to add the bad record, which should fail. Then, attempt to query for and delete # it. Since it shouldn't exist, these should fail too. try: self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0) self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) except AssertionError as e: print(e) num_failures = num_failures + 1 # Also try to update valid records to invalid ones, making sure this fails for record_type_str in self.bad_records: for record_str in self.bad_records[record_type_str]: good_record_str = self.good_records[record_type_str][0] self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str) try: self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) except AssertionError as e: print(e) num_failures = num_failures + 1 self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str) self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures) def test_add_duplicate_different_type(self): """ Attempt to add some values which have the same name as existing ones, just a different type. """ num_failures = 0 for record_type_str_1 in self.good_records: record1 = self.good_records[record_type_str_1][0] self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1) for record_type_str_2 in self.good_records: if record_type_str_1 == record_type_str_2: continue record2 = self.good_records[record_type_str_2][0] has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A' has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA' has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME' has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR' has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX' has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV' has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT' # If we attempt to add any record except A or AAAA when we already have an NS record, # the add should fail. add_error_ok = False if record_type_str_1 == 'NS' and not has_a and not has_aaaa: add_error_ok = True # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail. if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa): add_error_ok = True # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail. # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail. if has_cname and (has_a or has_aaaa or has_srv or has_txt): add_error_ok = True try: self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2) if add_error_ok: num_failures = num_failures + 1 print("Expected error when adding %s while a %s existed." % (record_type_str_2, record_type_str_1)) except AssertionError as e: if not add_error_ok: num_failures = num_failures + 1 print("Didn't expect error when adding %s while a %s existed." % (record_type_str_2, record_type_str_1)) if not add_error_ok: # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards. expected_num_type_1 = 1 expected_num_type_2 = 1 # If we have an MX record, a PTR record should replace it when added. # If we have a PTR record, an MX record should replace it when added. if has_ptr and has_mx: expected_num_type_1 = 0 # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added. if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'): expected_num_type_1 = 0 if (record_type_str_1 == 'NS' and (has_a or has_aaaa)): expected_num_type_2 = 0 try: self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1) except AssertionError as e: num_failures = num_failures + 1 print("Expected %s %s records after adding a %s record and a %s record already existed." % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1)) try: self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2) except AssertionError as e: num_failures = num_failures + 1 print("Expected %s %s records after adding a %s record and a %s record already existed." % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1)) try: self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2) except AssertionError as e: pass self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1) self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures) # Windows fails this test in the same way we do. def _test_cname(self): """ Test some special properties of CNAME records. """ # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias cname_record = self.good_records["CNAME"][1] self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record) for record_type_str in self.good_records: other_record = self.good_records[record_type_str][0] self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False) self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0) # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias mx_record = "testrecord 1" ns_record = "testrecord" self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False) self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False) self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record) def test_add_duplicate_value(self): """ Make sure that we can't add duplicate values of any type. """ for record_type_str in self.good_records: record = self.good_records[record_type_str][0] self.add_record(self.custom_zone, "testrecord", record_type_str, record) self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False) self.assert_num_records(self.custom_zone, "testrecord", record_type_str) self.delete_record(self.custom_zone, "testrecord", record_type_str, record) def test_add_similar_value(self): """ Attempt to add values with the same name and type in the same zone. This should work, and should result in both values existing (except with some types). """ for record_type_str in self.good_records: for i in range(1, len(self.good_records[record_type_str])): record1 = self.good_records[record_type_str][i - 1] record2 = self.good_records[record_type_str][i] if record_type_str == 'CNAME': continue # We expect CNAME records to override one another, as # an alias can only map to one CNAME record. # Also, on Windows, when the empty string is added and # another record is added afterwards, the empty string # will be silently overridden by the new one, so it # fails this test for the empty string. expected_num = 1 if record_type_str == 'CNAME' else 2 self.add_record(self.custom_zone, "testrecord", record_type_str, record1) self.add_record(self.custom_zone, "testrecord", record_type_str, record2) self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num) self.delete_record(self.custom_zone, "testrecord", record_type_str, record1) self.delete_record(self.custom_zone, "testrecord", record_type_str, record2) def assert_record(self, zone, name, record_type_str, expected_record_str, assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): """ Asserts whether or not the given record with the given type exists in the given zone. """ try: _, result = self.query_records(zone, name, record_type_str) except RuntimeError as e: if assertion: raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str)) else: return found = False for record in result.rec[0].records: if record.data == expected_record_str: found = True break if found and not assertion: raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str)) elif not found and assertion: raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str)) def assert_num_records(self, zone, name, record_type_str, expected_num=1, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): """ Asserts that there are a given amount of records with the given type in the given zone. """ try: _, result = self.query_records(zone, name, record_type_str) num_results = len(result.rec[0].records) if not num_results == expected_num: raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected." % (num_results, record_type_str, name, expected_num)) except RuntimeError: if not expected_num == 0: raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected." % (record_type_str, name, expected_num)) def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): return self.conn.DnssrvEnumRecords2(client_version, 0, self.server, zone, name, None, flag_from_string(record_type_str), dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN, None, None) def add_record(self, zone, name, record_type_str, record_str, assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): """ Attempts to add a map from the given name to a record of the given type, in the given zone. Also asserts whether or not the add was successful. This can also update existing records if they have the same name. """ record = record_from_string(record_type_str, record_str, sep=' ') add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = record try: self.conn.DnssrvUpdateRecord2(client_version, 0, self.server, zone, name, add_rec_buf, None) if not assertion: raise AssertionError("Successfully added record '%s' of type '%s', which should have failed." % (record_str, record_type_str)) except RuntimeError as e: if assertion: raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e))) def delete_record(self, zone, name, record_type_str, record_str, assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): """ Attempts to delete a record with the given name, record and record type from the given zone. Also asserts whether or not the deletion was successful. """ record = record_from_string(record_type_str, record_str, sep=' ') del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() del_rec_buf.rec = record try: self.conn.DnssrvUpdateRecord2(client_version, 0, self.server, zone, name, None, del_rec_buf) if not assertion: raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str)) except RuntimeError as e: if assertion: raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e))) def test_query2(self): typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K, 0, self.server, None, 'ServerInfo') self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid) typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET, 0, self.server, None, 'ServerInfo') self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid) typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, None, 'ServerInfo') self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid) # This test is to confirm that we do not support multizone operations, # which are designated by a non-zero dwContext value (the 3rd argument # to DnssrvOperation). def test_operation_invalid(self): non_zone = 'a-zone-that-does-not-exist' typeid = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM() name_and_param.pszNodeName = 'AllowUpdate' name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE try: res = self.conn.DnssrvOperation(self.server, non_zone, 1, 'ResetDwordProperty', typeid, name_and_param) except WERRORError as e: if e.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST: return # We should always encounter a DOES_NOT_EXIST error. self.fail() # This test is to confirm that we do not support multizone operations, # which are designated by a non-zero dwContext value (the 5th argument # to DnssrvOperation2). def test_operation2_invalid(self): client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN non_zone = 'a-zone-that-does-not-exist' typeid = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM() name_and_param.pszNodeName = 'AllowUpdate' name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE try: res = self.conn.DnssrvOperation2(client_version, 0, self.server, non_zone, 1, 'ResetDwordProperty', typeid, name_and_param) except WERRORError as e: if e.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST: return # We should always encounter a DOES_NOT_EXIST error. self.fail() def test_operation2(self): client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN rev_zone = '1.168.192.in-addr.arpa' zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() zone_create.pszZoneName = rev_zone zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE zone_create.fAging = 0 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT # Create zone self.conn.DnssrvOperation2(client_version, 0, self.server, None, 0, 'ZoneCreate', dnsserver.DNSSRV_TYPEID_ZONE_CREATE, zone_create) request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | dnsserver.DNS_ZONE_REQUEST_PRIMARY) _, zones = self.conn.DnssrvComplexOperation2(client_version, 0, self.server, None, 'EnumZones', dnsserver.DNSSRV_TYPEID_DWORD, request_filter) self.assertEqual(1, zones.dwZoneCount) # Delete zone self.conn.DnssrvOperation2(client_version, 0, self.server, rev_zone, 0, 'DeleteZoneFromDs', dnsserver.DNSSRV_TYPEID_NULL, None) typeid, zones = self.conn.DnssrvComplexOperation2(client_version, 0, self.server, None, 'EnumZones', dnsserver.DNSSRV_TYPEID_DWORD, request_filter) self.assertEqual(0, zones.dwZoneCount) def test_complexoperation2(self): client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD | dnsserver.DNS_ZONE_REQUEST_PRIMARY) typeid, zones = self.conn.DnssrvComplexOperation2(client_version, 0, self.server, None, 'EnumZones', dnsserver.DNSSRV_TYPEID_DWORD, request_filter) self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) self.assertEqual(3, zones.dwZoneCount) request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | dnsserver.DNS_ZONE_REQUEST_PRIMARY) typeid, zones = self.conn.DnssrvComplexOperation2(client_version, 0, self.server, None, 'EnumZones', dnsserver.DNSSRV_TYPEID_DWORD, request_filter) self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) self.assertEqual(0, zones.dwZoneCount) def test_enumrecords2(self): client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN record_type = dnsp.DNS_TYPE_NS select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA | dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA) _, roothints = self.conn.DnssrvEnumRecords2(client_version, 0, self.server, '..RootHints', '.', None, record_type, select_flags, None, None) self.assertEqual(14, roothints.count) # 1 NS + 13 A records (a-m) def test_updaterecords2(self): client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN record_type = dnsp.DNS_TYPE_A select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA name = 'dummy' rec = ARecord('1.2.3.4') rec2 = ARecord('5.6.7.8') # Add record add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec self.conn.DnssrvUpdateRecord2(client_version, 0, self.server, self.zone, name, add_rec_buf, None) _, result = self.conn.DnssrvEnumRecords2(client_version, 0, self.server, self.zone, name, None, record_type, select_flags, None, None) self.assertEqual(1, result.count) self.assertEqual(1, result.rec[0].wRecordCount) self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) self.assertEqual('1.2.3.4', result.rec[0].records[0].data) # Update record add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() add_rec_buf.rec = rec2 del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() del_rec_buf.rec = rec self.conn.DnssrvUpdateRecord2(client_version, 0, self.server, self.zone, name, add_rec_buf, del_rec_buf) buflen, result = self.conn.DnssrvEnumRecords2(client_version, 0, self.server, self.zone, name, None, record_type, select_flags, None, None) self.assertEqual(1, result.count) self.assertEqual(1, result.rec[0].wRecordCount) self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) self.assertEqual('5.6.7.8', result.rec[0].records[0].data) # Delete record del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() del_rec_buf.rec = rec2 self.conn.DnssrvUpdateRecord2(client_version, 0, self.server, self.zone, name, None, del_rec_buf) self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2, client_version, 0, self.server, self.zone, name, None, record_type, select_flags, None, None) # The following tests do not pass against Samba because the owner and # group are not consistent with Windows, as well as some ACEs. # # The following ACE are also required for 2012R2: # # (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS) # (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)" # # [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity] def test_security_descriptor_msdcs_zone(self): """ Make sure that security descriptors of the msdcs zone is as expected. """ zones = self.samdb.search(base="DC=ForestDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, expression="(&(objectClass=dnsZone)(name=_msdcs*))", attrs=["nTSecurityDescriptor", "objectClass"]) self.assertEqual(len(zones), 1) self.assertIn("nTSecurityDescriptor", zones[0]) tmp = zones[0]["nTSecurityDescriptor"][0] utils = sd_utils.SDUtils(self.samdb) sd = ndr_unpack(security.descriptor, tmp) domain_sid = security.dom_sid(self.samdb.get_domain_sid()) res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, expression="(sAMAccountName=DnsAdmins)", attrs=["objectSid"]) dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0])) packed_sd = descriptor.sddl2binary("O:SYG:BA" "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" "(A;;CC;;;AU)" "(A;;RPLCLORC;;;WD)" "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)", domain_sid, {"DnsAdmins": dns_admin}) expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd)) diff = descriptor.get_diff_sds(expected_sd, sd, domain_sid) self.assertEqual(diff, '', "SD of msdcs zone different to expected.\n" "Difference was:\n%s\nExpected: %s\nGot: %s" % (diff, expected_sd.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid))) def test_security_descriptor_forest_zone(self): """ Make sure that security descriptors of forest dns zones are as expected. """ forest_zone = "test_forest_zone" zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY zone_create_info.fAging = 0 zone_create_info.fDsIntegrated = 1 zone_create_info.fLoadExisting = 1 zone_create_info.pszZoneName = forest_zone zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, None, 0, 'ZoneCreate', dnsserver.DNSSRV_TYPEID_ZONE_CREATE, zone_create_info) partition_dn = self.samdb.get_default_basedn() partition_dn.add_child("DC=ForestDnsZones") zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE, expression="(name=%s)" % forest_zone, attrs=["nTSecurityDescriptor"]) self.assertEqual(len(zones), 1) current_dn = zones[0].dn self.assertIn("nTSecurityDescriptor", zones[0]) tmp = zones[0]["nTSecurityDescriptor"][0] utils = sd_utils.SDUtils(self.samdb) sd = ndr_unpack(security.descriptor, tmp) domain_sid = security.dom_sid(self.samdb.get_domain_sid()) res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, expression="(sAMAccountName=DnsAdmins)", attrs=["objectSid"]) dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0])) packed_sd = descriptor.sddl2binary("O:DAG:DA" "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" "(A;;CC;;;AU)" "(A;;RPLCLORC;;;WD)" "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)", domain_sid, {"DnsAdmins": dns_admin}) expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd)) packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid, {"DnsAdmins": dns_admin}) expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns)) packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid) expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_part_sd)) try: msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn)) security_desc_dict = [(current_dn.get_linearized(), expected_sd), (msdns_dn.get_linearized(), expected_msdns_sd), (partition_dn.get_linearized(), expected_part_sd)] for (key, sec_desc) in security_desc_dict: zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE, attrs=["nTSecurityDescriptor"]) self.assertIn("nTSecurityDescriptor", zones[0]) tmp = zones[0]["nTSecurityDescriptor"][0] utils = sd_utils.SDUtils(self.samdb) sd = ndr_unpack(security.descriptor, tmp) diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid) self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s" % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid))) finally: self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, self.server, forest_zone, 0, 'DeleteZoneFromDs', dnsserver.DNSSRV_TYPEID_NULL, None) def test_security_descriptor_domain_zone(self): """ Make sure that security descriptors of domain dns zones are as expected. """ partition_dn = self.samdb.get_default_basedn() partition_dn.add_child("DC=DomainDnsZones") zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE, expression="(name=%s)" % self.custom_zone, attrs=["nTSecurityDescriptor"]) self.assertEqual(len(zones), 1) current_dn = zones[0].dn self.assertIn("nTSecurityDescriptor", zones[0]) tmp = zones[0]["nTSecurityDescriptor"][0] utils = sd_utils.SDUtils(self.samdb) sd = ndr_unpack(security.descriptor, tmp) sddl = sd.as_sddl(utils.domain_sid) domain_sid = security.dom_sid(self.samdb.get_domain_sid()) res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, expression="(sAMAccountName=DnsAdmins)", attrs=["objectSid"]) dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0])) packed_sd = descriptor.sddl2binary("O:DAG:DA" "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" "(A;;CC;;;AU)" "(A;;RPLCLORC;;;WD)" "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)", domain_sid, {"DnsAdmins": dns_admin}) expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd)) packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid, {"DnsAdmins": dns_admin}) expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns)) packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid) expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_part_sd)) msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn)) security_desc_dict = [(current_dn.get_linearized(), expected_sd), (msdns_dn.get_linearized(), expected_msdns_sd), (partition_dn.get_linearized(), expected_part_sd)] for (key, sec_desc) in security_desc_dict: zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE, attrs=["nTSecurityDescriptor"]) self.assertIn("nTSecurityDescriptor", zones[0]) tmp = zones[0]["nTSecurityDescriptor"][0] utils = sd_utils.SDUtils(self.samdb) sd = ndr_unpack(security.descriptor, tmp) diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid) self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s" % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))