summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/samba_tool/dnscmd.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
commit8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch)
tree4099e8021376c7d8c05bdf8503093d80e9c7bad0 /python/samba/tests/samba_tool/dnscmd.py
parentInitial commit. (diff)
downloadsamba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz
samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python/samba/tests/samba_tool/dnscmd.py')
-rw-r--r--python/samba/tests/samba_tool/dnscmd.py1506
1 files changed, 1506 insertions, 0 deletions
diff --git a/python/samba/tests/samba_tool/dnscmd.py b/python/samba/tests/samba_tool/dnscmd.py
new file mode 100644
index 0000000..d372bc5
--- /dev/null
+++ b/python/samba/tests/samba_tool/dnscmd.py
@@ -0,0 +1,1506 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import ldb
+import re
+
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.dcerpc import dnsp
+from samba.tests.samba_tool.base import SambaToolCmdTest
+import time
+from samba import dsdb_dns
+
+
+class DnsCmdTestCase(SambaToolCmdTest):
+ def setUp(self):
+ super().setUp()
+
+ self.dburl = "ldap://%s" % os.environ["SERVER"]
+ self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
+ os.environ["DC_PASSWORD"])
+
+ self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
+ self.config_dn = str(self.samdb.get_config_basedn())
+
+ self.testip = "192.168.0.193"
+ self.testip2 = "192.168.0.194"
+
+ self.addCleanup(self.deleteZone)
+ self.addZone()
+
+ # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
+
+ good_dns = ["SAMDOM.EXAMPLE.COM",
+ "1.EXAMPLE.COM",
+ "%sEXAMPLE.COM" % ("1." * 100),
+ "EXAMPLE",
+ "!@#$%^&*()_",
+ "HIGH\xFFBYTE",
+ "@.EXAMPLE.COM",
+ "."]
+ bad_dns = ["...",
+ ".EXAMPLE.COM",
+ ".EXAMPLE.",
+ "",
+ "SAMDOM..EXAMPLE.COM"]
+
+ good_mx = ["SAMDOM.EXAMPLE.COM 65530",
+ "SAMDOM.EXAMPLE.COM 0"]
+ bad_mx = ["SAMDOM.EXAMPLE.COM -1",
+ "SAMDOM.EXAMPLE.COM",
+ " ",
+ "SAMDOM.EXAMPLE.COM 1 1",
+ "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
+
+ good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530",
+ "SAMDOM.EXAMPLE.COM 1 1 1"]
+ bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
+ "SAMDOM.EXAMPLE.COM 0 0 65536",
+ "SAMDOM.EXAMPLE.COM 65536 0 0"]
+
+ for bad_dn in bad_dns:
+ bad_mx.append("%s 1" % bad_dn)
+ bad_srv.append("%s 0 0 0" % bad_dn)
+ for good_dn in good_dns:
+ good_mx.append("%s 1" % good_dn)
+ good_srv.append("%s 0 0 0" % good_dn)
+
+ self.good_records = {
+ "A":["192.168.0.1", "255.255.255.255"],
+ "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
+ "0000:0000:0000:0000:0000:0000:0000:0000",
+ "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
+ "1234:1234:1234::",
+ "1234:5678:9ABC:DEF0::",
+ "0000:0000::0000",
+ "1234::5678:9ABC:0000:0000:0000:0000",
+ "::1",
+ "::",
+ "1:1:1:1:1:1:1:1"],
+ "PTR": good_dns,
+ "CNAME": good_dns,
+ "NS": good_dns,
+ "MX": good_mx,
+ "SRV": good_srv,
+ "TXT": ["text", "", "@#!", "\n"]
+ }
+
+ self.bad_records = {
+ "A":["192.168.0.500",
+ "255.255.255.255/32"],
+ "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
+ "0000:0000:0000:0000:0000:0000:0000:0000/1",
+ "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
+ "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
+ "1234:5678:9ABC:DEF0:1234:5678:9ABC",
+ "1111::1111::1111"],
+ "PTR": bad_dns,
+ "CNAME": bad_dns,
+ "NS": bad_dns,
+ "MX": bad_mx,
+ "SRV": bad_srv
+ }
+
+ def resetZone(self):
+ self.deleteZone()
+ self.addZone()
+
+ def addZone(self):
+ self.zone = "zone"
+ result, out, err = self.runsubcmd("dns",
+ "zonecreate",
+ os.environ["SERVER"],
+ self.zone,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err)
+
+ def deleteZone(self):
+ result, out, err = self.runsubcmd("dns",
+ "zonedelete",
+ os.environ["SERVER"],
+ self.zone,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err)
+
+ def get_all_records(self, zone_name):
+ zone_dn = (f"DC={zone_name},CN=MicrosoftDNS,DC=DomainDNSZones,"
+ f"{self.samdb.get_default_basedn()}")
+
+ expression = "(&(objectClass=dnsNode)(!(dNSTombstoned=TRUE)))"
+
+ nodes = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
+ expression=expression,
+ attrs=["dnsRecord", "name"])
+
+ record_map = {}
+ for node in nodes:
+ name = node["name"][0].decode()
+ record_map[name] = list(node["dnsRecord"])
+
+ return record_map
+
+ def get_record_from_db(self, zone_name, record_name):
+ zones = self.samdb.search(base="DC=DomainDnsZones,%s"
+ % self.samdb.get_default_basedn(),
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(objectClass=dnsZone)",
+ attrs=["cn"])
+
+ for zone in zones:
+ if zone_name in str(zone.dn):
+ zone_dn = zone.dn
+ break
+
+ records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
+ expression="(objectClass=dnsNode)",
+ attrs=["dnsRecord"])
+
+ for old_packed_record in records:
+ if record_name in str(old_packed_record.dn):
+ return (old_packed_record.dn,
+ ndr_unpack(dnsp.DnssrvRpcRecord,
+ old_packed_record["dnsRecord"][0]))
+
+ def test_rank_none(self):
+ record_str = "192.168.50.50"
+ record_type_str = "A"
+
+ result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
+ self.zone, "testrecord", record_type_str,
+ record_str, self.creds_string)
+ self.assertCmdSuccess(result, out, err,
+ "Failed to add record '%s' with type %s."
+ % (record_str, record_type_str))
+
+ dn, record = self.get_record_from_db(self.zone, "testrecord")
+ record.rank = 0 # DNS_RANK_NONE
+ res = self.samdb.dns_replace_by_dn(dn, [record])
+ if res is not None:
+ self.fail("Unable to update dns record to have DNS_RANK_NONE.")
+
+ errors = []
+
+ # The record should still exist
+ result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
+ self.zone, "testrecord", record_type_str,
+ self.creds_string)
+ try:
+ self.assertCmdSuccess(result, out, err,
+ "Failed to query for a record"
+ "which had DNS_RANK_NONE.")
+ self.assertTrue("testrecord" in out and record_str in out,
+ "Query for a record which had DNS_RANK_NONE"
+ "succeeded but produced no resulting records.")
+ except AssertionError:
+ # Windows produces no resulting records
+ pass
+
+ # We should not be able to add a duplicate
+ result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
+ self.zone, "testrecord", record_type_str,
+ record_str, self.creds_string)
+ try:
+ self.assertCmdFail(result, "Successfully added duplicate record"
+ "of one which had DNS_RANK_NONE.")
+ except AssertionError as e:
+ errors.append(e)
+
+ # We should be able to delete it
+ result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
+ self.zone, "testrecord", record_type_str,
+ record_str, self.creds_string)
+ try:
+ self.assertCmdSuccess(result, out, err, "Failed to delete record"
+ "which had DNS_RANK_NONE.")
+ except AssertionError as e:
+ errors.append(e)
+
+ # Now the record should not exist
+ result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
+ self.zone, "testrecord",
+ record_type_str, self.creds_string)
+ try:
+ self.assertCmdFail(result, "Successfully queried for deleted record"
+ "which had DNS_RANK_NONE.")
+ except AssertionError as e:
+ errors.append(e)
+
+ if len(errors) > 0:
+ err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
+ for error in errors:
+ err_str = err_str + "\n" + str(error)
+ raise AssertionError(err_str)
+
+ def test_accept_valid_commands(self):
+ """
+ For all good records, attempt to add, query and delete them.
+ """
+ num_failures = 0
+ failure_msgs = []
+ for dnstype in self.good_records:
+ for record in self.good_records[dnstype]:
+ try:
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add"
+ "record %s with type %s."
+ % (record, dnstype))
+
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to query"
+ "record %s with qualifier %s."
+ % (record, dnstype))
+
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to remove"
+ "record %s with type %s."
+ % (record, dnstype))
+ except AssertionError as e:
+ num_failures = num_failures + 1
+ failure_msgs.append(e)
+
+ if num_failures > 0:
+ for msg in failure_msgs:
+ print(msg)
+ self.fail("Failed to accept valid commands. %d total failures."
+ "Errors above." % num_failures)
+
+ def test_reject_invalid_commands(self):
+ """
+ For all bad records, attempt to add them and update to them,
+ making sure that both operations fail.
+ """
+ num_failures = 0
+ failure_msgs = []
+
+ # Add invalid records and make sure they fail to be added
+ for dnstype in self.bad_records:
+ for record in self.bad_records[dnstype]:
+ try:
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdFail(result, "Successfully added invalid"
+ "record '%s' of type '%s'."
+ % (record, dnstype))
+ except AssertionError as e:
+ num_failures = num_failures + 1
+ failure_msgs.append(e)
+ self.resetZone()
+ try:
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdFail(result, "Successfully deleted invalid"
+ "record '%s' of type '%s' which"
+ "shouldn't exist." % (record, dnstype))
+ except AssertionError as e:
+ num_failures = num_failures + 1
+ failure_msgs.append(e)
+ self.resetZone()
+
+ # Update valid records to invalid ones and make sure they
+ # fail to be updated
+ for dnstype in self.bad_records:
+ for bad_record in self.bad_records[dnstype]:
+ good_record = self.good_records[dnstype][0]
+
+ try:
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, good_record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add "
+ "record '%s' with type %s."
+ % (record, dnstype))
+
+ result, out, err = self.runsubcmd("dns", "update",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, good_record,
+ bad_record,
+ self.creds_string)
+ self.assertCmdFail(result, "Successfully updated valid "
+ "record '%s' of type '%s' to invalid "
+ "record '%s' of the same type."
+ % (good_record, dnstype, bad_record))
+
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, good_record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Could not delete "
+ "valid record '%s' of type '%s'."
+ % (good_record, dnstype))
+ except AssertionError as e:
+ num_failures = num_failures + 1
+ failure_msgs.append(e)
+ self.resetZone()
+
+ if num_failures > 0:
+ for msg in failure_msgs:
+ print(msg)
+ self.fail("Failed to reject invalid commands. %d total failures. "
+ "Errors above." % num_failures)
+
+ def test_update_invalid_type(self):
+ """Make sure that a record can't be updated to another type leaving
+ the data the same, where that data would be incompatible with
+ the new type. This is not always enforced at the C level.
+
+ We don't try with all types, because many types are compatible
+ in their representations (e.g. A records could be TXT or CNAME
+ records; PTR record values are exactly the same as CNAME
+ record values, etc).
+ """
+ dnstypes = ('A', 'AAAA', 'SRV')
+ for dnstype1 in dnstypes:
+ record1 = self.good_records[dnstype1][0]
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype1, record1,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add "
+ "record %s with type %s."
+ % (record1, dnstype1))
+
+ for dnstype2 in dnstypes:
+ if dnstype1 == dnstype2:
+ continue
+
+ record2 = self.good_records[dnstype2][0]
+
+ # Check both ways: Give the current type and try to update,
+ # and give the new type and try to update.
+ result, out, err = self.runsubcmd("dns", "update",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype1, record1,
+ record2, self.creds_string)
+ self.assertCmdFail(result, "Successfully updated record '%s' "
+ "to '%s', even though the latter is of "
+ "type '%s' where '%s' was expected."
+ % (record1, record2, dnstype2, dnstype1))
+
+ result, out, err = self.runsubcmd("dns", "update",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype2, record1, record2,
+ self.creds_string)
+ self.assertCmdFail(result, "Successfully updated record "
+ "'%s' to '%s', even though the former "
+ "is of type '%s' where '%s' was expected."
+ % (record1, record2, dnstype1, dnstype2))
+
+ def test_update_valid_type(self):
+ for dnstype in self.good_records:
+ for record in self.good_records[dnstype]:
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add "
+ "record %s with type %s."
+ % (record, dnstype))
+
+ if record == '.' and dnstype != 'TXT':
+ # This will fail because the update finds a match
+ # for "." that is actually "" (in
+ # dns_record_match()), then uses the "" record in
+ # a call to dns_to_dnsp_convert() which calls
+ # dns_name_check() which rejects "" as a bad DNS
+ # name. Maybe FIXME, maybe not.
+ continue
+
+ # Update the record to be the same.
+ result, out, err = self.runsubcmd("dns", "update",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, record, record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err,
+ "Could not update record "
+ "'%s' to be exactly the same." % record)
+
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Could not delete "
+ "valid record '%s' of type '%s'."
+ % (record, dnstype))
+
+ for record in self.good_records["SRV"]:
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "SRV", record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add "
+ "record %s with type 'SRV'." % record)
+
+ split = record.split()
+ new_bit = str(int(split[3]) + 1)
+ new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)
+
+ result, out, err = self.runsubcmd("dns", "update",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "SRV", record,
+ new_record, self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to update record "
+ "'%s' of type '%s' to '%s'."
+ % (record, "SRV", new_record))
+
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "SRV", self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to query for "
+ "record '%s' of type '%s'."
+ % (new_record, "SRV"))
+
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "SRV", new_record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Could not delete "
+ "valid record '%s' of type '%s'."
+ % (new_record, "SRV"))
+
+ # Since 'dns update' takes the current value as a parameter, make sure
+ # we can't enter the wrong current value for a given record.
+ for dnstype in self.good_records:
+ if len(self.good_records[dnstype]) < 3:
+ continue # Not enough records of this type to do this test
+
+ used_record = self.good_records[dnstype][0]
+ unused_record = self.good_records[dnstype][1]
+ new_record = self.good_records[dnstype][2]
+
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, used_record,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add record %s "
+ "with type %s." % (used_record, dnstype))
+
+ result, out, err = self.runsubcmd("dns", "update",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype, unused_record,
+ new_record,
+ self.creds_string)
+ self.assertCmdFail(result, "Successfully updated record '%s' "
+ "from '%s' to '%s', even though the given "
+ "source record is incorrect."
+ % (used_record, unused_record, new_record))
+
+ def test_invalid_types(self):
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "SOA", "test",
+ self.creds_string)
+ self.assertCmdFail(result, "Successfully added record of type SOA, "
+ "when this type should not be available.")
+ self.assertTrue("type SOA is not supported" in err,
+ "Invalid error message '%s' when attempting to "
+ "add record of type SOA." % err)
+
+ def test_add_overlapping_different_type(self):
+ """
+ Make sure that we can add an entry with the same name as an existing one but a different type.
+ """
+
+ i = 0
+ for dnstype1 in self.good_records:
+ record1 = self.good_records[dnstype1][0]
+ for dnstype2 in self.good_records:
+ # Only do some subset of dns types, otherwise it takes a long time.
+ i += 1
+ if i % 4 != 0:
+ continue
+
+ if dnstype1 == dnstype2:
+ continue
+
+ record2 = self.good_records[dnstype2][0]
+
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype1, record1,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add record "
+ "'%s' of type '%s'." % (record1, dnstype1))
+
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype2, record2,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to add record "
+ "'%s' of type '%s' when a record '%s' "
+ "of type '%s' with the same name exists."
+ % (record1, dnstype1, record2, dnstype2))
+
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype1, self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to query for "
+ "record '%s' of type '%s' when a new "
+ "record '%s' of type '%s' with the same "
+ "name was added."
+ % (record1, dnstype1, record2, dnstype2))
+
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype2, self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to query "
+ "record '%s' of type '%s' which should "
+ "have been added with the same name as "
+ "record '%s' of type '%s'."
+ % (record2, dnstype2, record1, dnstype1))
+
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype1, record1,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to delete "
+ "record '%s' of type '%s'."
+ % (record1, dnstype1))
+
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ dnstype2, record2,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err, "Failed to delete "
+ "record '%s' of type '%s'."
+ % (record2, dnstype2))
+
+ def test_query_deleted_record(self):
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ "testrecord", "A", self.testip, self.creds_string)
+ self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
+ "testrecord", "A", self.testip, self.creds_string)
+
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "A", self.creds_string)
+ self.assertCmdFail(result)
+
+ def test_add_duplicate_record(self):
+ for record_type in self.good_records:
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ record_type,
+ self.good_records[record_type][0],
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err)
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ record_type,
+ self.good_records[record_type][0],
+ self.creds_string)
+ self.assertCmdFail(result)
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ record_type, self.creds_string)
+ self.assertCmdSuccess(result, out, err)
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ record_type,
+ self.good_records[record_type][0],
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err)
+
+ def test_remove_deleted_record(self):
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ "testrecord", "A", self.testip, self.creds_string)
+ self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
+ "testrecord", "A", self.testip, self.creds_string)
+
+ # Attempting to delete a record that has already been deleted or has never existed should fail
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "A", self.testip, self.creds_string)
+ self.assertCmdFail(result)
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, "testrecord",
+ "A", self.creds_string)
+ self.assertCmdFail(result)
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, "testrecord2",
+ "A", self.testip, self.creds_string)
+ self.assertCmdFail(result)
+
+ def test_cleanup_record(self):
+ """
+ Test dns cleanup command is working fine.
+ """
+
+ # add a A record
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'testa', "A", self.testip, self.creds_string)
+
+ # the above A record points to this host
+ dnshostname = '{0}.{1}'.format('testa', self.zone.lower())
+
+ # add a CNAME record points to above host
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'testcname', "CNAME", dnshostname, self.creds_string)
+
+ # add a NS record
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'testns', "NS", dnshostname, self.creds_string)
+
+ # add a PTR record points to above host
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'testptr', "PTR", dnshostname, self.creds_string)
+
+ # add a SRV record points to above host
+ srv_record = "{0} 65530 65530 65530".format(dnshostname)
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'testsrv', "SRV", srv_record, self.creds_string)
+
+ # cleanup record for this dns host
+ self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
+ dnshostname, self.creds_string)
+
+ # all records should be marked as dNSTombstoned
+ for record_name in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:
+
+ records = self.samdb.search(
+ base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(&(objectClass=dnsNode)(name={0}))".format(record_name),
+ attrs=["dNSTombstoned"])
+
+ self.assertEqual(len(records), 1)
+ for record in records:
+ self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
+
+ def test_cleanup_record_no_A_record(self):
+ """
+ Test dns cleanup command works with no A record.
+ """
+
+ # add a A record
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'notesta', "A", self.testip, self.creds_string)
+
+ # the above A record points to this host
+ dnshostname = '{0}.{1}'.format('testa', self.zone.lower())
+
+ # add a CNAME record points to above host
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'notestcname', "CNAME", dnshostname, self.creds_string)
+
+ # add a NS record
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'notestns', "NS", dnshostname, self.creds_string)
+
+ # add a PTR record points to above host
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'notestptr', "PTR", dnshostname, self.creds_string)
+
+ # add a SRV record points to above host
+ srv_record = "{0} 65530 65530 65530".format(dnshostname)
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ 'notestsrv', "SRV", srv_record, self.creds_string)
+
+ # Remove the initial A record (leading to hanging references)
+ self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
+ 'notesta', "A", self.testip, self.creds_string)
+
+ # cleanup record for this dns host
+ self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
+ dnshostname, self.creds_string)
+
+ # all records should be marked as dNSTombstoned
+ for record_name in ['notestcname', 'notestns', 'notestptr', 'notestsrv']:
+
+ records = self.samdb.search(
+ base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(&(objectClass=dnsNode)(name={0}))".format(record_name),
+ attrs=["dNSTombstoned"])
+
+ self.assertEqual(len(records), 1)
+ for record in records:
+ self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
+
+ def test_cleanup_multi_srv_record(self):
+ """
+ Test dns cleanup command for multi-valued SRV record.
+
+ Steps:
+ - Add 2 A records host1 and host2
+ - Add a SRV record srv1 and points to both host1 and host2
+ - Run cleanup command for host1
+ - Check records for srv1, data for host1 should be gone and host2 is kept.
+ """
+
+ hosts = ['host1', 'host2'] # A record names
+ srv_name = 'srv1'
+
+ # add A records
+ for host in hosts:
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ host, "A", self.testip, self.creds_string)
+
+ # the above A record points to this host
+ dnshostname = '{0}.{1}'.format(host, self.zone.lower())
+
+ # add a SRV record points to above host
+ srv_record = "{0} 65530 65530 65530".format(dnshostname)
+ self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+ srv_name, "SRV", srv_record, self.creds_string)
+
+ records = self.samdb.search(
+ base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(&(objectClass=dnsNode)(name={0}))".format(srv_name),
+ attrs=['dnsRecord'])
+ # should have 2 records here
+ self.assertEqual(len(records[0]['dnsRecord']), 2)
+
+ # cleanup record for dns host1
+ dnshostname1 = 'host1.{0}'.format(self.zone.lower())
+ self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
+ dnshostname1, self.creds_string)
+
+ records = self.samdb.search(
+ base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(&(objectClass=dnsNode)(name={0}))".format(srv_name),
+ attrs=['dnsRecord', 'dNSTombstoned'])
+
+ # dnsRecord for host1 should be deleted
+ self.assertEqual(len(records[0]['dnsRecord']), 1)
+
+ # unpack data
+ dns_record_bin = records[0]['dnsRecord'][0]
+ dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
+
+ # dnsRecord for host2 is still there and is the only one
+ dnshostname2 = 'host2.{0}'.format(self.zone.lower())
+ self.assertEqual(dns_record_obj.data.nameTarget, dnshostname2)
+
+ # assert that the record isn't spuriously tombstoned
+ self.assertTrue('dNSTombstoned' not in records[0] or
+ str(records[0]['dNSTombstoned']) == 'FALSE')
+
+ def test_dns_wildcards(self):
+ """
+ Ensure that DNS wild card entries can be added deleted and queried
+ """
+ num_failures = 0
+ failure_msgs = []
+ records = [("*.", "MISS", "A", "1.1.1.1"),
+ ("*.SAMDOM", "MISS.SAMDOM", "A", "1.1.1.2")]
+ for (name, miss, dnstype, record) in records:
+ try:
+ result, out, err = self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, name,
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdSuccess(
+ result,
+ out,
+ err,
+ ("Failed to add record %s (%s) with type %s."
+ % (name, record, dnstype)))
+
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, name,
+ dnstype,
+ self.creds_string)
+ self.assertCmdSuccess(
+ result,
+ out,
+ err,
+ ("Failed to query record %s with qualifier %s."
+ % (record, dnstype)))
+
+ # dns tool does not perform dns wildcard search if the name
+ # does not match
+ result, out, err = self.runsubcmd("dns", "query",
+ os.environ["SERVER"],
+ self.zone, miss,
+ dnstype,
+ self.creds_string)
+ self.assertCmdFail(
+ result,
+ ("Failed to query record %s with qualifier %s."
+ % (record, dnstype)))
+
+ result, out, err = self.runsubcmd("dns", "delete",
+ os.environ["SERVER"],
+ self.zone, name,
+ dnstype, record,
+ self.creds_string)
+ self.assertCmdSuccess(
+ result,
+ out,
+ err,
+ ("Failed to remove record %s with type %s."
+ % (record, dnstype)))
+ except AssertionError as e:
+ num_failures = num_failures + 1
+ failure_msgs.append(e)
+
+ if num_failures > 0:
+ for msg in failure_msgs:
+ print(msg)
+ self.fail("Failed to accept valid commands. %d total failures."
+ "Errors above." % num_failures)
+
+ def test_serverinfo(self):
+ for v in ['w2k', 'dotnet', 'longhorn']:
+ result, out, err = self.runsubcmd("dns",
+ "serverinfo",
+ "--client-version", v,
+ os.environ["SERVER"],
+ self.creds_string)
+ self.assertCmdSuccess(result,
+ out,
+ err,
+ "Failed to print serverinfo with "
+ "client version %s" % v)
+ self.assertTrue(out != '')
+
+ def test_zoneinfo(self):
+ result, out, err = self.runsubcmd("dns",
+ "zoneinfo",
+ os.environ["SERVER"],
+ self.zone,
+ self.creds_string)
+ self.assertCmdSuccess(result,
+ out,
+ err,
+ "Failed to print zoneinfo")
+ self.assertTrue(out != '')
+
+ def test_zoneoptions_aging(self):
+ for options, vals, error in (
+ (['--aging=1'], {'fAging': 'TRUE'}, False),
+ (['--aging=0'], {'fAging': 'FALSE'}, False),
+ (['--aging=-1'], {'fAging': 'FALSE'}, True),
+ (['--aging=2'], {}, True),
+ (['--aging=2', '--norefreshinterval=1'], {}, True),
+ (['--aging=1', '--norefreshinterval=1'],
+ {'fAging': 'TRUE', 'dwNoRefreshInterval': '1'}, False),
+ (['--aging=1', '--norefreshinterval=0'],
+ {'fAging': 'TRUE', 'dwNoRefreshInterval': '0'}, False),
+ (['--aging=0', '--norefreshinterval=99', '--refreshinterval=99'],
+ {'fAging': 'FALSE',
+ 'dwNoRefreshInterval': '99',
+ 'dwRefreshInterval': '99'}, False),
+ (['--aging=0', '--norefreshinterval=-99', '--refreshinterval=99'],
+ {}, True),
+ (['--refreshinterval=9999999'], {}, True),
+ (['--norefreshinterval=9999999'], {}, True),
+ ):
+ result, out, err = self.runsubcmd("dns",
+ "zoneoptions",
+ os.environ["SERVER"],
+ self.zone,
+ self.creds_string,
+ *options)
+ if error:
+ self.assertCmdFail(result, "zoneoptions should fail")
+ else:
+ self.assertCmdSuccess(result,
+ out,
+ err,
+ "zoneoptions shouldn't fail")
+
+
+ info_r, info_out, info_err = self.runsubcmd("dns",
+ "zoneinfo",
+ os.environ["SERVER"],
+ self.zone,
+ self.creds_string)
+
+ self.assertCmdSuccess(info_r,
+ info_out,
+ info_err,
+ "zoneinfo shouldn't fail after zoneoptions")
+
+ info = {k: v for k, v in re.findall(r'^\s*(\w+)\s*:\s*(\w+)\s*$',
+ info_out,
+ re.MULTILINE)}
+ for k, v in vals.items():
+ self.assertIn(k, info)
+ self.assertEqual(v, info[k])
+
+
+ def ldap_add_node_with_records(self, name, records):
+ dn = (f"DC={name},DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
+ f"{self.samdb.get_default_basedn()}")
+
+ dns_records = []
+ for r in records:
+ rec = dnsp.DnssrvRpcRecord()
+ rec.wType = r.get('wType', dnsp.DNS_TYPE_A)
+ rec.rank = dnsp.DNS_RANK_ZONE
+ rec.dwTtlSeconds = 900
+ rec.dwTimeStamp = r.get('dwTimeStamp', 0)
+ rec.data = r.get('data', '10.10.10.10')
+ dns_records.append(ndr_pack(rec))
+
+ msg = ldb.Message.from_dict(self.samdb,
+ {'dn': dn,
+ "objectClass": ["top", "dnsNode"],
+ 'dnsRecord': dns_records
+ })
+ self.samdb.add(msg)
+
+ def get_timestamp_map(self):
+ re_wtypes = (dnsp.DNS_TYPE_A,
+ dnsp.DNS_TYPE_AAAA,
+ dnsp.DNS_TYPE_TXT)
+
+ t = time.time()
+ now = dsdb_dns.unix_to_dns_timestamp(int(t))
+
+ records = self.get_all_records(self.zone)
+ tsmap = {}
+ for k, recs in records.items():
+ m = []
+ tsmap[k] = m
+ for rec in recs:
+ r = ndr_unpack(dnsp.DnssrvRpcRecord, rec)
+ timestamp = r.dwTimeStamp
+ if abs(timestamp - now) < 3:
+ timestamp = 'nowish'
+
+ if r.wType in re_wtypes:
+ m.append(('R', timestamp))
+ else:
+ m.append(('-', timestamp))
+
+ return tsmap
+
+
+ def test_zoneoptions_mark_records(self):
+ self.maxDiff = 10000
+ # We need a number of records to work with, so we'll use part
+ # of our known good records list, using three different names
+ # to test the regex. All these records will be static.
+ for dnstype in self.good_records:
+ for record in self.good_records[dnstype][:2]:
+ self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "frobitz",
+ dnstype, record,
+ self.creds_string)
+ self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "weergly",
+ dnstype, record,
+ self.creds_string)
+ self.runsubcmd("dns", "add",
+ os.environ["SERVER"],
+ self.zone, "snizle",
+ dnstype, record,
+ self.creds_string)
+
+ # and we also want some that aren't static, and some mixed
+ # static/dynamic records.
+ # timestamps are in hours since 1601; now ~= 3.7 million
+ for ts in (0, 100, 10 ** 6, 10 ** 7):
+ name = f"ts-{ts}"
+ self.ldap_add_node_with_records(name, [{"dwTimeStamp": ts}])
+
+ recs = []
+ for ts in (0, 100, 10 ** 6, 10 ** 7):
+ addr = f'10.{(ts >> 16) & 255}.{(ts >> 8) & 255}.{ts & 255}'
+ recs.append({"dwTimeStamp": ts, "data": addr})
+
+ self.ldap_add_node_with_records("ts-multi", recs)
+
+ # get the state of ALL records.
+ # then we make assertions about the diffs, keeping track of
+ # the current state.
+
+ tsmap = self.get_timestamp_map()
+
+
+
+ for options, diff, output_substrings, error in (
+ # --mark-old-records-static
+ # --mark-records-static-regex
+ # --mark-records-dynamic-regex
+ (
+ ['--mark-old-records-static=1971-13-04'],
+ {},
+ [],
+ "bad date"
+ ),
+ (
+ # using --dry-run, should be no change, but output.
+ ['--mark-old-records-static=1971-03-04', '--dry-run'],
+ {},
+ [
+ "would make 1/1 records static on ts-1000000.zone.",
+ "would make 1/1 records static on ts-100.zone.",
+ "would make 2/4 records static on ts-multi.zone.",
+ ],
+ False
+ ),
+ (
+ # timestamps < ~ 3.25 million are now static
+ ['--mark-old-records-static=1971-03-04'],
+ {
+ 'ts-100': [('R', 0)],
+ 'ts-1000000': [('R', 0)],
+ 'ts-multi': [('R', 0), ('R', 0), ('R', 0), ('R', 10000000)]
+ },
+ [
+ "made 1/1 records static on ts-1000000.zone.",
+ "made 1/1 records static on ts-100.zone.",
+ "made 2/4 records static on ts-multi.zone.",
+ ],
+ False
+ ),
+ (
+ # no change, old records already static
+ ['--mark-old-records-static=1972-03-04'],
+ {},
+ [],
+ False
+ ),
+ (
+ # no change, samba-tool added records already static
+ ['--mark-records-static-regex=sniz'],
+ {},
+ [],
+ False
+ ),
+ (
+ # snizle has 2 A, 2 AAAA, 10 fancy, and 2 TXT records, in
+ # that order.
+ # the A, AAAA, and TXT records should be dynamic
+ ['--mark-records-dynamic-regex=sniz'],
+ {'snizle': [('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('R', 'nowish'),
+ ('R', 'nowish')]
+ },
+ ['made 6/16 records dynamic on snizle.zone.'],
+ False
+ ),
+ (
+ # This regex should catch snizle, weergly, and ts-*
+ # but we're doing dry-run so no change
+ ['--mark-records-dynamic-regex=[sw]', '-n'],
+ {},
+ ['would make 3/4 records dynamic on ts-multi.zone.',
+ 'would make 1/1 records dynamic on ts-0.zone.',
+ 'would make 1/1 records dynamic on ts-1000000.zone.',
+ 'would make 6/16 records dynamic on weergly.zone.',
+ 'would make 1/1 records dynamic on ts-100.zone.'
+ ],
+ False
+ ),
+ (
+ # This regex should catch snizle and frobitz
+ # but snizle has already been changed.
+ ['--mark-records-dynamic-regex=z'],
+ {'frobitz': [('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('R', 'nowish'),
+ ('R', 'nowish')]
+ },
+ ['made 6/16 records dynamic on frobitz.zone.'],
+ False
+ ),
+ (
+ # This regex should catch snizle, frobitz, and
+ # ts-multi. Note that the 1e7 ts-multi record is
+ # already dynamic and doesn't change.
+ ['--mark-records-dynamic-regex=[i]'],
+ {'ts-multi': [('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 10000000)]
+ },
+ ['made 3/4 records dynamic on ts-multi.zone.'],
+ False
+ ),
+ (
+ # matches no records
+ ['--mark-records-dynamic-regex=^aloooooo[qw]+'],
+ {},
+ [],
+ False
+ ),
+ (
+ # This should be an error, as only one --mark-*
+ # argument is allowed at a time
+ ['--mark-records-dynamic-regex=.',
+ '--mark-records-static-regex=.',
+ ],
+ {},
+ [],
+ True
+ ),
+ (
+ # This should also be an error
+ ['--mark-old-records-static=1997-07-07',
+ '--mark-records-static-regex=.',
+ ],
+ {},
+ [],
+ True
+ ),
+ (
+ # This should not be an error. --aging and refresh
+ # options can be mixed with --mark ones.
+ ['--mark-old-records-static=1997-07-07',
+ '--aging=0',
+ ],
+ {},
+ ['Set Aging to 0'],
+ False
+ ),
+ (
+ # This regex should catch weergly, but all the
+ # records are already static,
+ ['--mark-records-static-regex=wee'],
+ {},
+ [],
+ False
+ ),
+ (
+ # Make frobitz static again.
+ ['--mark-records-static-regex=obi'],
+ {'frobitz': [('R', 0),
+ ('R', 0),
+ ('R', 0),
+ ('R', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('R', 0),
+ ('R', 0)]
+ },
+ ['made 6/16 records static on frobitz.zone.'],
+ False
+ ),
+ (
+ # would make almost everything static, but --dry-run
+ ['--mark-old-records-static=2222-03-04', '--dry-run'],
+ {},
+ [
+ 'would make 6/16 records static on snizle.zone.',
+ 'would make 3/4 records static on ts-multi.zone.'
+ ],
+ False
+ ),
+ (
+ # make everything static
+ ['--mark-records-static-regex=.'],
+ {'snizle': [('R', 0),
+ ('R', 0),
+ ('R', 0),
+ ('R', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('R', 0),
+ ('R', 0)],
+ 'ts-10000000': [('R', 0)],
+ 'ts-multi': [('R', 0), ('R', 0), ('R', 0), ('R', 0)]
+ },
+ [
+ 'made 4/4 records static on ts-multi.zone.',
+ 'made 1/1 records static on ts-10000000.zone.',
+ 'made 6/16 records static on snizle.zone.',
+ ],
+ False
+ ),
+ (
+ # make everything dynamic that can be
+ ['--mark-records-dynamic-regex=.'],
+ {'frobitz': [('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('R', 'nowish'),
+ ('R', 'nowish')],
+ 'snizle': [('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('R', 'nowish'),
+ ('R', 'nowish')],
+ 'ts-0': [('R', 'nowish')],
+ 'ts-100': [('R', 'nowish')],
+ 'ts-1000000': [('R', 'nowish')],
+ 'ts-10000000': [('R', 'nowish')],
+ 'ts-multi': [('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish')],
+ 'weergly': [('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('R', 'nowish'),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('-', 0),
+ ('R', 'nowish'),
+ ('R', 'nowish')]
+ },
+ [
+ 'made 4/4 records dynamic on ts-multi.zone.',
+ 'made 6/16 records dynamic on snizle.zone.',
+ 'made 1/1 records dynamic on ts-0.zone.',
+ 'made 1/1 records dynamic on ts-1000000.zone.',
+ 'made 1/1 records dynamic on ts-10000000.zone.',
+ 'made 1/1 records dynamic on ts-100.zone.',
+ 'made 6/16 records dynamic on frobitz.zone.',
+ 'made 6/16 records dynamic on weergly.zone.',
+ ],
+ False
+ ),
+ ):
+ result, out, err = self.runsubcmd("dns",
+ "zoneoptions",
+ os.environ["SERVER"],
+ self.zone,
+ self.creds_string,
+ *options)
+ if error:
+ self.assertCmdFail(result, f"zoneoptions should fail ({error})")
+ else:
+ self.assertCmdSuccess(result,
+ out,
+ err,
+ "zoneoptions shouldn't fail")
+
+ new_tsmap = self.get_timestamp_map()
+
+ # same keys, always
+ self.assertEqual(sorted(new_tsmap), sorted(tsmap))
+ changes = {}
+ for k in tsmap:
+ if tsmap[k] != new_tsmap[k]:
+ changes[k] = new_tsmap[k]
+
+ self.assertEqual(diff, changes)
+
+ for s in output_substrings:
+ self.assertIn(s, out)
+ tsmap = new_tsmap
+
+ def test_zonecreate_dns_domain_directory_partition(self):
+ zone = "test-dns-domain-dp-zone"
+ dns_dp_opt = "--dns-directory-partition=domain"
+
+ result, out, err = self.runsubcmd("dns",
+ "zonecreate",
+ os.environ["SERVER"],
+ zone,
+ self.creds_string,
+ dns_dp_opt)
+ self.assertCmdSuccess(result,
+ out,
+ err,
+ "Failed to create zone with "
+ "--dns-directory-partition option")
+ self.assertTrue('Zone %s created successfully' % zone in out,
+ "Unexpected output: %s")
+
+ result, out, err = self.runsubcmd("dns",
+ "zoneinfo",
+ os.environ["SERVER"],
+ zone,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err)
+ self.assertTrue("DNS_DP_DOMAIN_DEFAULT" in out,
+ "Missing DNS_DP_DOMAIN_DEFAULT flag")
+
+ result, out, err = self.runsubcmd("dns",
+ "zonedelete",
+ os.environ["SERVER"],
+ zone,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err,
+ "Failed to delete zone in domain DNS directory "
+ "partition")
+ result, out, err = self.runsubcmd("dns",
+ "zonelist",
+ os.environ["SERVER"],
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err,
+ "Failed to delete zone in domain DNS directory "
+ "partition")
+ self.assertTrue(zone not in out,
+ "Deleted zone still exists")
+
+ def test_zonecreate_dns_forest_directory_partition(self):
+ zone = "test-dns-forest-dp-zone"
+ dns_dp_opt = "--dns-directory-partition=forest"
+
+ result, out, err = self.runsubcmd("dns",
+ "zonecreate",
+ os.environ["SERVER"],
+ zone,
+ self.creds_string,
+ dns_dp_opt)
+ self.assertCmdSuccess(result,
+ out,
+ err,
+ "Failed to create zone with "
+ "--dns-directory-partition option")
+ self.assertTrue('Zone %s created successfully' % zone in out,
+ "Unexpected output: %s")
+
+ result, out, err = self.runsubcmd("dns",
+ "zoneinfo",
+ os.environ["SERVER"],
+ zone,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err)
+ self.assertTrue("DNS_DP_FOREST_DEFAULT" in out,
+ "Missing DNS_DP_FOREST_DEFAULT flag")
+
+ result, out, err = self.runsubcmd("dns",
+ "zonedelete",
+ os.environ["SERVER"],
+ zone,
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err,
+ "Failed to delete zone in forest DNS directory "
+ "partition")
+
+ result, out, err = self.runsubcmd("dns",
+ "zonelist",
+ os.environ["SERVER"],
+ self.creds_string)
+ self.assertCmdSuccess(result, out, err,
+ "Failed to delete zone in forest DNS directory "
+ "partition")
+ self.assertTrue(zone not in out,
+ "Deleted zone still exists")