diff options
Diffstat (limited to 'source4/torture/drs/python/repl_move.py')
-rw-r--r-- | source4/torture/drs/python/repl_move.py | 2608 |
1 files changed, 2608 insertions, 0 deletions
diff --git a/source4/torture/drs/python/repl_move.py b/source4/torture/drs/python/repl_move.py new file mode 100644 index 0000000..c206ab8 --- /dev/null +++ b/source4/torture/drs/python/repl_move.py @@ -0,0 +1,2608 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010 +# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +# +# Usage: +# export DC1=dc1_dns_name +# export DC2=dc2_dns_name +# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun +# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_move -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" +# + +import time +import samba.tests + +from samba.ndr import ndr_unpack +from samba.dcerpc import drsblobs +from samba.dcerpc import misc +from samba.drs_utils import drs_DsBind + +from ldb import ( + SCOPE_BASE, + SCOPE_SUBTREE, +) + +import drs_base +import ldb +from samba.dcerpc.drsuapi import ( + drsuapi, + DRSUAPI_ATTID_accountExpires, + DRSUAPI_ATTID_cn, + DRSUAPI_ATTID_codePage, + DRSUAPI_ATTID_countryCode, + DRSUAPI_ATTID_dBCSPwd, + DRSUAPI_ATTID_description, + DRSUAPI_ATTID_instanceType, + DRSUAPI_ATTID_isDeleted, + DRSUAPI_ATTID_isRecycled, + DRSUAPI_ATTID_lastKnownParent, + DRSUAPI_ATTID_lmPwdHistory, + DRSUAPI_ATTID_logonHours, + DRSUAPI_ATTID_name, + DRSUAPI_ATTID_ntPwdHistory, + DRSUAPI_ATTID_ntSecurityDescriptor, + DRSUAPI_ATTID_objectCategory, + DRSUAPI_ATTID_objectClass, + DRSUAPI_ATTID_objectSid, + DRSUAPI_ATTID_ou, + DRSUAPI_ATTID_primaryGroupID, + DRSUAPI_ATTID_pwdLastSet, + DRSUAPI_ATTID_sAMAccountName, + DRSUAPI_ATTID_sAMAccountType, + DRSUAPI_ATTID_unicodePwd, + DRSUAPI_ATTID_userAccountControl, + DRSUAPI_ATTID_userPrincipalName, + DRSUAPI_ATTID_whenCreated, + DRSUAPI_DRS_SYNC_FORCED, + DRSUAPI_EXOP_REPL_OBJ, + DsGetNCChangesRequest8, + DsReplicaHighWaterMark, + DsReplicaObjectIdentifier) + + +class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase): + + def setUp(self): + super(DrsMoveObjectTestCase, self).setUp() + # disable automatic replication temporary + self._disable_all_repl(self.dnsname_dc1) + self._disable_all_repl(self.dnsname_dc2) + + # make sure DCs are synchronized before the test + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + self.top_ou = samba.tests.create_test_ou(self.ldb_dc1, + "replica_move") + + self.ou1_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU1") + self.ou1_dn.add_base(self.top_ou) + ou1 = {} + ou1["dn"] = self.ou1_dn + ou1["objectclass"] = "organizationalUnit" + ou1["ou"] = self.ou1_dn.get_component_value(0) + self.ldb_dc1.add(ou1) + + self.ou2_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU2") + self.ou2_dn.add_base(self.top_ou) + ou2 = {} + ou2["dn"] = self.ou2_dn + ou2["objectclass"] = "organizationalUnit" + ou2["ou"] = self.ou2_dn.get_component_value(0) + self.ldb_dc1.add(ou2) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + self.dc1_guid = self.ldb_dc1.get_invocation_id() + self.dc2_guid = self.ldb_dc2.get_invocation_id() + + self.drs_dc1 = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1) + self.drs_dc2 = self._ds_bind(self.dnsname_dc2, ip=self.url_dc2) + + def tearDown(self): + try: + self.ldb_dc1.delete(self.top_ou, ["tree_delete:1"]) + except ldb.LdbError as e: + (enum, string) = e.args + if enum == ldb.ERR_NO_SUCH_OBJECT: + pass + + self._enable_all_repl(self.dnsname_dc1) + self._enable_all_repl(self.dnsname_dc2) + super(DrsMoveObjectTestCase, self).tearDown() + + def _make_username(self): + return "DrsMoveU_" + time.strftime("%s", time.gmtime()) + + def _check_metadata(self, user_dn, sam_ldb, drs, metadata, expected): + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, metadata[0]) + + self.assertEqual(len(repl.ctr.array), len(expected)) + + i = 0 + for o in repl.ctr.array: + e = expected[i] + (attid, orig_dsa, version) = e + self.assertEqual(attid, o.attid, + "(LDAP) Wrong attid " + "for expected value %d, wanted 0x%08x got 0x%08x" + % (i, attid, o.attid)) + self.assertEqual(o.originating_invocation_id, + misc.GUID(orig_dsa), + "(LDAP) Wrong originating_invocation_id " + "for expected value %d, attid 0x%08x, wanted %s got %s" + % (i, o.attid, + misc.GUID(orig_dsa), + o.originating_invocation_id)) + # Allow version to be skipped when it does not matter + if version is not None: + self.assertEqual(o.version, version, + "(LDAP) Wrong version for expected value %d, " + "attid 0x%08x, " + "wanted %d got %d" + % (i, o.attid, + version, o.version)) + i = i + 1 + + if drs is None: + return + + req8 = DsGetNCChangesRequest8() + + req8.source_dsa_invocation_id = misc.GUID(sam_ldb.get_invocation_id()) + req8.naming_context = DsReplicaObjectIdentifier() + req8.naming_context.dn = str(user_dn) + req8.highwatermark = DsReplicaHighWaterMark() + req8.highwatermark.tmp_highest_usn = 0 + req8.highwatermark.reserved_usn = 0 + req8.highwatermark.highest_usn = 0 + req8.uptodateness_vector = None + req8.replica_flags = DRSUAPI_DRS_SYNC_FORCED + req8.max_object_count = 1 + req8.max_ndr_size = 402116 + req8.extended_op = DRSUAPI_EXOP_REPL_OBJ + req8.fsmo_info = 0 + req8.partial_attribute_set = None + req8.partial_attribute_set_ex = None + req8.mapping_ctr.num_mappings = 0 + req8.mapping_ctr.mappings = None + + (drs_conn, drs_handle) = drs + + (level, drs_ctr) = drs_conn.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6) + self.assertEqual(drs_ctr.object_count, 1) + + self.assertEqual(len(drs_ctr.first_object.meta_data_ctr.meta_data), len(expected) - 1) + att_idx = 0 + for o in drs_ctr.first_object.meta_data_ctr.meta_data: + i = 0 + drs_attid = drs_ctr.first_object.object.attribute_ctr.attributes[att_idx] + e = expected[i] + (attid, orig_dsa, version) = e + + # Skip the RDN from the expected set, it is not sent over DRS + if (user_dn.get_rdn_name().upper() == "CN" + and attid == DRSUAPI_ATTID_cn) \ + or (user_dn.get_rdn_name().upper() == "OU" + and attid == DRSUAPI_ATTID_ou): + i = i + 1 + e = expected[i] + (attid, orig_dsa, version) = e + + self.assertEqual(attid, drs_attid.attid, + "(DRS) Wrong attid " + "for expected value %d, wanted 0x%08x got 0x%08x" + % (i, attid, drs_attid.attid)) + + self.assertEqual(o.originating_invocation_id, + misc.GUID(orig_dsa), + "(DRS) Wrong originating_invocation_id " + "for expected value %d, attid 0x%08x, wanted %s got %s" + % (i, attid, + misc.GUID(orig_dsa), + o.originating_invocation_id)) + # Allow version to be skipped when it does not matter + if version is not None: + self.assertEqual(o.version, version, + "(DRS) Wrong version for expected value %d, " + "attid 0x%08x, " + "wanted %d got %d" + % (i, attid, version, o.version)) + break + i = i + 1 + att_idx = att_idx + 1 + + # now also used to check the group + def _check_obj(self, sam_ldb, obj_orig, is_deleted, expected_metadata=None, drs=None): + # search the user by guid as it may be deleted + guid_str = self._GUID_string(obj_orig["objectGUID"][0]) + res = sam_ldb.search(base='<GUID=%s>' % guid_str, + controls=["show_deleted:1"], + attrs=["*", "parentGUID", + "replPropertyMetaData"]) + self.assertEqual(len(res), 1) + user_cur = res[0] + rdn_orig = str(obj_orig[user_cur.dn.get_rdn_name()][0]) + rdn_cur = str(user_cur[user_cur.dn.get_rdn_name()][0]) + name_orig = str(obj_orig["name"][0]) + name_cur = str(user_cur["name"][0]) + dn_orig = obj_orig["dn"] + dn_cur = user_cur["dn"] + # now check properties of the user + if is_deleted: + self.assertTrue("isDeleted" in user_cur) + self.assertEqual(rdn_cur.split('\n')[0], rdn_orig) + self.assertEqual(name_cur.split('\n')[0], name_orig) + self.assertEqual(dn_cur.get_rdn_value().split('\n')[0], + dn_orig.get_rdn_value()) + self.assertEqual(name_cur, rdn_cur) + else: + self.assertFalse("isDeleted" in user_cur) + self.assertEqual(rdn_cur, rdn_orig) + self.assertEqual(name_cur, name_orig) + self.assertEqual(dn_cur, dn_orig) + self.assertEqual(name_cur, rdn_cur) + parent_cur = user_cur["parentGUID"][0] + try: + parent_orig = obj_orig["parentGUID"][0] + self.assertEqual(parent_orig, parent_cur) + except KeyError: + pass + self.assertEqual(name_cur, user_cur.dn.get_rdn_value()) + + if expected_metadata is not None: + self._check_metadata(dn_cur, sam_ldb, drs, user_cur["replPropertyMetaData"], + expected_metadata) + + return user_cur + + def test_ReplicateMoveObject1(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is replicated properly + - the OU is renamed + - We verify that after replication, + that the user has the correct DN (under OU2) + - the OU is deleted + - the OU is modified on DC2 + - We verify that after replication, + that the user has the correct DN (deleted) and has not description + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + initial_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 1), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_orig, is_deleted=False, + expected_metadata=initial_metadata) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + moved_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC1 after rename - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=moved_metadata) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + moved_metadata_dc2 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=moved_metadata_dc2) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + deleted_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True, expected_metadata=deleted_metadata) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = new_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + modified_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 1), + (DRSUAPI_ATTID_description, self.dc2_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=modified_metadata) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + deleted_modified_metadata_dc2 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 2), + (DRSUAPI_ATTID_description, self.dc2_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_modified_metadata_dc2) + self.assertFalse("description" in user_cur) + + # trigger replication from DC2 to DC1, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + deleted_modified_metadata_dc1 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 2), + (DRSUAPI_ATTID_description, self.dc2_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_modified_metadata_dc1) + self.assertFalse("description" in user_cur) + + def test_ReplicateMoveObject2(self): + """Verifies how a moved container with a user inside is not + replicated between two DCs as no replication is triggered + This test should verify that: + - the OU is not replicated + - the user is not replicated + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + initial_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 1), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_orig, is_deleted=False, + expected_metadata=initial_metadata) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_moved_orig = ldb_res[0] + + moved_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC1 after rename - should be valid user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=moved_metadata) + + # check user info on DC2 - should not be there, we have not done replication + ldb_res = self.ldb_dc2.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 0) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + deleted_metadata_dc1 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC1 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + deleted_metadata_dc2 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC2 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc2) + + # trigger replication from DC2 to DC1, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + + def test_ReplicateMoveObject3(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is created on DC1 + - the OU is renamed on DC1 + - We verify that after replication, + that the user has the correct DN (under OU2). + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + initial_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 1), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_orig, is_deleted=False, + expected_metadata=initial_metadata) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + moved_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC1 after rename - should be valid user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=moved_metadata) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + deleted_metadata_dc1 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC1 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + deleted_metadata_dc2 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC2 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc2) + + def test_ReplicateMoveObject3b(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is created on DC1 + - the OU is renamed on DC1 + - We verify that after replication, + that the user has the correct DN (under OU2). + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + initial_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 1), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_orig, is_deleted=False, + expected_metadata=initial_metadata) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + # trigger replication from DC2 (Which has never seen the object) to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + moved_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC1 after rename - should be valid user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=moved_metadata) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + deleted_metadata_dc1 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC1 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + deleted_metadata_dc2 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC2 - should be deleted user + self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc2) + + def test_ReplicateMoveObject4(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is replicated properly + - the user is modified on DC2 + - the OU is renamed on DC1 + - We verify that after replication DC1 -> DC2, + that the user has the correct DN (under OU2), and the description + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + initial_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 1), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_orig, is_deleted=False, + expected_metadata=initial_metadata) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + initial_metadata_dc2 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 1), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_orig, is_deleted=False, + expected_metadata=initial_metadata_dc2) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + moved_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC1 after rename - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=moved_metadata) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = user_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + modified_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 1), + (DRSUAPI_ATTID_description, self.dc2_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 1), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_orig, + is_deleted=False, + expected_metadata=modified_metadata) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + modified_renamed_metadata = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 2), + (DRSUAPI_ATTID_description, self.dc2_guid, 1), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 2), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 1), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)] + + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=modified_renamed_metadata) + + self.assertTrue("description" in user_cur) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + deleted_metadata_dc1 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=False, + expected_metadata=modified_renamed_metadata) + + self.assertTrue("description" in user_cur) + + deleted_metadata_dc1 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc1_guid, 2), + (DRSUAPI_ATTID_description, self.dc1_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc1) + + self.assertFalse("description" in user_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + deleted_metadata_dc2 = [ + (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1), + (DRSUAPI_ATTID_cn, self.dc2_guid, 3), + (DRSUAPI_ATTID_description, self.dc1_guid, 2), + (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1), + (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1), + (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1), + (DRSUAPI_ATTID_name, self.dc1_guid, 3), + (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None), + (DRSUAPI_ATTID_codePage, self.dc1_guid, 2), + (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2), + (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1), + (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1), + (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2), + (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2), + (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1), + (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2), + (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1), + (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2), + (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2), + (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1), + (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2), + (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)] + + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2, + obj_orig=user_moved_orig, + is_deleted=True, + expected_metadata=deleted_metadata_dc2) + + self.assertFalse("description" in user_cur) + + def test_ReplicateMoveObject5(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is replicated properly + - the user is modified on DC2 + - the OU is renamed on DC1 + - We verify that after replication DC2 -> DC1, + that the user has the correct DN (under OU2), and the description + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = user_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # delete user on DC2 + self.ldb_dc2.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + def test_ReplicateMoveObject6(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (under OU2), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + def test_ReplicateMoveObject7(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to be under OU2 + - We verify that after replication DC2 -> DC1, + that the OU1 has the correct DN (under OU2), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + def test_ReplicateMoveObject8(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to OU1-renamed + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (OU1-renamed), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou1_dn.parent()) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + def test_ReplicateMoveObject9(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to be under OU2 + - the OU1 is renamed on DC1 to OU1-renamed + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (OU1-renamed), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou1_dn.parent()) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + def test_ReplicateMoveObject10(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is deleted on DC1 + - We verify that after replication DC1 -> DC2, + that the OU1 is deleted, and the description has gone away + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + def test_ReplicateMoveObject11(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is deleted on DC1 + - We verify that after replication DC2 -> DC1, + that the OU1 is deleted, and the description has gone away + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + +class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): + + def setUp(self): + super(DrsMoveBetweenTreeOfObjectTestCase, self).setUp() + # disable automatic replication temporary + self._disable_all_repl(self.dnsname_dc1) + self._disable_all_repl(self.dnsname_dc2) + + self.top_ou = samba.tests.create_test_ou(self.ldb_dc1, + "replica_move") + + # make sure DCs are synchronized before the test + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + self.ou1_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU1") + self.ou1_dn.add_base(self.top_ou) + self.ou1 = {} + self.ou1["dn"] = self.ou1_dn + self.ou1["objectclass"] = "organizationalUnit" + self.ou1["ou"] = self.ou1_dn.get_component_value(0) + + self.ou2_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU2,OU=DrsOU1") + self.ou2_dn.add_base(self.top_ou) + self.ou2 = {} + self.ou2["dn"] = self.ou2_dn + self.ou2["objectclass"] = "organizationalUnit" + self.ou2["ou"] = self.ou2_dn.get_component_value(0) + + self.ou2b_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU2B,OU=DrsOU1") + self.ou2b_dn.add_base(self.top_ou) + self.ou2b = {} + self.ou2b["dn"] = self.ou2b_dn + self.ou2b["objectclass"] = "organizationalUnit" + self.ou2b["ou"] = self.ou2b_dn.get_component_value(0) + + self.ou2c_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU2C,OU=DrsOU1") + self.ou2c_dn.add_base(self.top_ou) + + self.ou3_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU3,OU=DrsOU2,OU=DrsOU1") + self.ou3_dn.add_base(self.top_ou) + self.ou3 = {} + self.ou3["dn"] = self.ou3_dn + self.ou3["objectclass"] = "organizationalUnit" + self.ou3["ou"] = self.ou3_dn.get_component_value(0) + + self.ou4_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU4,OU=DrsOU3,OU=DrsOU2,OU=DrsOU1") + self.ou4_dn.add_base(self.top_ou) + self.ou4 = {} + self.ou4["dn"] = self.ou4_dn + self.ou4["objectclass"] = "organizationalUnit" + self.ou4["ou"] = self.ou4_dn.get_component_value(0) + + self.ou5_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU5,OU=DrsOU4,OU=DrsOU3,OU=DrsOU2,OU=DrsOU1") + self.ou5_dn.add_base(self.top_ou) + self.ou5 = {} + self.ou5["dn"] = self.ou5_dn + self.ou5["objectclass"] = "organizationalUnit" + self.ou5["ou"] = self.ou5_dn.get_component_value(0) + + self.ou6_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU6,OU=DrsOU5,OU=DrsOU4,OU=DrsOU3,OU=DrsOU2,OU=DrsOU1") + self.ou6_dn.add_base(self.top_ou) + self.ou6 = {} + self.ou6["dn"] = self.ou6_dn + self.ou6["objectclass"] = "organizationalUnit" + self.ou6["ou"] = self.ou6_dn.get_component_value(0) + + def tearDown(self): + self.ldb_dc1.delete(self.top_ou, ["tree_delete:1"]) + self._enable_all_repl(self.dnsname_dc1) + self._enable_all_repl(self.dnsname_dc2) + super(DrsMoveBetweenTreeOfObjectTestCase, self).tearDown() + + def _make_username(self): + return "DrsTreeU_" + time.strftime("%s", time.gmtime()) + + # now also used to check the group + def _check_obj(self, sam_ldb, obj_orig, is_deleted): + # search the user by guid as it may be deleted + guid_str = self._GUID_string(obj_orig["objectGUID"][0]) + res = sam_ldb.search(base='<GUID=%s>' % guid_str, + controls=["show_deleted:1"], + attrs=["*", "parentGUID"]) + self.assertEqual(len(res), 1) + user_cur = res[0] + cn_orig = str(obj_orig["cn"][0]) + cn_cur = str(user_cur["cn"][0]) + name_orig = str(obj_orig["name"][0]) + name_cur = str(user_cur["name"][0]) + dn_orig = obj_orig["dn"] + dn_cur = user_cur["dn"] + # now check properties of the user + if is_deleted: + self.assertTrue("isDeleted" in user_cur) + self.assertEqual(cn_cur.split('\n')[0], cn_orig) + self.assertEqual(name_cur.split('\n')[0], name_orig) + self.assertEqual(dn_cur.get_rdn_value().split('\n')[0], + dn_orig.get_rdn_value()) + self.assertEqual(name_cur, cn_cur) + else: + self.assertFalse("isDeleted" in user_cur) + self.assertEqual(cn_cur, cn_orig) + self.assertEqual(name_cur, name_orig) + self.assertEqual(dn_cur, dn_orig) + self.assertEqual(name_cur, cn_cur) + self.assertEqual(name_cur, user_cur.dn.get_rdn_value()) + + return user_cur + + def test_ReplicateMoveInTree1(self): + """Verifies how an object is replicated between two DCs. + This test should verify that: + - a complex OU tree can be replicated correctly + - the user is in the correct spot (renamed into) within the tree + on both DCs + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + self.ldb_dc1.add(self.ou2) + self.ldb_dc1.add(self.ou3) + self.ldb_dc1.add(self.ou4) + self.ldb_dc1.add(self.ou5) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou5_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + def test_ReplicateMoveInTree2(self): + """Verifies how an object is replicated between two DCs. + This test should verify that: + - a complex OU tree can be replicated correctly + - the user is in the correct spot (renamed into) within the tree + on both DCs + - that a rename back works correctly, and is replicated + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + self.ldb_dc1.add(self.ou2) + self.ldb_dc1.add(self.ou2b) + self.ldb_dc1.add(self.ou3) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou3_dn) + self.ldb_dc1.rename(user_dn, new_dn) + + new_dn3 = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou3_dn.get_component_value(0)) + new_dn3.add_base(self.ou2b_dn) + self.ldb_dc1.rename(self.ou3_dn, new_dn3) + + ldb_res = self.ldb_dc1.search(base=new_dn3, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + + # Rename on DC1 + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou1_dn) + self.ldb_dc1.rename(user_moved_dn, new_dn) + + # Modify description on DC2 + msg = ldb.Message() + msg.dn = user_moved_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + def test_ReplicateMoveInTree3(self): + """Verifies how an object is replicated between two DCs. + This test should verify that: + - a complex OU tree can be replicated correctly + - the user is in the correct spot (renamed into) within the tree + on both DCs + - that a rename back works correctly, and is replicated + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + self.ldb_dc1.add(self.ou2) + self.ldb_dc1.add(self.ou2b) + self.ldb_dc1.add(self.ou3) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou3_dn) + self.ldb_dc1.rename(user_dn, new_dn) + + new_dn3 = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou3_dn.get_component_value(0)) + new_dn3.add_base(self.ou2b_dn) + self.ldb_dc1.rename(self.ou3_dn, new_dn3) + + ldb_res = self.ldb_dc1.search(base=new_dn3, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_moved_dn, new_dn) + + self.ldb_dc1.rename(self.ou2_dn, self.ou2c_dn) + self.ldb_dc1.rename(self.ou2b_dn, self.ou2_dn) + self.ldb_dc1.rename(self.ou2c_dn, self.ou2b_dn) + + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + + self.assertEqual(user_cur["parentGUID"], user_moved_orig["parentGUID"]) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + def test_ReplicateMoveInTree3b(self): + """Verifies how an object is replicated between two DCs. + This test should verify that: + - a complex OU tree can be replicated correctly + - the user is in the correct spot (renamed into) within the tree + on both DCs + - that a rename back works correctly, and is replicated + - that a complex rename suffle, combined with unrelated changes to the object, + is replicated correctly. The aim here is the send the objects out-of-order + when sorted by usnChanged. + - confirm that the OU tree and (in particular the user DN) is identical between + the DCs once this has been replicated. + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + self.ldb_dc1.add(self.ou2) + self.ldb_dc1.add(self.ou2b) + self.ldb_dc1.add(self.ou3) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + + msg = ldb.Message() + msg.dn = new_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc1.modify(msg) + + # The sleep(1) calls here ensure that the name objects get a + # new 1-sec based timestamp, and so we select how the conflict + # resolution resolves. + self.ldb_dc1.rename(self.ou2_dn, self.ou2c_dn) + time.sleep(1) + self.ldb_dc1.rename(self.ou2b_dn, self.ou2_dn) + time.sleep(1) + self.ldb_dc1.rename(self.ou2c_dn, self.ou2b_dn) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]), new_dn) + + msg = ldb.Message() + msg.dn = self.ou2_dn + msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc1.modify(msg) + + msg = ldb.Message() + msg.dn = self.ou2b_dn + msg["description"] = ldb.MessageElement("OU2b Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc1.modify(msg) + + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertEqual(user_cur["parentGUID"][0], user_moved_orig["parentGUID"][0]) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + def test_ReplicateMoveInTree4(self): + """Verifies how an object is replicated between two DCs. + This test should verify that: + - an OU and user can be replicated correctly, even after a rename + - The creation and rename of the OU has been combined with unrelated changes to the object, + The aim here is the send the objects out-of-order when sorted by usnChanged. + - That is, the OU will be sorted by usnChanged after the user that is within that OU. + - That will cause the client to need to get the OU first, by use of the GET_ANC flag + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + self.ldb_dc1.add(self.ou2) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + + msg = ldb.Message() + msg.dn = self.ou2_dn + msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc1.modify(msg) + + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEqual(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + def test_ReplicateAddInOU(self): + """Verifies how an object is replicated between two DCs. + This test should verify that: + - an OU and user can be replicated correctly + - The creation of the OU has been combined with unrelated changes to the object, + The aim here is the send the objects out-of-order when sorted by usnChanged. + - That is, the OU will be sorted by usnChanged after the user that is within that OU. + - That will cause the client to need to get the OU first, by use of the GET_ANC flag + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + + msg = ldb.Message() + msg.dn = self.ou1_dn + msg["description"] = ldb.MessageElement("OU1 Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc1.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False) + + self.assertEqual(user_cur["parentGUID"], user_orig["parentGUID"]) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + def test_ReplicateAddInMovedOU(self): + """Verifies how an object is replicated between two DCs. + This test should verify that: + - an OU and user can be replicated correctly + - The creation of the OU has been combined with unrelated changes to the object, + The aim here is the send the objects out-of-order when sorted by usnChanged. + - That is, the OU will be sorted by usnChanged after the user that is within that OU. + - That will cause the client to need to get the OU first, by use of the GET_ANC flag + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + self.ldb_dc1.add(self.ou2) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + + self.ldb_dc1.rename(self.ou2_dn, self.ou2b_dn) + + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_moved = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved, is_deleted=False) + + self.assertEqual(user_cur["parentGUID"], user_moved["parentGUID"]) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + def test_ReplicateAddInConflictOU_time(self): + """Verifies how an object is replicated between two DCs, when created in an ambiguous location + This test should verify that: + - Without replication, two conflicting objects can be created + - force the conflict resolution algorithm so we know which copy will win + (by sleeping while creating the objects, therefore increasing that timestamp on 'name') + - confirm that the user object, created on DC1, ends up in the right place on DC2 + - therefore confirm that the conflict algorithm worked correctly, and that parentGUID was used. + + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # Now create two, conflicting objects. This gives the user + # object something to be under on both DCs. + + # We sleep between the two adds so that DC1 adds second, and + # so wins the conflict resolution due to a later creation time + # (modification timestamp on the name attribute). + self.ldb_dc2.add(self.ou2) + time.sleep(1) + self.ldb_dc1.add(self.ou2) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + + # Now that we have renamed the user (and so bumped the + # usnChanged), bump the value on the OUs. + msg = ldb.Message() + msg.dn = self.ou2_dn + msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc1.modify(msg) + + msg = ldb.Message() + msg.dn = self.ou2_dn + msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_moved = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be under the OU2 from DC1 + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved, is_deleted=False) + + self.assertEqual(user_cur["parentGUID"], user_moved["parentGUID"]) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + def test_ReplicateAddInConflictOU2(self): + """Verifies how an object is replicated between two DCs, when created in an ambiguous location + This test should verify that: + - Without replication, two conflicting objects can be created + - force the conflict resolution algorithm so we know which copy will win + (by changing the description twice, therefore increasing that version count) + - confirm that the user object, created on DC1, ends up in the right place on DC2 + - therefore confirm that the conflict algorithm worked correctly, and that parentGUID was used. + """ + # work-out unique username to test with + username = self._make_username() + + self.ldb_dc1.add(self.ou1) + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s,ou=%s" + % (self.ou1_dn.get_component_value(0), + self.top_ou.get_component_value(0)), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # Now create two, conflicting objects. This gives the user + # object something to be under on both DCs. We create it on + # DC1 1sec later so that it will win the conflict resolution. + + self.ldb_dc2.add(self.ou2) + time.sleep(1) + self.ldb_dc1.add(self.ou2) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + + # Now that we have renamed the user (and so bumped the + # usnChanged), bump the value on the OUs. + msg = ldb.Message() + msg.dn = self.ou2_dn + msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc1.modify(msg) + + msg = ldb.Message() + msg.dn = self.ou2_dn + msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username, + attrs=["*", "parentGUID"]) + self.assertEqual(len(ldb_res), 1) + user_moved = ldb_res[0] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be under the OU2 from DC1 + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved, is_deleted=False) + + self.assertEqual(user_cur["parentGUID"], user_moved["parentGUID"]) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) |