diff options
Diffstat (limited to 'python/samba/tests/dsdb.py')
-rw-r--r-- | python/samba/tests/dsdb.py | 1223 |
1 files changed, 1223 insertions, 0 deletions
diff --git a/python/samba/tests/dsdb.py b/python/samba/tests/dsdb.py new file mode 100644 index 0000000..4d5b620 --- /dev/null +++ b/python/samba/tests/dsdb.py @@ -0,0 +1,1223 @@ +# Unix SMB/CIFS implementation. Tests for dsdb +# Copyright (C) Matthieu Patou <mat@matws.net> 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dsdb.""" + +from samba.credentials import Credentials +from samba.samdb import SamDB +from samba.auth import system_session +from samba.tests import TestCase +from samba.tests import delete_force +from samba.ndr import ndr_unpack, ndr_pack +from samba.dcerpc import drsblobs, security, misc +from samba.param import LoadParm +from samba import dsdb, functional_level +from samba import werror +import ldb +import samba +import uuid + + +class DsdbAccountTests(TestCase): + + def setUp(self): + super().setUp() + self.lp = samba.tests.env_loadparm() + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + self.samdb = SamDB(session_info=self.session, + credentials=self.creds, + lp=self.lp) + + # Create a test user + user_name = "dsdb-user-" + str(uuid.uuid4().hex[0:6]) + user_pass = samba.generate_random_password(32, 32) + user_description = "Test user for dsdb test" + + base_dn = self.samdb.domain_dn() + + self.account_dn = "CN=" + user_name + ",CN=Users," + base_dn + self.samdb.newuser(username=user_name, + password=user_pass, + description=user_description) + # Cleanup (teardown) + self.addCleanup(delete_force, self.samdb, self.account_dn) + + # Get server reference DN + res = self.samdb.search(base=ldb.Dn(self.samdb, + self.samdb.get_serverName()), + scope=ldb.SCOPE_BASE, + attrs=["serverReference"]) + # Get server reference + self.server_ref_dn = ldb.Dn( + self.samdb, res[0]["serverReference"][0].decode("utf-8")) + + # Get RID Set DN + res = self.samdb.search(base=self.server_ref_dn, + scope=ldb.SCOPE_BASE, + attrs=["rIDSetReferences"]) + rid_set_refs = res[0] + self.assertIn("rIDSetReferences", rid_set_refs) + rid_set_str = rid_set_refs["rIDSetReferences"][0].decode("utf-8") + self.rid_set_dn = ldb.Dn(self.samdb, rid_set_str) + + def get_rid_set(self, rid_set_dn): + res = self.samdb.search(base=rid_set_dn, + scope=ldb.SCOPE_BASE, + attrs=["rIDAllocationPool", + "rIDPreviousAllocationPool", + "rIDUsedPool", + "rIDNextRID"]) + return res[0] + + def test_ridalloc_next_free_rid(self): + # Test RID allocation. We assume that RID + # pools allocated to us are contiguous. + self.samdb.transaction_start() + try: + orig_rid_set = self.get_rid_set(self.rid_set_dn) + self.assertIn("rIDAllocationPool", orig_rid_set) + self.assertIn("rIDPreviousAllocationPool", orig_rid_set) + self.assertIn("rIDUsedPool", orig_rid_set) + self.assertIn("rIDNextRID", orig_rid_set) + + # Get rIDNextRID value from RID set. + next_rid = int(orig_rid_set["rIDNextRID"][0]) + + # Check the result of next_free_rid(). + next_free_rid = self.samdb.next_free_rid() + self.assertEqual(next_rid + 1, next_free_rid) + + # Check calling it twice in succession gives the same result. + next_free_rid2 = self.samdb.next_free_rid() + self.assertEqual(next_free_rid, next_free_rid2) + + # Ensure that the RID set attributes have not changed. + rid_set2 = self.get_rid_set(self.rid_set_dn) + self.assertEqual(orig_rid_set, rid_set2) + finally: + self.samdb.transaction_cancel() + + def test_ridalloc_no_ridnextrid(self): + self.samdb.transaction_start() + try: + # Delete the rIDNextRID attribute of the RID set, + # and set up previous and next pools. + prev_lo = 1000 + prev_hi = 1999 + next_lo = 3000 + next_hi = 3999 + msg = ldb.Message() + msg.dn = self.rid_set_dn + msg["rIDNextRID"] = ldb.MessageElement([], + ldb.FLAG_MOD_DELETE, + "rIDNextRID") + msg["rIDPreviousAllocationPool"] = ( + ldb.MessageElement(str((prev_hi << 32) | prev_lo), + ldb.FLAG_MOD_REPLACE, + "rIDPreviousAllocationPool")) + msg["rIDAllocationPool"] = ( + ldb.MessageElement(str((next_hi << 32) | next_lo), + ldb.FLAG_MOD_REPLACE, + "rIDAllocationPool")) + self.samdb.modify(msg) + + # Ensure that next_free_rid() returns the start of the next pool. + next_free_rid3 = self.samdb.next_free_rid() + self.assertEqual(next_lo, next_free_rid3) + + # Check the result of allocate_rid() matches. + rid = self.samdb.allocate_rid() + self.assertEqual(next_free_rid3, rid) + + # Check that the result of next_free_rid() has now changed. + next_free_rid4 = self.samdb.next_free_rid() + self.assertEqual(rid + 1, next_free_rid4) + + # Check the range of available RIDs. + free_lo, free_hi = self.samdb.free_rid_bounds() + self.assertEqual(rid + 1, free_lo) + self.assertEqual(next_hi, free_hi) + finally: + self.samdb.transaction_cancel() + + def test_ridalloc_no_free_rids(self): + self.samdb.transaction_start() + try: + # Exhaust our current pool of RIDs. + pool_lo = 2000 + pool_hi = 2999 + msg = ldb.Message() + msg.dn = self.rid_set_dn + msg["rIDPreviousAllocationPool"] = ( + ldb.MessageElement(str((pool_hi << 32) | pool_lo), + ldb.FLAG_MOD_REPLACE, + "rIDPreviousAllocationPool")) + msg["rIDAllocationPool"] = ( + ldb.MessageElement(str((pool_hi << 32) | pool_lo), + ldb.FLAG_MOD_REPLACE, + "rIDAllocationPool")) + msg["rIDNextRID"] = ( + ldb.MessageElement(str(pool_hi), + ldb.FLAG_MOD_REPLACE, + "rIDNextRID")) + self.samdb.modify(msg) + + # Ensure that calculating the next free RID fails. + with self.assertRaises(ldb.LdbError) as err: + self.samdb.next_free_rid() + + self.assertEqual("RID pools out of RIDs", err.exception.args[1]) + + # Ensure we can still allocate a new RID. + self.samdb.allocate_rid() + finally: + self.samdb.transaction_cancel() + + def test_ridalloc_new_ridset(self): + self.samdb.transaction_start() + try: + # Test what happens with RID Set values set to zero (similar to + # when a RID Set is first created, except we also set + # rIDAllocationPool to zero). + msg = ldb.Message() + msg.dn = self.rid_set_dn + msg["rIDPreviousAllocationPool"] = ( + ldb.MessageElement("0", + ldb.FLAG_MOD_REPLACE, + "rIDPreviousAllocationPool")) + msg["rIDAllocationPool"] = ( + ldb.MessageElement("0", + ldb.FLAG_MOD_REPLACE, + "rIDAllocationPool")) + msg["rIDNextRID"] = ( + ldb.MessageElement("0", + ldb.FLAG_MOD_REPLACE, + "rIDNextRID")) + self.samdb.modify(msg) + + # Ensure that calculating the next free RID fails. + with self.assertRaises(ldb.LdbError) as err: + self.samdb.next_free_rid() + + self.assertEqual("RID pools out of RIDs", err.exception.args[1]) + + # Set values for the next pool. + pool_lo = 2000 + pool_hi = 2999 + msg = ldb.Message() + msg.dn = self.rid_set_dn + msg["rIDAllocationPool"] = ( + ldb.MessageElement(str((pool_hi << 32) | pool_lo), + ldb.FLAG_MOD_REPLACE, + "rIDAllocationPool")) + self.samdb.modify(msg) + + # Ensure the next free RID value is equal to the next pool's lower + # bound. + next_free_rid5 = self.samdb.next_free_rid() + self.assertEqual(pool_lo, next_free_rid5) + + # Check the range of available RIDs. + free_lo, free_hi = self.samdb.free_rid_bounds() + self.assertEqual(pool_lo, free_lo) + self.assertEqual(pool_hi, free_hi) + finally: + self.samdb.transaction_cancel() + + def test_ridalloc_move_to_new_pool(self): + self.samdb.transaction_start() + try: + # Test moving to a new pool from the previous pool. + pool_lo = 2000 + pool_hi = 2999 + new_pool_lo = 4500 + new_pool_hi = 4599 + msg = ldb.Message() + msg.dn = self.rid_set_dn + msg["rIDPreviousAllocationPool"] = ( + ldb.MessageElement(str((pool_hi << 32) | pool_lo), + ldb.FLAG_MOD_REPLACE, + "rIDPreviousAllocationPool")) + msg["rIDAllocationPool"] = ( + ldb.MessageElement(str((new_pool_hi << 32) | new_pool_lo), + ldb.FLAG_MOD_REPLACE, + "rIDAllocationPool")) + msg["rIDNextRID"] = ( + ldb.MessageElement(str(pool_hi - 1), + ldb.FLAG_MOD_REPLACE, + "rIDNextRID")) + self.samdb.modify(msg) + + # We should have remained in the previous pool. + next_free_rid6 = self.samdb.next_free_rid() + self.assertEqual(pool_hi, next_free_rid6) + + # Check the range of available RIDs. + free_lo, free_hi = self.samdb.free_rid_bounds() + self.assertEqual(pool_hi, free_lo) + self.assertEqual(pool_hi, free_hi) + + # Allocate a new RID. + rid2 = self.samdb.allocate_rid() + self.assertEqual(next_free_rid6, rid2) + + # We should now move to the next pool. + next_free_rid7 = self.samdb.next_free_rid() + self.assertEqual(new_pool_lo, next_free_rid7) + + # Check the new range of available RIDs. + free_lo2, free_hi2 = self.samdb.free_rid_bounds() + self.assertEqual(new_pool_lo, free_lo2) + self.assertEqual(new_pool_hi, free_hi2) + + # Ensure that allocate_rid() matches. + rid3 = self.samdb.allocate_rid() + self.assertEqual(next_free_rid7, rid3) + finally: + self.samdb.transaction_cancel() + + def test_ridalloc_no_ridsetreferences(self): + self.samdb.transaction_start() + try: + # Delete the rIDSetReferences attribute. + msg = ldb.Message() + msg.dn = self.server_ref_dn + msg["rIDSetReferences"] = ( + ldb.MessageElement([], + ldb.FLAG_MOD_DELETE, + "rIDSetReferences")) + self.samdb.modify(msg) + + # Ensure calculating the next free RID fails. + with self.assertRaises(ldb.LdbError) as err: + self.samdb.next_free_rid() + + enum, estr = err.exception.args + self.assertEqual(ldb.ERR_NO_SUCH_ATTRIBUTE, enum) + self.assertIn("No RID Set DN - " + "Cannot find attribute rIDSetReferences of %s " + "to calculate reference dn" % self.server_ref_dn, + estr) + + # Ensure allocating a new RID fails. + with self.assertRaises(ldb.LdbError) as err: + self.samdb.allocate_rid() + + enum, estr = err.exception.args + self.assertEqual(ldb.ERR_ENTRY_ALREADY_EXISTS, enum) + self.assertIn("No RID Set DN - " + "Failed to add RID Set %s - " + "Entry %s already exists" % + (self.rid_set_dn, self.rid_set_dn), + estr) + finally: + self.samdb.transaction_cancel() + + def test_ridalloc_no_rid_set(self): + self.samdb.transaction_start() + try: + # Set the rIDSetReferences attribute to not point to a RID Set. + fake_rid_set_str = self.account_dn + msg = ldb.Message() + msg.dn = self.server_ref_dn + msg["rIDSetReferences"] = ( + ldb.MessageElement(fake_rid_set_str, + ldb.FLAG_MOD_REPLACE, + "rIDSetReferences")) + self.samdb.modify(msg) + + # Ensure calculating the next free RID fails. + with self.assertRaises(ldb.LdbError) as err: + self.samdb.next_free_rid() + + enum, estr = err.exception.args + self.assertEqual(ldb.ERR_OPERATIONS_ERROR, enum) + self.assertIn("Bad RID Set " + fake_rid_set_str, estr) + + # Ensure allocating a new RID fails. + with self.assertRaises(ldb.LdbError) as err: + self.samdb.allocate_rid() + + enum, estr = err.exception.args + self.assertEqual(ldb.ERR_OPERATIONS_ERROR, enum) + self.assertIn("Bad RID Set " + fake_rid_set_str, estr) + finally: + self.samdb.transaction_cancel() + + def test_error_replpropertymetadata(self): + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_error_replpropertymetadata_nochange(self): + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_error_replpropertymetadata_allow_sort(self): + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"]) + + def test_twoatt_replpropertymetadata(self): + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData", "uSNChanged"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + o.local_usn = int(str(res[0]["uSNChanged"])) + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description") + self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_set_replpropertymetadata(self): + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData", "uSNChanged"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + o.local_usn = int(str(res[0]["uSNChanged"])) + 1 + o.originating_usn = int(str(res[0]["uSNChanged"])) + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_get_attribute_replmetadata_version(self): + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["dn"]) + self.assertEqual(len(res), 1) + dn = str(res[0].dn) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2) + + def test_set_attribute_replmetadata_version(self): + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["dn"]) + self.assertEqual(len(res), 1) + dn = str(res[0].dn) + version = self.samdb.get_attribute_replmetadata_version(dn, "description") + self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2) + + def test_no_error_on_invalid_control(self): + try: + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"], + controls=["local_oid:%s:0" + % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED]) + except ldb.LdbError as e: + self.fail("Should have not raised an exception") + + def test_error_on_invalid_critical_control(self): + try: + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"], + controls=["local_oid:%s:1" + % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED]) + except ldb.LdbError as e: + (errno, estr) = e.args + if errno != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION: + self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION" + % e[1]) + +class DsdbTests(TestCase): + def setUp(self): + super().setUp() + self.lp = samba.tests.env_loadparm() + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + self.samdb = SamDB(session_info=self.session, + credentials=self.creds, + lp=self.lp) + + # Allocate a unique RID for use in the objectSID tests. + # + def allocate_rid(self): + self.samdb.transaction_start() + try: + rid = self.samdb.allocate_rid() + except: + self.samdb.transaction_cancel() + raise + self.samdb.transaction_commit() + return str(rid) + + def test_get_oid_from_attrid(self): + oid = self.samdb.get_oid_from_attid(591614) + self.assertEqual(oid, "1.2.840.113556.1.4.1790") + + def test_ok_get_attribute_from_attid(self): + self.assertEqual(self.samdb.get_attribute_from_attid(13), "description") + + def test_ko_get_attribute_from_attid(self): + self.assertEqual(self.samdb.get_attribute_from_attid(11979), None) + + # Ensure that duplicate objectSID's are permitted for foreign security + # principals. + # + def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self): + + # + # We need to build a foreign security principal SID + # i.e a SID not in the current domain. + # + dom_sid = self.samdb.get_domain_sid() + if str(dom_sid).endswith("0"): + c = "9" + else: + c = "0" + sid_str = str(dom_sid)[:-1] + c + "-1000" + sid = ndr_pack(security.dom_sid(sid_str)) + basedn = self.samdb.get_default_basedn() + dn = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid_str, basedn) + + # + # First without control + # + + try: + self.samdb.add({ + "dn": dn, + "objectClass": "foreignSecurityPrincipal"}) + self.fail("No exception should get ERR_OBJECT_CLASS_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_OBJECT_CLASS_VIOLATION, str(e)) + werr = "%08X" % werror.WERR_DS_MISSING_REQUIRED_ATT + self.assertTrue(werr in msg, msg) + + try: + self.samdb.add({ + "dn": dn, + "objectClass": "foreignSecurityPrincipal", + "objectSid": sid}) + self.fail("No exception should get ERR_UNWILLING_TO_PERFORM") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e)) + werr = "%08X" % werror.WERR_DS_ILLEGAL_MOD_OPERATION + self.assertTrue(werr in msg, msg) + + # + # We need to use the provision control + # in order to add foreignSecurityPrincipal + # objects + # + + controls = ["provision:0"] + self.samdb.add({ + "dn": dn, + "objectClass": "foreignSecurityPrincipal"}, + controls=controls) + + self.samdb.delete(dn) + + try: + self.samdb.add({ + "dn": dn, + "objectClass": "foreignSecurityPrincipal"}, + controls=controls) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + + # cleanup + self.samdb.delete(dn) + + def _test_foreignSecurityPrincipal(self, obj_class, fpo_attr): + + dom_sid = self.samdb.get_domain_sid() + lsid_str = str(dom_sid) + "-4294967294" + bsid_str = "S-1-5-32-4294967294" + fsid_str = "S-1-5-4294967294" + basedn = self.samdb.get_default_basedn() + cn = "dsdb_test_fpo" + dn_str = "cn=%s,cn=Users,%s" % (cn, basedn) + dn = ldb.Dn(self.samdb, dn_str) + + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % lsid_str, + attrs=[]) + self.assertEqual(len(res), 0) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % bsid_str, + attrs=[]) + self.assertEqual(len(res), 0) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % fsid_str, + attrs=[]) + self.assertEqual(len(res), 0) + + self.addCleanup(delete_force, self.samdb, dn_str) + + self.samdb.add({ + "dn": dn_str, + "objectClass": obj_class}) + + msg = ldb.Message() + msg.dn = dn + msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str, + ldb.FLAG_MOD_ADD, + fpo_attr) + try: + self.samdb.modify(msg) + self.fail("No exception should get LDB_ERR_UNWILLING_TO_PERFORM") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e)) + werr = "%08X" % werror.WERR_DS_INVALID_GROUP_TYPE + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = dn + msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str, + ldb.FLAG_MOD_ADD, + fpo_attr) + try: + self.samdb.modify(msg) + self.fail("No exception should get LDB_ERR_NO_SUCH_OBJECT") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_NO_SUCH_OBJECT, str(e)) + werr = "%08X" % werror.WERR_NO_SUCH_MEMBER + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = dn + msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str, + ldb.FLAG_MOD_ADD, + fpo_attr) + try: + self.samdb.modify(msg) + except ldb.LdbError as e: + self.fail("Should have not raised an exception") + + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % fsid_str, + attrs=[]) + self.assertEqual(len(res), 1) + self.samdb.delete(res[0].dn) + self.samdb.delete(dn) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % fsid_str, + attrs=[]) + self.assertEqual(len(res), 0) + + def test_foreignSecurityPrincipal_member(self): + return self._test_foreignSecurityPrincipal( + "group", "member") + + def test_foreignSecurityPrincipal_MembersForAzRole(self): + return self._test_foreignSecurityPrincipal( + "msDS-AzRole", "msDS-MembersForAzRole") + + def test_foreignSecurityPrincipal_NeverRevealGroup(self): + return self._test_foreignSecurityPrincipal( + "computer", "msDS-NeverRevealGroup") + + def test_foreignSecurityPrincipal_RevealOnDemandGroup(self): + return self._test_foreignSecurityPrincipal( + "computer", "msDS-RevealOnDemandGroup") + + def _test_fail_foreignSecurityPrincipal(self, obj_class, fpo_attr, + msg_exp, lerr_exp, werr_exp, + allow_reference=True): + + dom_sid = self.samdb.get_domain_sid() + lsid_str = str(dom_sid) + "-4294967294" + bsid_str = "S-1-5-32-4294967294" + fsid_str = "S-1-5-4294967294" + basedn = self.samdb.get_default_basedn() + cn1 = "dsdb_test_fpo1" + dn1_str = "cn=%s,cn=Users,%s" % (cn1, basedn) + dn1 = ldb.Dn(self.samdb, dn1_str) + cn2 = "dsdb_test_fpo2" + dn2_str = "cn=%s,cn=Users,%s" % (cn2, basedn) + dn2 = ldb.Dn(self.samdb, dn2_str) + + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % lsid_str, + attrs=[]) + self.assertEqual(len(res), 0) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % bsid_str, + attrs=[]) + self.assertEqual(len(res), 0) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=basedn, + expression="(objectSid=%s)" % fsid_str, + attrs=[]) + self.assertEqual(len(res), 0) + + self.addCleanup(delete_force, self.samdb, dn1_str) + self.addCleanup(delete_force, self.samdb, dn2_str) + + self.samdb.add({ + "dn": dn1_str, + "objectClass": obj_class}) + + self.samdb.add({ + "dn": dn2_str, + "objectClass": obj_class}) + + msg = ldb.Message() + msg.dn = dn1 + msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str, + ldb.FLAG_MOD_ADD, + fpo_attr) + try: + self.samdb.modify(msg) + self.fail("No exception should get %s" % msg_exp) + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, lerr_exp, str(e)) + werr = "%08X" % werr_exp + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = dn1 + msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str, + ldb.FLAG_MOD_ADD, + fpo_attr) + try: + self.samdb.modify(msg) + self.fail("No exception should get %s" % msg_exp) + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, lerr_exp, str(e)) + werr = "%08X" % werr_exp + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = dn1 + msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str, + ldb.FLAG_MOD_ADD, + fpo_attr) + try: + self.samdb.modify(msg) + self.fail("No exception should get %s" % msg) + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, lerr_exp, str(e)) + werr = "%08X" % werr_exp + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = dn1 + msg[fpo_attr] = ldb.MessageElement("%s" % dn2, + ldb.FLAG_MOD_ADD, + fpo_attr) + try: + self.samdb.modify(msg) + if not allow_reference: + self.fail("No exception should get %s" % msg_exp) + except ldb.LdbError as e: + if allow_reference: + self.fail("Should have not raised an exception: %s" % e) + (code, msg) = e.args + self.assertEqual(code, lerr_exp, str(e)) + werr = "%08X" % werr_exp + self.assertTrue(werr in msg, msg) + + self.samdb.delete(dn2) + self.samdb.delete(dn1) + + def test_foreignSecurityPrincipal_NonMembers(self): + return self._test_fail_foreignSecurityPrincipal( + "group", "msDS-NonMembers", + "LDB_ERR_UNWILLING_TO_PERFORM/WERR_NOT_SUPPORTED", + ldb.ERR_UNWILLING_TO_PERFORM, werror.WERR_NOT_SUPPORTED, + allow_reference=False) + + def test_foreignSecurityPrincipal_HostServiceAccount(self): + return self._test_fail_foreignSecurityPrincipal( + "computer", "msDS-HostServiceAccount", + "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID", + ldb.ERR_CONSTRAINT_VIOLATION, + werror.WERR_DS_NAME_REFERENCE_INVALID) + + def test_foreignSecurityPrincipal_manager(self): + return self._test_fail_foreignSecurityPrincipal( + "user", "manager", + "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID", + ldb.ERR_CONSTRAINT_VIOLATION, + werror.WERR_DS_NAME_REFERENCE_INVALID) + + # + # Duplicate objectSID's should not be permitted for sids in the local + # domain. The test sequence is add an object, delete it, then attempt to + # re-add it, this should fail with a constraint violation + # + def test_duplicate_objectSIDs_not_allowed_on_local_objects(self): + + dom_sid = self.samdb.get_domain_sid() + rid = self.allocate_rid() + sid_str = str(dom_sid) + "-" + rid + sid = ndr_pack(security.dom_sid(sid_str)) + basedn = self.samdb.get_default_basedn() + cn = "dsdb_test_01" + dn = "cn=%s,cn=Users,%s" % (cn, basedn) + + self.samdb.add({ + "dn": dn, + "objectClass": "user", + "objectSID": sid}) + self.samdb.delete(dn) + + try: + self.samdb.add({ + "dn": dn, + "objectClass": "user", + "objectSID": sid}) + self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e.args + if code != ldb.ERR_CONSTRAINT_VIOLATION: + self.fail("Got %d - %s should have got " + "LDB_ERR_CONSTRAINT_VIOLATION" + % (code, msg)) + + def test_linked_vs_non_linked_reference(self): + basedn = self.samdb.get_default_basedn() + kept_dn_str = "cn=reference_kept,cn=Users,%s" % (basedn) + removed_dn_str = "cn=reference_removed,cn=Users,%s" % (basedn) + dom_sid = self.samdb.get_domain_sid() + none_sid_str = str(dom_sid) + "-4294967294" + none_guid_str = "afafafaf-fafa-afaf-fafa-afafafafafaf" + + self.addCleanup(delete_force, self.samdb, kept_dn_str) + self.addCleanup(delete_force, self.samdb, removed_dn_str) + + self.samdb.add({ + "dn": kept_dn_str, + "objectClass": "user"}) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=kept_dn_str, + attrs=["objectGUID", "objectSID"]) + self.assertEqual(len(res), 1) + kept_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0]) + kept_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0]) + kept_dn = res[0].dn + + self.samdb.add({ + "dn": removed_dn_str, + "objectClass": "user"}) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=removed_dn_str, + attrs=["objectGUID", "objectSID"]) + self.assertEqual(len(res), 1) + removed_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0]) + removed_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0]) + self.samdb.delete(removed_dn_str) + + # + # First try the linked attribute 'manager' + # by GUID and SID + # + + msg = ldb.Message() + msg.dn = kept_dn + msg["manager"] = ldb.MessageElement("<SID=%s>" % removed_sid, + ldb.FLAG_MOD_ADD, + "manager") + try: + self.samdb.modify(msg) + self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e)) + werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = kept_dn + msg["manager"] = ldb.MessageElement("<GUID=%s>" % removed_guid, + ldb.FLAG_MOD_ADD, + "manager") + try: + self.samdb.modify(msg) + self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e)) + werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID + self.assertTrue(werr in msg, msg) + + # + # Try the non-linked attribute 'assistant' + # by GUID and SID, which should work. + # + msg = ldb.Message() + msg.dn = kept_dn + msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid, + ldb.FLAG_MOD_ADD, + "assistant") + self.samdb.modify(msg) + msg = ldb.Message() + msg.dn = kept_dn + msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid, + ldb.FLAG_MOD_DELETE, + "assistant") + self.samdb.modify(msg) + + msg = ldb.Message() + msg.dn = kept_dn + msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid, + ldb.FLAG_MOD_ADD, + "assistant") + self.samdb.modify(msg) + msg = ldb.Message() + msg.dn = kept_dn + msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid, + ldb.FLAG_MOD_DELETE, + "assistant") + self.samdb.modify(msg) + + # + # Finally ry the non-linked attribute 'assistant' + # but with non existing GUID, SID, DN + # + msg = ldb.Message() + msg.dn = kept_dn + msg["assistant"] = ldb.MessageElement("CN=NoneNone,%s" % (basedn), + ldb.FLAG_MOD_ADD, + "assistant") + try: + self.samdb.modify(msg) + self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e)) + werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = kept_dn + msg["assistant"] = ldb.MessageElement("<SID=%s>" % none_sid_str, + ldb.FLAG_MOD_ADD, + "assistant") + try: + self.samdb.modify(msg) + self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e)) + werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID + self.assertTrue(werr in msg, msg) + + msg = ldb.Message() + msg.dn = kept_dn + msg["assistant"] = ldb.MessageElement("<GUID=%s>" % none_guid_str, + ldb.FLAG_MOD_ADD, + "assistant") + try: + self.samdb.modify(msg) + self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e.args + self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e)) + werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID + self.assertTrue(werr in msg, msg) + + self.samdb.delete(kept_dn) + + def test_normalize_dn_in_domain_full(self): + domain_dn = self.samdb.domain_dn() + + part_dn = ldb.Dn(self.samdb, "CN=Users") + + full_dn = part_dn + full_dn.add_base(domain_dn) + + full_str = str(full_dn) + + # That is, no change + self.assertEqual(full_dn, + self.samdb.normalize_dn_in_domain(full_str)) + + def test_normalize_dn_in_domain_part(self): + domain_dn = self.samdb.domain_dn() + + part_str = "CN=Users" + + full_dn = ldb.Dn(self.samdb, part_str) + full_dn.add_base(domain_dn) + + # That is, the domain DN appended + self.assertEqual(full_dn, + self.samdb.normalize_dn_in_domain(part_str)) + + def test_normalize_dn_in_domain_full_dn(self): + domain_dn = self.samdb.domain_dn() + + part_dn = ldb.Dn(self.samdb, "CN=Users") + + full_dn = part_dn + full_dn.add_base(domain_dn) + + # That is, no change + self.assertEqual(full_dn, + self.samdb.normalize_dn_in_domain(full_dn)) + + def test_normalize_dn_in_domain_part_dn(self): + domain_dn = self.samdb.domain_dn() + + part_dn = ldb.Dn(self.samdb, "CN=Users") + + # That is, the domain DN appended + self.assertEqual(ldb.Dn(self.samdb, + str(part_dn) + "," + str(domain_dn)), + self.samdb.normalize_dn_in_domain(part_dn)) + +class DsdbNCRootTests(TestCase): + + def setUp(self): + super().setUp() + self.lp = samba.tests.env_loadparm() + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + self.samdb = SamDB(session_info=self.session, + credentials=self.creds, + lp=self.lp) + self.remote = False + + # These all use the local mode of operation inside + # dsdb_find_nc_root() using the partitions control + def test_dsdb_dn_nc_root_sid(self): + dom_sid = self.samdb.get_domain_sid() + domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn()) + dn = ldb.Dn(self.samdb, f"<SID={dom_sid}>") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(domain_dn, nc_root) + + def test_dsdb_dn_nc_root_admin_sid(self): + dom_sid = self.samdb.get_domain_sid() + domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn()) + dn = ldb.Dn(self.samdb, f"<SID={dom_sid}-500>") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(domain_dn, nc_root) + + def test_dsdb_dn_nc_root_users_container(self): + dom_sid = self.samdb.get_domain_sid() + domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn()) + dn = ldb.Dn(self.samdb, f"CN=Users,{domain_dn}") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(domain_dn, nc_root) + + def test_dsdb_dn_nc_root_new_dn(self): + dom_sid = self.samdb.get_domain_sid() + domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn()) + dn = ldb.Dn(self.samdb, f"CN=Xnotexisting,CN=Users,{domain_dn}") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(domain_dn, nc_root) + + def test_dsdb_dn_nc_root_new_dn_with_guid(self): + domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn()) + dn = ldb.Dn(self.samdb, f"<GUID=828e3baf-fa02-4d82-ba5d-6f647dab5fd8>;CN=Xnotexisting,CN=Users,{domain_dn}") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(domain_dn, nc_root) + + def test_dsdb_dn_nc_root_guid(self): + ntds_guid = self.samdb.get_ntds_GUID() + configuration_dn = self.samdb.get_config_basedn() + dn = ldb.Dn(self.samdb, f"<GUID={ntds_guid}>") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(configuration_dn, nc_root) + + def test_dsdb_dn_nc_root_misleading_to_noexisting_guid(self): + ntds_guid = self.samdb.get_ntds_GUID() + configuration_dn = self.samdb.get_config_basedn() + domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn()) + dn = ldb.Dn(self.samdb, f"<GUID={ntds_guid}>;CN=Xnotexisting,CN=Users,{domain_dn}") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(configuration_dn, nc_root) + + def test_dsdb_dn_nc_root_misleading_to_existing_guid(self): + ntds_guid = self.samdb.get_ntds_GUID() + configuration_dn = self.samdb.get_config_basedn() + domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn()) + dn = ldb.Dn(self.samdb, f"<GUID={ntds_guid}>;{domain_dn}") + try: + nc_root = self.samdb.get_nc_root(dn) + except ldb.LdbError as e: + (code, msg) = e.args + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + self.assertEqual(configuration_dn, nc_root) + +class DsdbRemoteNCRootTests(DsdbNCRootTests): + def setUp(self): + super().setUp() + # Reconnect to the remote LDAP port + self.samdb = SamDB(url="ldap://%s" % samba.tests.env_get_var_value('SERVER'), + session_info=self.session, + credentials=self.get_credentials(), + lp=self.lp) + self.remote = True + + +class DsdbFullScanTests(TestCase): + + def setUp(self): + super().setUp() + self.lp = samba.tests.env_loadparm() + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + + def test_sam_ldb_open_no_full_scan(self): + try: + self.samdb = SamDB(session_info=self.session, + credentials=self.creds, + lp=self.lp, + options=["disable_full_db_scan_for_self_test:1"]) + except ldb.LdbError as err: + estr = err.args[1] + self.fail("sam.ldb required a full scan to start up") + +class DsdbStartUpTests(TestCase): + def setUp(self): + super().setUp() + lp = samba.tests.env_loadparm() + path = lp.configfile + + # This is to avoid a tattoo of the global state + self.lp = LoadParm(filename_for_non_global_lp=path) + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + self.samdb = SamDB(session_info=self.session, + credentials=self.creds, + lp=self.lp) + + def test_correct_fl(self): + res = self.samdb.search(base="", + scope=ldb.SCOPE_BASE, + attrs=["domainFunctionality"]) + # This confirms the domain is in FL 2016 by default, this is + # important to verify the original state + self.assertEqual(int(res[0]["domainFunctionality"][0]), + dsdb.DS_DOMAIN_FUNCTION_2016) + self.assertEqual(functional_level.dc_level_from_lp(self.lp), + dsdb.DS_DOMAIN_FUNCTION_2016) + dsdb.check_and_update_fl(self.samdb, self.lp) + + def test_lower_smb_conf_fl(self): + old_lp_fl = self.lp.get("ad dc functional level") + self.lp.set("ad dc functional level", + "2008_R2") + self.addCleanup(self.lp.set, "ad dc functional level", old_lp_fl) + try: + dsdb.check_and_update_fl(self.samdb, self.lp) + self.fail("Should have failed to start DC with 2008 R2 FL in 2016 domain") + except ldb.LdbError as err: + (errno, estr) = err.args + self.assertEqual(errno, ldb.ERR_CONSTRAINT_VIOLATION) |