diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
commit | 4f5791ebd03eaec1c7da0865a383175b05102712 (patch) | |
tree | 8ce7b00f7a76baa386372422adebbe64510812d4 /source4/torture/drs/python/ridalloc_exop.py | |
parent | Initial commit. (diff) | |
download | samba-upstream.tar.xz samba-upstream.zip |
Adding upstream version 2:4.17.12+dfsg.upstream/2%4.17.12+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'source4/torture/drs/python/ridalloc_exop.py')
-rw-r--r-- | source4/torture/drs/python/ridalloc_exop.py | 813 |
1 files changed, 813 insertions, 0 deletions
diff --git a/source4/torture/drs/python/ridalloc_exop.py b/source4/torture/drs/python/ridalloc_exop.py new file mode 100644 index 0000000..0d46eee --- /dev/null +++ b/source4/torture/drs/python/ridalloc_exop.py @@ -0,0 +1,813 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Tests various RID allocation scenarios +# +# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011 +# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016 +# Copyright (C) Catalyst IT Ltd. 2016 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +# +# Usage: +# export DC1=dc1_dns_name +# export DC2=dc2_dns_name +# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun +# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" +# + +import drs_base +import samba.tests + +import ldb +from ldb import SCOPE_BASE + +from samba.dcerpc import drsuapi, misc +from samba.drs_utils import drs_DsBind +from samba.samdb import SamDB + +import shutil +import tempfile +import os +from samba.auth import system_session, admin_session +from samba.dbchecker import dbcheck +from samba.ndr import ndr_pack +from samba.dcerpc import security +from samba import drs_utils, dsdb + + +class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): + """Intended as a semi-black box test case for DsGetNCChanges + implementation for extended operations. It should be testing + how DsGetNCChanges handles different input params (mostly invalid). + Final goal is to make DsGetNCChanges as binary compatible to + Windows implementation as possible""" + + def setUp(self): + super(DrsReplicaSyncTestCase, self).setUp() + + def tearDown(self): + super(DrsReplicaSyncTestCase, self).tearDown() + + def _determine_fSMORoleOwner(self, fsmo_obj_dn): + """Returns (owner, not_owner) pair where: + owner: dns name for FSMO owner + not_owner: dns name for DC not owning the FSMO""" + # collect info to return later + fsmo_info_1 = {"dns_name": self.dnsname_dc1, + "invocation_id": self.ldb_dc1.get_invocation_id(), + "ntds_guid": self.ldb_dc1.get_ntds_GUID(), + "server_dn": self.ldb_dc1.get_serverName()} + fsmo_info_2 = {"dns_name": self.dnsname_dc2, + "invocation_id": self.ldb_dc2.get_invocation_id(), + "ntds_guid": self.ldb_dc2.get_ntds_GUID(), + "server_dn": self.ldb_dc2.get_serverName()} + + msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"]) + fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0].decode('utf8')) + fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"] + + msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"]) + fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0].decode('utf8')) + fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"] + + # determine the owner dc + res = self.ldb_dc1.search(fsmo_obj_dn, + scope=SCOPE_BASE, attrs=["fSMORoleOwner"]) + assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn + fsmo_owner = res[0]["fSMORoleOwner"][0] + if fsmo_owner == self.info_dc1["dsServiceName"][0]: + return (fsmo_info_1, fsmo_info_2) + return (fsmo_info_2, fsmo_info_1) + + def _check_exop_failed(self, ctr6, expected_failure): + self.assertEqual(ctr6.extended_ret, expected_failure) + #self.assertEqual(ctr6.object_count, 0) + #self.assertEqual(ctr6.first_object, None) + self.assertEqual(ctr6.more_data, False) + self.assertEqual(ctr6.nc_object_count, 0) + self.assertEqual(ctr6.nc_linked_attributes_count, 0) + self.assertEqual(ctr6.linked_attributes_count, 0) + self.assertEqual(ctr6.linked_attributes, []) + self.assertEqual(ctr6.drs_error[0], 0) + + def test_InvalidDestDSA_ridalloc(self): + """Test RID allocation with invalid destination DSA guid""" + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + + def test_do_ridalloc(self): + """Test doing a RID allocation with a valid destination DSA guid""" + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"], + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + ctr6 = ctr + self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS) + self.assertEqual(ctr6.object_count, 3) + self.assertNotEqual(ctr6.first_object, None) + self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn) + self.assertNotEqual(ctr6.first_object.next_object, None) + self.assertNotEqual(ctr6.first_object.next_object.next_object, None) + second_object = ctr6.first_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"]) + third_object = ctr6.first_object.next_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"]) + + self.assertEqual(ctr6.more_data, False) + self.assertEqual(ctr6.nc_object_count, 0) + self.assertEqual(ctr6.nc_linked_attributes_count, 0) + self.assertEqual(ctr6.drs_error[0], 0) + # We don't check the linked_attributes_count as if the domain + # has an RODC, it can gain links on the server account object + + def test_do_ridalloc_get_anc(self): + """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag""" + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"], + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC, + replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + ctr6 = ctr + self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS) + self.assertEqual(ctr6.object_count, 3) + self.assertNotEqual(ctr6.first_object, None) + self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn) + self.assertNotEqual(ctr6.first_object.next_object, None) + self.assertNotEqual(ctr6.first_object.next_object.next_object, None) + second_object = ctr6.first_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"]) + third_object = ctr6.first_object.next_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"]) + self.assertEqual(ctr6.more_data, False) + self.assertEqual(ctr6.nc_object_count, 0) + self.assertEqual(ctr6.nc_linked_attributes_count, 0) + self.assertEqual(ctr6.drs_error[0], 0) + # We don't check the linked_attributes_count as if the domain + # has an RODC, it can gain links on the server account object + + def test_edit_rid_master(self): + """Test doing a RID allocation after changing the RID master from the original one. + This should set rIDNextRID to 0 on the new RID master.""" + # 1. a. Transfer role to non-RID master + # b. Check that it succeeds correctly + # + # 2. a. Call the RID alloc against the former master. + # b. Check that it succeeds. + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + # 1. Swap RID master role + m = ldb.Message() + m.dn = ldb.Dn(self.ldb_dc1, "") + m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, + "becomeRidMaster") + + # Make sure that ldb_dc1 == RID Master + + server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent()) + + # self.ldb_dc1 == LOCALDC + if server_dn == fsmo_owner['server_dn']: + # ldb_dc1 == VAMPIREDC + ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1 + else: + # Otherwise switch the two + ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2 + + try: + # ldb_dc1 is now RID MASTER (as VAMPIREDC) + ldb_dc1.modify(m) + except ldb.LdbError as e1: + (num, msg) = e1.args + self.fail("Failed to reassign RID Master " + msg) + + try: + # 2. Perform a RID alloc + req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"], + invocation_id=fsmo_not_owner["invocation_id"], + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC) + + (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"]) + # 3. Make sure the allocation succeeds + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except RuntimeError as e: + self.fail("RID allocation failed: " + str(e)) + + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + + self.assertEqual(level, 6, "Expected level 6 response!") + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"])) + ctr6 = ctr + self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS) + self.assertEqual(ctr6.object_count, 3) + self.assertNotEqual(ctr6.first_object, None) + self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn) + self.assertNotEqual(ctr6.first_object.next_object, None) + self.assertNotEqual(ctr6.first_object.next_object.next_object, None) + second_object = ctr6.first_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"]) + third_object = ctr6.first_object.next_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"]) + finally: + # Swap the RID master back for other tests + m = ldb.Message() + m.dn = ldb.Dn(ldb_dc2, "") + m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster") + try: + ldb_dc2.modify(m) + except ldb.LdbError as e: + (num, msg) = e.args + self.fail("Failed to restore RID Master " + msg) + + def test_offline_samba_tool_seized_ridalloc(self): + """Perform a join against the non-RID manager and then seize the RID Manager role""" + + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + smbconf = os.path.join(targetdir, "etc/smb.conf") + + lp = self.get_loadparm() + new_ldb = SamDB(ldb_url, credentials=self.get_credentials(), + session_info=system_session(lp), lp=lp) + + # 1. Get server name + res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()), + scope=ldb.SCOPE_BASE, attrs=["serverReference"]) + # 2. Get server reference + server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8')) + + # Assert that no RID Set has been set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertFalse("rIDSetReferences" in res[0]) + + (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "--configfile=%s" % (smbconf), "--force") + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + + # 3. Assert we get the RID Set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertTrue("rIDSetReferences" in res[0]) + finally: + shutil.rmtree(targetdir, ignore_errors=True) + self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1") + + def _test_join(self, server, netbios_name): + tmpdir = os.path.join(self.tempdir, "targetdir") + creds = self.get_credentials() + (result, out, err) = self.runsubcmd("domain", "join", + creds.get_realm(), + "dc", "-U%s%%%s" % (creds.get_username(), + creds.get_password()), + '--targetdir=%s' % tmpdir, + '--server=%s' % server, + "--option=netbios name = %s" % netbios_name) + self.assertCmdSuccess(result, out, err) + return tmpdir + + def _test_force_demote(self, server, netbios_name): + creds = self.get_credentials() + (result, out, err) = self.runsubcmd("domain", "demote", + "-U%s%%%s" % (creds.get_username(), + creds.get_password()), + '--server=%s' % server, + "--remove-other-dead-server=%s" % netbios_name) + self.assertCmdSuccess(result, out, err) + + def test_offline_manual_seized_ridalloc_with_dbcheck(self): + """Peform the same actions as test_offline_samba_tool_seized_ridalloc, + but do not create the RID set. Confirm that dbcheck correctly creates + the RID Set. + + Also check + """ + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + lp = self.get_loadparm() + + new_ldb = SamDB(ldb_url, credentials=self.get_credentials(), + session_info=system_session(lp), lp=lp) + + serviceName = new_ldb.get_dsServiceName() + m = ldb.Message() + m.dn = fsmo_dn + m["fSMORoleOwner"] = ldb.MessageElement(serviceName, + ldb.FLAG_MOD_REPLACE, + "fSMORoleOwner") + new_ldb.modify(m) + + # 1. Get server name + res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()), + scope=ldb.SCOPE_BASE, attrs=["serverReference"]) + # 2. Get server reference + server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8')) + + # Assert that no RID Set has been set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertFalse("rIDSetReferences" in res[0]) + + smbconf = os.path.join(targetdir, "etc/smb.conf") + + chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True) + + self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)") + + # 3. Assert we get the RID Set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertTrue("rIDSetReferences" in res[0]) + finally: + self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_offline_manual_seized_ridalloc_add_user(self): + """Peform the same actions as test_offline_samba_tool_seized_ridalloc, + but do not create the RID set. Confirm that user-add correctly creates + the RID Set.""" + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + lp = self.get_loadparm() + + new_ldb = SamDB(ldb_url, credentials=self.get_credentials(), + session_info=system_session(lp), lp=lp) + + serviceName = new_ldb.get_dsServiceName() + m = ldb.Message() + m.dn = fsmo_dn + m["fSMORoleOwner"] = ldb.MessageElement(serviceName, + ldb.FLAG_MOD_REPLACE, + "fSMORoleOwner") + new_ldb.modify(m) + + # 1. Get server name + res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()), + scope=ldb.SCOPE_BASE, attrs=["serverReference"]) + # 2. Get server reference + server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8')) + + # Assert that no RID Set has been set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertFalse("rIDSetReferences" in res[0]) + + smbconf = os.path.join(targetdir, "etc/smb.conf") + + new_ldb.newuser("ridalloctestuser", "P@ssword!") + + # 3. Assert we get the RID Set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertTrue("rIDSetReferences" in res[0]) + + finally: + self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_offline_manual_seized_ridalloc_add_user_as_admin(self): + """Peform the same actions as test_offline_samba_tool_seized_ridalloc, + but do not create the RID set. Confirm that user-add correctly creates + the RID Set.""" + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + lp = self.get_loadparm() + + new_ldb = SamDB(ldb_url, credentials=self.get_credentials(), + session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp) + + serviceName = new_ldb.get_dsServiceName() + m = ldb.Message() + m.dn = fsmo_dn + m["fSMORoleOwner"] = ldb.MessageElement(serviceName, + ldb.FLAG_MOD_REPLACE, + "fSMORoleOwner") + new_ldb.modify(m) + + # 1. Get server name + res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()), + scope=ldb.SCOPE_BASE, attrs=["serverReference"]) + # 2. Get server reference + server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8')) + + # Assert that no RID Set has been set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertFalse("rIDSetReferences" in res[0]) + + smbconf = os.path.join(targetdir, "etc/smb.conf") + + # Create a user to allocate a RID Set for itself (the RID master) + new_ldb.newuser("ridalloctestuser", "P@ssword!") + + # 3. Assert we get the RID Set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertTrue("rIDSetReferences" in res[0]) + + finally: + self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_join_time_ridalloc(self): + """Perform a join against the RID manager and assert we have a RID Set""" + + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + smbconf = os.path.join(targetdir, "etc/smb.conf") + + lp = self.get_loadparm() + new_ldb = SamDB(ldb_url, credentials=self.get_credentials(), + session_info=system_session(lp), lp=lp) + + # 1. Get server name + res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()), + scope=ldb.SCOPE_BASE, attrs=["serverReference"]) + # 2. Get server reference + server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8')) + + # 3. Assert we get the RID Set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertTrue("rIDSetReferences" in res[0]) + finally: + self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_rid_set_dbcheck(self): + """Perform a join against the RID manager and assert we have a RID Set. + Using dbcheck, we assert that we can detect out of range users.""" + + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + smbconf = os.path.join(targetdir, "etc/smb.conf") + + lp = self.get_loadparm() + new_ldb = SamDB(ldb_url, credentials=self.get_credentials(), + session_info=system_session(lp), lp=lp) + + # 1. Get server name + res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()), + scope=ldb.SCOPE_BASE, attrs=["serverReference"]) + # 2. Get server reference + server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8')) + + # 3. Assert we get the RID Set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertTrue("rIDSetReferences" in res[0]) + rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8')) + + # 4. Add a new user (triggers RID set work) + new_ldb.newuser("ridalloctestuser", "P@ssword!") + + # 5. Now fetch the RID SET + rid_set_res = new_ldb.search(base=rid_set_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDNextRid', + 'rIDAllocationPool']) + next_pool = int(rid_set_res[0]["rIDAllocationPool"][0]) + last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32 + + # 6. Add user above the ridNextRid and at mid-range. + # + # We can do this with safety because this is an offline DB that will be + # destroyed. + m = ldb.Message() + m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users") + m.dn.add_base(new_ldb.get_default_basedn()) + m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass') + m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))), + ldb.FLAG_MOD_ADD, + 'objectSid') + new_ldb.add(m, controls=["relax:0"]) + + # 7. Check the RID Set + chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True) + + # Should have one error (wrong rIDNextRID) + self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1) + + # 8. Assert we get didn't show any other errors + chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True) + + rid_set_res = new_ldb.search(base=rid_set_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDNextRid', + 'rIDAllocationPool']) + last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0]) + self.assertEqual(last_allocated_rid, last_rid - 10) + + # 9. Assert that the range wasn't thrown away + + next_pool = int(rid_set_res[0]["rIDAllocationPool"][0]) + self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed") + finally: + self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_rid_set_dbcheck_after_seize(self): + """Perform a join against the RID manager and assert we have a RID Set. + We seize the RID master role, then using dbcheck, we assert that we can + detect out of range users (and then bump the RID set as required).""" + + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + smbconf = os.path.join(targetdir, "etc/smb.conf") + + lp = self.get_loadparm() + new_ldb = SamDB(ldb_url, credentials=self.get_credentials(), + session_info=system_session(lp), lp=lp) + + # 1. Get server name + res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()), + scope=ldb.SCOPE_BASE, attrs=["serverReference"]) + # 2. Get server reference + server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8')) + + # 3. Assert we get the RID Set + res = new_ldb.search(base=server_ref_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences']) + + self.assertTrue("rIDSetReferences" in res[0]) + rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8')) + # 4. Seize the RID Manager role + (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "--configfile=%s" % (smbconf), "--force") + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + + # 5. Add a new user (triggers RID set work) + new_ldb.newuser("ridalloctestuser", "P@ssword!") + + # 6. Now fetch the RID SET + rid_set_res = new_ldb.search(base=rid_set_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDNextRid', + 'rIDAllocationPool']) + next_pool = int(rid_set_res[0]["rIDAllocationPool"][0]) + last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32 + + # 7. Add user above the ridNextRid and at almost the end of the range. + # + m = ldb.Message() + m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users") + m.dn.add_base(new_ldb.get_default_basedn()) + m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass') + m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))), + ldb.FLAG_MOD_ADD, + 'objectSid') + new_ldb.add(m, controls=["relax:0"]) + + # 8. Add user above the ridNextRid and at the end of the range + m = ldb.Message() + m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users") + m.dn.add_base(new_ldb.get_default_basedn()) + m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass') + m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)), + ldb.FLAG_MOD_ADD, + 'objectSid') + new_ldb.add(m, controls=["relax:0"]) + + chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True) + + # Should have fixed two errors (wrong ridNextRid) + self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2) + + # 9. Assert we get didn't show any other errors + chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True) + + # 10. Add another user (checks RID rollover) + # We have seized the role, so we can do that. + new_ldb.newuser("ridalloctestuser3", "P@ssword!") + + rid_set_res = new_ldb.search(base=rid_set_dn, + scope=ldb.SCOPE_BASE, attrs=['rIDNextRid', + 'rIDAllocationPool']) + next_pool = int(rid_set_res[0]["rIDAllocationPool"][0]) + self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed") + finally: + self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_replicate_against_deleted_objects_transaction(self): + """Not related to RID allocation, but uses the infrastructure here. + Do a join, create a link between two objects remotely, but + remove the target locally. Show that we need to set a magic + opaque if there is an outer transaction. + + """ + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + test_user4 = "ridalloctestuser4" + test_group = "ridalloctestgroup1" + + self.ldb_dc1.newuser(test_user4, "P@ssword!") + + self.addCleanup(self.ldb_dc1.deleteuser, test_user4) + + self.ldb_dc1.newgroup(test_group) + self.addCleanup(self.ldb_dc1.deletegroup, test_group) + + targetdir = self._test_join(self.dnsname_dc1, "RIDALLOCTEST8") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + lp = self.get_loadparm() + + new_ldb = SamDB(ldb_url, + session_info=system_session(lp), lp=lp) + + destination_dsa_guid = misc.GUID(new_ldb.get_ntds_GUID()) + + repl = drs_utils.drs_Replicate(f'ncacn_ip_tcp:{self.dnsname_dc1}[seal]', + lp, + self.get_credentials(), + new_ldb, + destination_dsa_guid) + + source_dsa_invocation_id = misc.GUID(self.ldb_dc1.invocation_id) + + # Add the link on the remote DC + self.ldb_dc1.add_remove_group_members(test_group, [test_user4]) + + # Starting a transaction overrides, currently the logic + # inside repl.replicatate to retry with GET_TGT which in + # turn tells the repl_meta_data module that the most up to + # date info is already available + new_ldb.transaction_start() + repl.replicate(self.ldb_dc1.domain_dn(), + source_dsa_invocation_id, + destination_dsa_guid) + + # Delete the user locally, before applying the links. + # This simulates getting the delete in the replciation + # stream. + new_ldb.deleteuser(test_user4) + + # This fails as the user has been deleted locally but a remote link is sent + self.assertRaises(ldb.LdbError, new_ldb.transaction_commit) + + new_ldb.transaction_start() + repl.replicate(self.ldb_dc1.domain_dn(), + source_dsa_invocation_id, + destination_dsa_guid) + + # Delete the user locally (the previous transaction + # doesn't apply), before applying the links. This + # simulates getting the delete in the replciation stream. + new_ldb.deleteuser(test_user4) + + new_ldb.set_opaque_integer(dsdb.DSDB_FULL_JOIN_REPLICATION_COMPLETED_OPAQUE_NAME, + 1) + + # This should now work + try: + new_ldb.transaction_commit() + except ldb.LdbError as e: + self.fail(f"Failed to replicate despite setting opaque with {e.args[1]}") + + finally: + self._test_force_demote(self.dnsname_dc1, "RIDALLOCTEST8") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_replicate_against_deleted_objects_normal(self): + """Not related to RID allocation, but uses the infrastructure here. + Do a join, create a link between two objects remotely, but + remove the target locally. . + + """ + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + test_user5 = "ridalloctestuser5" + test_group2 = "ridalloctestgroup2" + + self.ldb_dc1.newuser(test_user5, "P@ssword!") + self.addCleanup(self.ldb_dc1.deleteuser, test_user5) + + self.ldb_dc1.newgroup(test_group2) + self.addCleanup(self.ldb_dc1.deletegroup, test_group2) + + targetdir = self._test_join(self.dnsname_dc1, "RIDALLOCTEST9") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + lp = self.get_loadparm() + + new_ldb = SamDB(ldb_url, + session_info=system_session(lp), lp=lp) + + destination_dsa_guid = misc.GUID(new_ldb.get_ntds_GUID()) + + repl = drs_utils.drs_Replicate(f'ncacn_ip_tcp:{self.dnsname_dc1}[seal]', + lp, + self.get_credentials(), + new_ldb, + destination_dsa_guid) + + source_dsa_invocation_id = misc.GUID(self.ldb_dc1.invocation_id) + + # Add the link on the remote DC + self.ldb_dc1.add_remove_group_members(test_group2, [test_user5]) + + # Delete the user locally + new_ldb.deleteuser(test_user5) + + # Confirm replication copes with a link to a locally deleted user + repl.replicate(self.ldb_dc1.domain_dn(), + source_dsa_invocation_id, + destination_dsa_guid) + + finally: + self._test_force_demote(self.dnsname_dc1, "RIDALLOCTEST9") + shutil.rmtree(targetdir, ignore_errors=True) |