diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
commit | 8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch) | |
tree | 4099e8021376c7d8c05bdf8503093d80e9c7bad0 /python/samba/tests/dcerpc/dnsserver.py | |
parent | Initial commit. (diff) | |
download | samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip |
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python/samba/tests/dcerpc/dnsserver.py')
-rw-r--r-- | python/samba/tests/dcerpc/dnsserver.py | 1314 |
1 files changed, 1314 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/dnsserver.py b/python/samba/tests/dcerpc/dnsserver.py new file mode 100644 index 0000000..13c9af8 --- /dev/null +++ b/python/samba/tests/dcerpc/dnsserver.py @@ -0,0 +1,1314 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.dnsserver""" + +import os +import ldb + +from samba.auth import system_session +from samba.samdb import SamDB +from samba.ndr import ndr_unpack +from samba.dcerpc import dnsp, dnsserver, security +from samba.tests import RpcInterfaceTestCase, env_get_var_value +from samba.dnsserver import record_from_string, flag_from_string, ARecord +from samba import sd_utils, descriptor +from samba import WERRORError, werror + + +class DnsserverTests(RpcInterfaceTestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + + good_dns = ["SAMDOM.EXAMPLE.COM", + "1.EXAMPLE.COM", + "%sEXAMPLE.COM" % ("1." * 100), + "EXAMPLE", + "\n.COM", + "!@#$%^&*()_", + "HIGH\xFFBYTE", + "@.EXAMPLE.COM", + "."] + bad_dns = ["...", + ".EXAMPLE.COM", + ".EXAMPLE.", + "", + "SAMDOM..EXAMPLE.COM"] + + good_mx = ["SAMDOM.EXAMPLE.COM 65535"] + bad_mx = [] + + good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"] + bad_srv = [] + + for bad_dn in bad_dns: + bad_mx.append("%s 1" % bad_dn) + bad_srv.append("%s 0 0 0" % bad_dn) + for good_dn in good_dns: + good_mx.append("%s 1" % good_dn) + good_srv.append("%s 0 0 0" % good_dn) + + cls.good_records = { + "A": ["192.168.0.1", + "255.255.255.255"], + "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000", + "0000:0000:0000:0000:0000:0000:0000:0000", + "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0", + "1234:1234:1234::", + "1234:1234:1234:1234:1234::", + "1234:5678:9ABC:DEF0::", + "0000:0000::0000", + "1234::5678:9ABC:0000:0000:0000:0000", + "::1", + "::", + "1:1:1:1:1:1:1:1"], + "PTR": good_dns, + "CNAME": good_dns, + "NS": good_dns, + "MX": good_mx, + "SRV": good_srv, + "TXT": ["text", "", "@#!", "\n"] + } + + cls.bad_records = { + "A": ["192.168.0.500", + "255.255.255.255/32"], + "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000", + "0000:0000:0000:0000:0000:0000:0000:0000/1", + "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234", + "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234", + "1234:5678:9ABC:DEF0:1234:5678:9ABC", + "1111::1111::1111"], + "PTR": bad_dns, + "CNAME": bad_dns, + "NS": bad_dns, + "MX": bad_mx, + "SRV": bad_srv + } + + # Because we use uint16_t for these numbers, we can't + # actually create these records. + invalid_mx = ["SAMDOM.EXAMPLE.COM -1", + "SAMDOM.EXAMPLE.COM 65536", + "%s 1" % ("A" * 256)] + invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0", + "SAMDOM.EXAMPLE.COM 0 0 65536", + "SAMDOM.EXAMPLE.COM 65536 0 0"] + cls.invalid_records = { + "MX": invalid_mx, + "SRV": invalid_srv + } + + def setUp(self): + super().setUp() + self.server = os.environ["DC_SERVER"] + self.zone = env_get_var_value("REALM").lower() + self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server), + self.get_loadparm(), + self.get_credentials()) + + self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"], + lp=self.get_loadparm(), + session_info=system_session(), + credentials=self.get_credentials()) + + self.custom_zone = "zone" + zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create_info.pszZoneName = self.custom_zone + zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create_info.fAging = 0 + zone_create_info.fDsIntegrated = 1 + zone_create_info.fLoadExisting = 1 + zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + + self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create_info) + + def tearDown(self): + self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + self.custom_zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + super().tearDown() + + def test_enum_is_sorted(self): + """ + Confirm the zone is sorted + """ + + record_str = "192.168.50.50" + record_type_str = "A" + self.add_record(self.custom_zone, "atestrecord-1", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-2", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-3", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-4", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-0", record_type_str, record_str) + + # This becomes an extra A on the zone itself by server-side magic + self.add_record(self.custom_zone, self.custom_zone, record_type_str, record_str) + + _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + self.custom_zone, + "@", + None, + flag_from_string(record_type_str), + dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, + None, + None) + + self.assertEqual(len(result.rec), 6) + self.assertEqual(result.rec[0].dnsNodeName.str, "") + self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0") + self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1") + self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2") + self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3") + self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4") + + def test_enum_is_sorted_with_zone_dup(self): + """ + Confirm the zone is sorted + """ + + record_str = "192.168.50.50" + record_type_str = "A" + self.add_record(self.custom_zone, "atestrecord-1", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-2", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-3", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-4", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-0", record_type_str, record_str) + + # This triggers a bug in old Samba + self.add_record(self.custom_zone, self.custom_zone + "1", record_type_str, record_str) + + dn, record = self.get_record_from_db(self.custom_zone, self.custom_zone + "1") + + new_dn = ldb.Dn(self.samdb, str(dn)) + new_dn.set_component(0, "dc", self.custom_zone) + self.samdb.rename(dn, new_dn) + + _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + self.custom_zone, + "@", + None, + flag_from_string(record_type_str), + dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, + None, + None) + + self.assertEqual(len(result.rec), 7) + self.assertEqual(result.rec[0].dnsNodeName.str, "") + self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0") + self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1") + self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2") + self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3") + self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4") + + # Windows doesn't reload the zone fast enough, but doesn't + # have the bug anyway, it will sort last on both names (where + # it should) + if result.rec[6].dnsNodeName.str != (self.custom_zone + "1"): + self.assertEqual(result.rec[6].dnsNodeName.str, self.custom_zone) + + def test_enum_is_sorted_children_prefix_first(self): + """ + Confirm the zone returns the selected prefix first but no more + as Samba is flappy for the full sort + """ + + record_str = "192.168.50.50" + record_type_str = "A" + self.add_record(self.custom_zone, "atestrecord-1.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-2.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-3.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-4.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-0.a.b", record_type_str, record_str) + + # Not expected to be returned + self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str) + + _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + self.custom_zone, + "a.b", + None, + flag_from_string(record_type_str), + dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, + None, + None) + + self.assertEqual(len(result.rec), 6) + self.assertEqual(result.rec[0].dnsNodeName.str, "") + + def test_enum_is_sorted_children(self): + """ + Confirm the zone is sorted + """ + + record_str = "192.168.50.50" + record_type_str = "A" + self.add_record(self.custom_zone, "atestrecord-1.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-2.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-3.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-4.a.b", record_type_str, record_str) + self.add_record(self.custom_zone, "atestrecord-0.a.b", record_type_str, record_str) + + # Not expected to be returned + self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str) + + _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + self.custom_zone, + "a.b", + None, + flag_from_string(record_type_str), + dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA, + None, + None) + + self.assertEqual(len(result.rec), 6) + self.assertEqual(result.rec[0].dnsNodeName.str, "") + self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0") + self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1") + self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2") + self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3") + self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4") + + # This test fails against Samba (but passes against Windows), + # because Samba does not return the record when we enum records. + # Records can be given DNS_RANK_NONE when the zone they are in + # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be + # deleted, however, we do not consider this urgent to fix and + # so this test is a knownfail. + def test_rank_none(self): + """ + See what happens when we set a record's rank to + DNS_RANK_NONE. + """ + + record_str = "192.168.50.50" + record_type_str = "A" + self.add_record(self.custom_zone, "testrecord", record_type_str, record_str) + + dn, record = self.get_record_from_db(self.custom_zone, "testrecord") + record.rank = 0 # DNS_RANK_NONE + res = self.samdb.dns_replace_by_dn(dn, [record]) + if res is not None: + self.fail("Unable to update dns record to have DNS_RANK_NONE.") + + self.assert_num_records(self.custom_zone, "testrecord", record_type_str) + self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) + self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str) + self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0) + + def test_dns_tombstoned_zero_timestamp(self): + """What happens with a zero EntombedTime tombstone?""" + # A zero-timestamp tombstone record has a special meaning for + # dns_common_replace(), which is the function exposed by + # samdb.dns_replace_by_dn(), and which is *NOT* a general + # purpose record replacement function but a specialised part + # of the dns update mechanism (for both DLZ and internal). + # + # In the earlier stages of handling updates, a record that + # needs to be deleted is set to be a tombstone with a zero + # timestamp. dns_common_replace() notices this specific + # marker, and if there are no other records, marks the node as + # tombstoned, in the process adding a "real" tombstone. + # + # If the tombstone has a non-zero timestamp, as you'll see in + # the next test, dns_common_replace will decide that the node + # is already tombstoned, and that no action needs to be taken. + # + # This test has worked historically, entirely by accident, as + # changing the wType appears to + + record_str = "192.168.50.50" + self.add_record(self.custom_zone, "testrecord", 'A', record_str) + + dn, record = self.get_record_from_db(self.custom_zone, "testrecord") + record.wType = dnsp.DNS_TYPE_TOMBSTONE + record.data = 0 + self.samdb.dns_replace_by_dn(dn, [record]) + + # there should be no A record, and one TOMBSTONE record. + self.assert_num_records(self.custom_zone, "testrecord", 'A', 0) + # we can't make assertions about the tombstone count based on + # RPC calls, as there are no tombstones in RPCs (there is + # "DNS_TYPE_ZERO" instead). Nor do tombstones show up if we + # use DNS_TYPE_ALL. + self.assert_num_records(self.custom_zone, "testrecord", 'ALL', 0) + + # But we can use LDAP: + records = self.ldap_get_records(self.custom_zone, "testrecord") + self.assertEqual(len(records), 1) + r = records[0] + self.assertEqual(r.wType, dnsp.DNS_TYPE_TOMBSTONE) + self.assertGreater(r.data, 1e17) # ~ October 1916 + + # this should fail, because no A records. + self.delete_record(self.custom_zone, "testrecord", 'A', record_str, + assertion=False) + + def test_dns_tombstoned_nonzero_timestamp(self): + """See what happens when we set a record to be tombstoned with an + EntombedTime timestamp. + """ + # Because this tombstone has a non-zero EntombedTime, + # dns_common_replace() will decide the node was already + # tombstoned and there is nothing to be done, leaving the A + # record where it was. + + record_str = "192.168.50.50" + self.add_record(self.custom_zone, "testrecord", 'A', record_str) + + dn, record = self.get_record_from_db(self.custom_zone, "testrecord") + record.wType = dnsp.DNS_TYPE_TOMBSTONE + record.data = 0x123456789A + self.samdb.dns_replace_by_dn(dn, [record]) + + # there should be the A record and no TOMBSTONE + self.assert_num_records(self.custom_zone, "testrecord", 'A', 1) + self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0) + # this should succeed + self.delete_record(self.custom_zone, "testrecord", 'A', record_str, + assertion=True) + self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0) + self.assert_num_records(self.custom_zone, "testrecord", 'A', 0) + + def get_record_from_db(self, zone_name, record_name): + """ + Returns (dn of record, record) + """ + + zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, + expression="(objectClass=dnsZone)", + attrs=["cn"]) + + zone_dn = None + for zone in zones: + if "DC=%s," % zone_name in str(zone.dn): + zone_dn = zone.dn + break + + if zone_dn is None: + raise AssertionError("Couldn't find zone '%s'." % zone_name) + + records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE, + expression="(objectClass=dnsNode)", + attrs=["dnsRecord"]) + + for old_packed_record in records: + if record_name in str(old_packed_record.dn): + rec = ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0]) + return (old_packed_record.dn, rec) + + def ldap_get_records(self, zone, name): + zone_dn = (f"DC={zone},CN=MicrosoftDNS,DC=DomainDNSZones," + f"{self.samdb.get_default_basedn()}") + + expr = f"(&(objectClass=dnsNode)(name={name}))" + nodes = self.samdb.search(base=zone_dn, + scope=ldb.SCOPE_SUBTREE, + expression=expr, + attrs=["dnsRecord"]) + + records = nodes[0].get('dnsRecord') + return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records] + + def test_duplicate_matching(self): + """ + Make sure that records which should be distinct from each other or duplicate + to each other behave as expected. + """ + + distinct_dns = [("SAMDOM.EXAMPLE.COM", + "SAMDOM.EXAMPLE.CO", + "EXAMPLE.COM", "SAMDOM.EXAMPLE")] + duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"), + ("EXAMPLE.", "EXAMPLE")] + + # Every tuple has entries which should be considered duplicate to one another. + duplicates = { + "AAAA": [("AAAA::", "aaaa::"), + ("AAAA::", "AAAA:0000::"), + ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"), + ("AAAA::", "AAAA:0:0:0:0:0:0:0"), + ("0123::", "123::"), + ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")], + } + + # Every tuple has entries which should be considered distinct from one another. + distinct = { + "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")], + "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"), + ("1000::", "::1000"), + ("::1", "::11", "::1111"), + ("1234::", "0234::")], + "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1", + "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")], + "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")], + "TXT": [("A RECORD", "B RECORD", "a record")] + } + + for record_type_str in ("PTR", "CNAME", "NS"): + distinct[record_type_str] = distinct_dns + duplicates[record_type_str] = duplicate_dns + + for record_type_str in duplicates: + for duplicate_tuple in duplicates[record_type_str]: + # Attempt to add duplicates and make sure that all after the first fails + self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0]) + for record in duplicate_tuple: + self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False) + self.assert_num_records(self.custom_zone, "testrecord", record_type_str) + self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0]) + + # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds + for record in duplicate_tuple: + self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0]) + self.delete_record(self.custom_zone, "testrecord", record_type_str, record) + + for record_type_str in distinct: + for distinct_tuple in distinct[record_type_str]: + # Attempt to add distinct and make sure that they all succeed within a tuple + i = 0 + for record in distinct_tuple: + i = i + 1 + try: + self.add_record(self.custom_zone, "testrecord", record_type_str, record) + # All records should have been added. + self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i) + except AssertionError as e: + raise AssertionError("Failed to add %s, which should be distinct from all others in the set. " + "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple)) + for record in distinct_tuple: + self.delete_record(self.custom_zone, "testrecord", record_type_str, record) + # CNAMEs should not have been added, since they conflict. + if record_type_str == 'CNAME': + continue + + # Add the first distinct and attempt to remove all of the others, making sure this fails + # Windows fails this test. This is probably due to weird tombstoning behavior. + self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0]) + for record in distinct_tuple: + if record == distinct_tuple[0]: + continue + try: + self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False) + except AssertionError as e: + raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s" + % (distinct_tuple[0], record, e)) + self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0]) + + def test_accept_valid_commands(self): + """ + Make sure that we can add, update and delete a variety + of valid records. + """ + for record_type_str in self.good_records: + for record_str in self.good_records[record_type_str]: + self.add_record(self.custom_zone, "testrecord", record_type_str, record_str) + self.assert_num_records(self.custom_zone, "testrecord", record_type_str) + self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str) + + def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data, + wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"): + res = self.get_record_from_db(zone, rec_name) + self.assertIsNotNone(res, "Expected record %s but was not found over LDAP." % data) + (rec_dn, rec) = res + self.assertEqual(wDataLength, rec.wDataLength, "Unexpected data length for record %s. Got %s, expected %s." % (data, rec.wDataLength, wDataLength)) + self.assertEqual(rank, rec.rank, "Unexpected rank for record %s. Got %s, expected %s." % (data, rec.rank, rank)) + self.assertEqual(flags, rec.flags, "Unexpected flags for record %s. Got %s, expected %s." % (data, rec.flags, flags)) + self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds, "Unexpected time to live for record %s. Got %s, expected %s." % (data, rec.dwTtlSeconds, dwTtlSeconds)) + self.assertEqual(dwReserved, rec.dwReserved, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data, rec.dwReserved, dwReserved)) + self.assertEqual(data.lower(), rec.data.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data, rec.data.lower(), data.lower())) + self.assertEqual(wType, rec.wType, "Unexpected wType for record %s. Got %s, expected %s." % (data, rec.wType, wType)) + self.assertEqual(dwTimeStamp, rec.dwTimeStamp, "Unexpected timestamp for record %s. Got %s, expected %s." % (data, rec.dwTimeStamp, dwTimeStamp)) + + def test_record_params(self): + """ + Make sure that, when we add records to the database, + they're added with reasonable parameters. + """ + self.add_record(self.custom_zone, "testrecord", "A", "192.168.50.50") + self.check_params(4, 240, 0, 900, 0, "192.168.50.50", 1) + self.delete_record(self.custom_zone, "testrecord", "A", "192.168.50.50") + self.add_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::") + self.check_params(16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28) + self.delete_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::") + self.add_record(self.custom_zone, "testrecord", "CNAME", "cnamedest") + self.check_params(13, 240, 0, 900, 0, "cnamedest", 5) + self.delete_record(self.custom_zone, "testrecord", "CNAME", "cnamedest") + + def test_reject_invalid_commands(self): + """ + Make sure that we can't add a variety of invalid records, + and that we can't update valid records to invalid ones. + """ + num_failures = 0 + for record_type_str in self.bad_records: + for record_str in self.bad_records[record_type_str]: + # Attempt to add the bad record, which should fail. Then, attempt to query for and delete + # it. Since it shouldn't exist, these should fail too. + try: + self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) + self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0) + self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) + except AssertionError as e: + print(e) + num_failures = num_failures + 1 + + # Also try to update valid records to invalid ones, making sure this fails + for record_type_str in self.bad_records: + for record_str in self.bad_records[record_type_str]: + good_record_str = self.good_records[record_type_str][0] + self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str) + try: + self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False) + except AssertionError as e: + print(e) + num_failures = num_failures + 1 + self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str) + + self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures) + + def test_add_duplicate_different_type(self): + """ + Attempt to add some values which have the same name as + existing ones, just a different type. + """ + num_failures = 0 + for record_type_str_1 in self.good_records: + record1 = self.good_records[record_type_str_1][0] + self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1) + for record_type_str_2 in self.good_records: + if record_type_str_1 == record_type_str_2: + continue + + record2 = self.good_records[record_type_str_2][0] + + has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A' + has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA' + has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME' + has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR' + has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX' + has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV' + has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT' + + # If we attempt to add any record except A or AAAA when we already have an NS record, + # the add should fail. + add_error_ok = False + if record_type_str_1 == 'NS' and not has_a and not has_aaaa: + add_error_ok = True + # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail. + if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa): + add_error_ok = True + # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail. + # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail. + if has_cname and (has_a or has_aaaa or has_srv or has_txt): + add_error_ok = True + + try: + self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2) + if add_error_ok: + num_failures = num_failures + 1 + print("Expected error when adding %s while a %s existed." + % (record_type_str_2, record_type_str_1)) + except AssertionError as e: + if not add_error_ok: + num_failures = num_failures + 1 + print("Didn't expect error when adding %s while a %s existed." + % (record_type_str_2, record_type_str_1)) + + if not add_error_ok: + # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards. + expected_num_type_1 = 1 + expected_num_type_2 = 1 + + # If we have an MX record, a PTR record should replace it when added. + # If we have a PTR record, an MX record should replace it when added. + if has_ptr and has_mx: + expected_num_type_1 = 0 + + # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added. + if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'): + expected_num_type_1 = 0 + + if (record_type_str_1 == 'NS' and (has_a or has_aaaa)): + expected_num_type_2 = 0 + + try: + self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1) + except AssertionError as e: + num_failures = num_failures + 1 + print("Expected %s %s records after adding a %s record and a %s record already existed." + % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1)) + try: + self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2) + except AssertionError as e: + num_failures = num_failures + 1 + print("Expected %s %s records after adding a %s record and a %s record already existed." + % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1)) + + try: + self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2) + except AssertionError as e: + pass + + self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1) + + self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures) + + # Windows fails this test in the same way we do. + def _test_cname(self): + """ + Test some special properties of CNAME records. + """ + + # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias + cname_record = self.good_records["CNAME"][1] + self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record) + + for record_type_str in self.good_records: + other_record = self.good_records[record_type_str][0] + self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False) + self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0) + + # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias + mx_record = "testrecord 1" + ns_record = "testrecord" + + self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False) + self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False) + + self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record) + + def test_add_duplicate_value(self): + """ + Make sure that we can't add duplicate values of any type. + """ + for record_type_str in self.good_records: + record = self.good_records[record_type_str][0] + + self.add_record(self.custom_zone, "testrecord", record_type_str, record) + self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False) + self.assert_num_records(self.custom_zone, "testrecord", record_type_str) + self.delete_record(self.custom_zone, "testrecord", record_type_str, record) + + def test_add_similar_value(self): + """ + Attempt to add values with the same name and type in the same + zone. This should work, and should result in both values + existing (except with some types). + """ + for record_type_str in self.good_records: + for i in range(1, len(self.good_records[record_type_str])): + record1 = self.good_records[record_type_str][i - 1] + record2 = self.good_records[record_type_str][i] + + if record_type_str == 'CNAME': + continue + # We expect CNAME records to override one another, as + # an alias can only map to one CNAME record. + # Also, on Windows, when the empty string is added and + # another record is added afterwards, the empty string + # will be silently overridden by the new one, so it + # fails this test for the empty string. + expected_num = 1 if record_type_str == 'CNAME' else 2 + + self.add_record(self.custom_zone, "testrecord", record_type_str, record1) + self.add_record(self.custom_zone, "testrecord", record_type_str, record2) + self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num) + self.delete_record(self.custom_zone, "testrecord", record_type_str, record1) + self.delete_record(self.custom_zone, "testrecord", record_type_str, record2) + + def assert_record(self, zone, name, record_type_str, expected_record_str, + assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): + """ + Asserts whether or not the given record with the given type exists in the + given zone. + """ + try: + _, result = self.query_records(zone, name, record_type_str) + except RuntimeError as e: + if assertion: + raise AssertionError("Record '%s' of type '%s' was not present when it should have been." + % (expected_record_str, record_type_str)) + else: + return + + found = False + for record in result.rec[0].records: + if record.data == expected_record_str: + found = True + break + + if found and not assertion: + raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str)) + elif not found and assertion: + raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str)) + + def assert_num_records(self, zone, name, record_type_str, expected_num=1, + client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): + """ + Asserts that there are a given amount of records with the given type in + the given zone. + """ + try: + _, result = self.query_records(zone, name, record_type_str) + num_results = len(result.rec[0].records) + if not num_results == expected_num: + raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected." + % (num_results, record_type_str, name, expected_num)) + except RuntimeError: + if not expected_num == 0: + raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected." + % (record_type_str, name, expected_num)) + + def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): + return self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + zone, + name, + None, + flag_from_string(record_type_str), + dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN, + None, + None) + + def add_record(self, zone, name, record_type_str, record_str, + assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): + """ + Attempts to add a map from the given name to a record of the given type, + in the given zone. + Also asserts whether or not the add was successful. + This can also update existing records if they have the same name. + """ + record = record_from_string(record_type_str, record_str, sep=' ') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = record + + try: + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + zone, + name, + add_rec_buf, + None) + if not assertion: + raise AssertionError("Successfully added record '%s' of type '%s', which should have failed." + % (record_str, record_type_str)) + except RuntimeError as e: + if assertion: + raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'." + % (record_str, record_type_str, str(e))) + + def delete_record(self, zone, name, record_type_str, record_str, + assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN): + """ + Attempts to delete a record with the given name, record and record type + from the given zone. + Also asserts whether or not the deletion was successful. + """ + record = record_from_string(record_type_str, record_str, sep=' ') + del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + del_rec_buf.rec = record + + try: + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + zone, + name, + None, + del_rec_buf) + if not assertion: + raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str)) + except RuntimeError as e: + if assertion: + raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e))) + + def test_query2(self): + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K, + 0, + self.server, + None, + 'ServerInfo') + self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid) + + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET, + 0, + self.server, + None, + 'ServerInfo') + self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid) + + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + None, + 'ServerInfo') + self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid) + + + # This test is to confirm that we do not support multizone operations, + # which are designated by a non-zero dwContext value (the 3rd argument + # to DnssrvOperation). + def test_operation_invalid(self): + non_zone = 'a-zone-that-does-not-exist' + typeid = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM + name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM() + name_and_param.pszNodeName = 'AllowUpdate' + name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE + try: + res = self.conn.DnssrvOperation(self.server, + non_zone, + 1, + 'ResetDwordProperty', + typeid, + name_and_param) + except WERRORError as e: + if e.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST: + return + + # We should always encounter a DOES_NOT_EXIST error. + self.fail() + + # This test is to confirm that we do not support multizone operations, + # which are designated by a non-zero dwContext value (the 5th argument + # to DnssrvOperation2). + def test_operation2_invalid(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + non_zone = 'a-zone-that-does-not-exist' + typeid = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM + name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM() + name_and_param.pszNodeName = 'AllowUpdate' + name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE + try: + res = self.conn.DnssrvOperation2(client_version, + 0, + self.server, + non_zone, + 1, + 'ResetDwordProperty', + typeid, + name_and_param) + except WERRORError as e: + if e.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST: + return + + # We should always encounter a DOES_NOT_EXIST error. + self.fail() + + def test_operation2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + rev_zone = '1.168.192.in-addr.arpa' + + zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create.pszZoneName = rev_zone + zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE + zone_create.fAging = 0 + zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + + # Create zone + self.conn.DnssrvOperation2(client_version, + 0, + self.server, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create) + + request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + _, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEqual(1, zones.dwZoneCount) + + # Delete zone + self.conn.DnssrvOperation2(client_version, + 0, + self.server, + rev_zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEqual(0, zones.dwZoneCount) + + def test_complexoperation2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) + self.assertEqual(3, zones.dwZoneCount) + + request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) + self.assertEqual(0, zones.dwZoneCount) + + def test_enumrecords2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + record_type = dnsp.DNS_TYPE_NS + select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA | + dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA) + _, roothints = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + '..RootHints', + '.', + None, + record_type, + select_flags, + None, + None) + self.assertEqual(14, roothints.count) # 1 NS + 13 A records (a-m) + + def test_updaterecords2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + record_type = dnsp.DNS_TYPE_A + select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA + name = 'dummy' + rec = ARecord('1.2.3.4') + rec2 = ARecord('5.6.7.8') + + # Add record + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + add_rec_buf, + None) + + _, result = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) + self.assertEqual(1, result.count) + self.assertEqual(1, result.rec[0].wRecordCount) + self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) + self.assertEqual('1.2.3.4', result.rec[0].records[0].data) + + # Update record + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec2 + del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + del_rec_buf.rec = rec + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + add_rec_buf, + del_rec_buf) + + buflen, result = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) + self.assertEqual(1, result.count) + self.assertEqual(1, result.rec[0].wRecordCount) + self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) + self.assertEqual('5.6.7.8', result.rec[0].records[0].data) + + # Delete record + del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + del_rec_buf.rec = rec2 + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + None, + del_rec_buf) + + self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2, + client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) + + # The following tests do not pass against Samba because the owner and + # group are not consistent with Windows, as well as some ACEs. + # + # The following ACE are also required for 2012R2: + # + # (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS) + # (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)" + # + # [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity] + def test_security_descriptor_msdcs_zone(self): + """ + Make sure that security descriptors of the msdcs zone is + as expected. + """ + + zones = self.samdb.search(base="DC=ForestDnsZones,%s" % self.samdb.get_default_basedn(), + scope=ldb.SCOPE_SUBTREE, + expression="(&(objectClass=dnsZone)(name=_msdcs*))", + attrs=["nTSecurityDescriptor", "objectClass"]) + self.assertEqual(len(zones), 1) + self.assertIn("nTSecurityDescriptor", zones[0]) + tmp = zones[0]["nTSecurityDescriptor"][0] + utils = sd_utils.SDUtils(self.samdb) + sd = ndr_unpack(security.descriptor, tmp) + + domain_sid = security.dom_sid(self.samdb.get_domain_sid()) + + res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, + expression="(sAMAccountName=DnsAdmins)", + attrs=["objectSid"]) + + dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0])) + + packed_sd = descriptor.sddl2binary("O:SYG:BA" + "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + "(A;;CC;;;AU)" + "(A;;RPLCLORC;;;WD)" + "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" + "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)", + domain_sid, {"DnsAdmins": dns_admin}) + expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd)) + + diff = descriptor.get_diff_sds(expected_sd, sd, domain_sid) + self.assertEqual(diff, '', "SD of msdcs zone different to expected.\n" + "Difference was:\n%s\nExpected: %s\nGot: %s" % + (diff, expected_sd.as_sddl(utils.domain_sid), + sd.as_sddl(utils.domain_sid))) + + def test_security_descriptor_forest_zone(self): + """ + Make sure that security descriptors of forest dns zones are + as expected. + """ + forest_zone = "test_forest_zone" + zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create_info.fAging = 0 + zone_create_info.fDsIntegrated = 1 + zone_create_info.fLoadExisting = 1 + + zone_create_info.pszZoneName = forest_zone + zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT + + self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create_info) + + partition_dn = self.samdb.get_default_basedn() + partition_dn.add_child("DC=ForestDnsZones") + zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE, + expression="(name=%s)" % forest_zone, + attrs=["nTSecurityDescriptor"]) + self.assertEqual(len(zones), 1) + current_dn = zones[0].dn + self.assertIn("nTSecurityDescriptor", zones[0]) + tmp = zones[0]["nTSecurityDescriptor"][0] + utils = sd_utils.SDUtils(self.samdb) + sd = ndr_unpack(security.descriptor, tmp) + + domain_sid = security.dom_sid(self.samdb.get_domain_sid()) + + res = self.samdb.search(base=self.samdb.get_default_basedn(), + scope=ldb.SCOPE_SUBTREE, + expression="(sAMAccountName=DnsAdmins)", + attrs=["objectSid"]) + + dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0])) + + packed_sd = descriptor.sddl2binary("O:DAG:DA" + "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + "(A;;CC;;;AU)" + "(A;;RPLCLORC;;;WD)" + "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" + "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)", + domain_sid, {"DnsAdmins": dns_admin}) + expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd)) + + packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid, + {"DnsAdmins": dns_admin}) + expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns)) + + packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid) + expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, + packed_part_sd)) + try: + msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn)) + security_desc_dict = [(current_dn.get_linearized(), expected_sd), + (msdns_dn.get_linearized(), expected_msdns_sd), + (partition_dn.get_linearized(), expected_part_sd)] + + for (key, sec_desc) in security_desc_dict: + zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE, + attrs=["nTSecurityDescriptor"]) + self.assertIn("nTSecurityDescriptor", zones[0]) + tmp = zones[0]["nTSecurityDescriptor"][0] + utils = sd_utils.SDUtils(self.samdb) + + sd = ndr_unpack(security.descriptor, tmp) + diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid) + + self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s" + % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid))) + + finally: + self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + forest_zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + + def test_security_descriptor_domain_zone(self): + """ + Make sure that security descriptors of domain dns zones are + as expected. + """ + + partition_dn = self.samdb.get_default_basedn() + partition_dn.add_child("DC=DomainDnsZones") + zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE, + expression="(name=%s)" % self.custom_zone, + attrs=["nTSecurityDescriptor"]) + self.assertEqual(len(zones), 1) + current_dn = zones[0].dn + self.assertIn("nTSecurityDescriptor", zones[0]) + tmp = zones[0]["nTSecurityDescriptor"][0] + utils = sd_utils.SDUtils(self.samdb) + sd = ndr_unpack(security.descriptor, tmp) + sddl = sd.as_sddl(utils.domain_sid) + + domain_sid = security.dom_sid(self.samdb.get_domain_sid()) + + res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE, + expression="(sAMAccountName=DnsAdmins)", + attrs=["objectSid"]) + + dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0])) + + packed_sd = descriptor.sddl2binary("O:DAG:DA" + "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + "(A;;CC;;;AU)" + "(A;;RPLCLORC;;;WD)" + "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" + "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)", + domain_sid, {"DnsAdmins": dns_admin}) + expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd)) + + packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid, + {"DnsAdmins": dns_admin}) + expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns)) + + packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid) + expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, + packed_part_sd)) + + msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn)) + security_desc_dict = [(current_dn.get_linearized(), expected_sd), + (msdns_dn.get_linearized(), expected_msdns_sd), + (partition_dn.get_linearized(), expected_part_sd)] + + for (key, sec_desc) in security_desc_dict: + zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE, + attrs=["nTSecurityDescriptor"]) + self.assertIn("nTSecurityDescriptor", zones[0]) + tmp = zones[0]["nTSecurityDescriptor"][0] + utils = sd_utils.SDUtils(self.samdb) + + sd = ndr_unpack(security.descriptor, tmp) + diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid) + + self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s" + % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid))) |