diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
commit | 8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch) | |
tree | 4099e8021376c7d8c05bdf8503093d80e9c7bad0 /source4/dsdb/tests/python/tombstone_reanimation.py | |
parent | Initial commit. (diff) | |
download | samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip |
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'source4/dsdb/tests/python/tombstone_reanimation.py')
-rwxr-xr-x | source4/dsdb/tests/python/tombstone_reanimation.py | 958 |
1 files changed, 958 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/tombstone_reanimation.py b/source4/dsdb/tests/python/tombstone_reanimation.py new file mode 100755 index 0000000..88aebd6 --- /dev/null +++ b/source4/dsdb/tests/python/tombstone_reanimation.py @@ -0,0 +1,958 @@ +#!/usr/bin/env python3 +# +# Tombstone reanimation tests +# +# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014 +# Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +import unittest + +sys.path.insert(0, "bin/python") +import samba + +from samba.ndr import ndr_unpack, ndr_print +from samba.dcerpc import misc +from samba.dcerpc import drsblobs +from samba.dcerpc.drsuapi import * +from samba.tests.password_test import PasswordCommon +from samba.common import get_string + +import samba.tests +from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message, + MessageElement, LdbError, + ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS, + ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM) + + +class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase): + """ verify Samba restores required attributes when + user restores a Deleted object + """ + + def setUp(self): + super(RestoredObjectAttributesBaseTestCase, self).setUp() + self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD") + self.base_dn = self.samdb.domain_dn() + self.schema_dn = self.samdb.get_schema_basedn().get_linearized() + self.configuration_dn = self.samdb.get_config_basedn().get_linearized() + + # permit password changes during this test + PasswordCommon.allow_password_changes(self, self.samdb) + + def tearDown(self): + super(RestoredObjectAttributesBaseTestCase, self).tearDown() + + def GUID_string(self, guid): + return get_string(self.samdb.schema_format_value("objectGUID", guid)) + + def search_guid(self, guid, attrs=None): + if attrs is None: + attrs = ["*"] + + res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid), + scope=SCOPE_BASE, attrs=attrs, + controls=["show_deleted:1"]) + self.assertEqual(len(res), 1) + return res[0] + + def search_dn(self, dn): + res = self.samdb.search(expression="(objectClass=*)", + base=dn, + scope=SCOPE_BASE, + controls=["show_recycled:1"]) + self.assertEqual(len(res), 1) + return res[0] + + def _create_object(self, msg): + """:param msg: dict with dn and attributes to create an object from""" + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, msg['dn']) + self.samdb.add(msg) + return self.search_dn(msg['dn']) + + def assertNamesEqual(self, attrs_expected, attrs_extra): + self.assertEqual(attrs_expected, attrs_extra, + "Actual object does not have expected attributes, missing from expected (%s), extra (%s)" + % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected)))) + + def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest): + self.assertNamesEqual(attrs_orig, attrs_rest) + # remove volatile attributes, they can't be equal + attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"]) + for attr in attrs_orig: + # convert original attr value to ldif + orig_val = obj_orig.get(attr) + if orig_val is None: + continue + if not isinstance(orig_val, MessageElement): + orig_val = MessageElement(str(orig_val), 0, attr) + m = Message() + m.add(orig_val) + orig_ldif = self.samdb.write_ldif(m, 0) + # convert restored attr value to ldif + rest_val = obj_restored.get(attr) + self.assertFalse(rest_val is None) + m = Message() + if not isinstance(rest_val, MessageElement): + rest_val = MessageElement(str(rest_val), 0, attr) + m.add(rest_val) + rest_ldif = self.samdb.write_ldif(m, 0) + # compare generated ldif's + self.assertEqual(orig_ldif, rest_ldif) + + def assertAttributesExists(self, attr_expected, obj_msg): + """Check object contains at least expected attrbigutes + :param attr_expected: dict of expected attributes with values. ** is any value + :param obj_msg: Ldb.Message for the object under test + """ + actual_names = set(obj_msg.keys()) + # Samba does not use 'dSCorePropagationData', so skip it + actual_names -= set(['dSCorePropagationData']) + expected_names = set(attr_expected.keys()) + self.assertNamesEqual(expected_names, actual_names) + for name in attr_expected.keys(): + expected_val = attr_expected[name] + actual_val = obj_msg.get(name) + self.assertFalse(actual_val is None, "No value for attribute '%s'" % name) + if expected_val == "**": + # "**" values means "any" + continue + # if expected_val is e.g. ldb.bytes we can't depend on + # str(actual_value) working, we may just get a decoding + # error. Better to just compare raw values + if not isinstance(expected_val, str): + actual_val = actual_val[0] + else: + actual_val = str(actual_val) + self.assertEqual(expected_val, actual_val, + "Unexpected value (%s) for '%s', expected (%s)" % ( + repr(actual_val), name, repr(expected_val))) + + def _check_metadata(self, metadata, expected): + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, metadata[0]) + + repl_array = [] + for o in repl.ctr.array: + repl_array.append((o.attid, o.version)) + repl_set = set(repl_array) + + expected_set = set(expected) + self.assertEqual(len(repl_set), len(expected), + "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % ( + str(expected_set.difference(repl_set)), + str(repl_set.difference(expected_set)), + ndr_print(repl))) + + i = 0 + for o in repl.ctr.array: + e = expected[i] + (attid, version) = e + self.assertEqual(attid, o.attid, + "(LDAP) Wrong attid " + "for expected value %d, wanted 0x%08x got 0x%08x, " + "repl: \n%s" + % (i, attid, o.attid, ndr_print(repl))) + # Allow version to be skipped when it does not matter + if version is not None: + self.assertEqual(o.version, version, + "(LDAP) Wrong version for expected value %d, " + "attid 0x%08x, " + "wanted %d got %d, repl: \n%s" + % (i, o.attid, + version, o.version, ndr_print(repl))) + i = i + 1 + + @staticmethod + def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None): + """Restores a deleted object + :param samdb: SamDB connection to SAM + :param del_dn: str Deleted object DN + :param new_dn: str Where to restore the object + :param new_attrs: dict Additional attributes to set + """ + msg = Message() + msg.dn = Dn(samdb, str(del_dn)) + msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted") + msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName") + if new_attrs is not None: + assert isinstance(new_attrs, dict) + for attr in new_attrs: + msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr) + samdb.modify(msg, ["show_deleted:1"]) + + +class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase): + def setUp(self): + super(BaseRestoreObjectTestCase, self).setUp() + + def enable_recycle_bin(self): + msg = Message() + msg.dn = Dn(self.samdb, "") + msg["enableOptionalFeature"] = MessageElement( + "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a", + FLAG_MOD_ADD, "enableOptionalFeature") + try: + self.samdb.modify(msg) + except LdbError as e: + (num, _) = e.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + def test_undelete(self): + print("Testing standard undelete operation") + usr1 = "cn=testuser,cn=users," + self.base_dn + samba.tests.delete_force(self.samdb, usr1) + self.samdb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1 = objLive1["objectGUID"][0] + self.samdb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1) + objLive2 = self.search_dn(usr1) + self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower()) + samba.tests.delete_force(self.samdb, usr1) + + def test_rename(self): + print("Testing attempt to rename deleted object") + usr1 = "cn=testuser,cn=users," + self.base_dn + self.samdb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1 = objLive1["objectGUID"][0] + self.samdb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + # just to make sure we get the correct error if the show deleted is missing + try: + self.samdb.rename(str(objDeleted1.dn), usr1) + self.fail() + except LdbError as e1: + (num, _) = e1.args + self.assertEqual(num, ERR_NO_SUCH_OBJECT) + + try: + self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"]) + self.fail() + except LdbError as e2: + (num, _) = e2.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + def test_undelete_with_mod(self): + print("Testing standard undelete operation with modification of additional attributes") + usr1 = "cn=testuser,cn=users," + self.base_dn + self.samdb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1 = objLive1["objectGUID"][0] + self.samdb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"}) + objLive2 = self.search_dn(usr1) + self.assertEqual(str(objLive2["url"][0]), "www.samba.org") + samba.tests.delete_force(self.samdb, usr1) + + def test_undelete_newuser(self): + print("Testing undelete user with a different dn") + usr1 = "cn=testuser,cn=users," + self.base_dn + usr2 = "cn=testuser2,cn=users," + self.base_dn + samba.tests.delete_force(self.samdb, usr1) + self.samdb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1 = objLive1["objectGUID"][0] + self.samdb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2) + objLive2 = self.search_dn(usr2) + samba.tests.delete_force(self.samdb, usr1) + samba.tests.delete_force(self.samdb, usr2) + + def test_undelete_existing(self): + print("Testing undelete user after a user with the same dn has been created") + usr1 = "cn=testuser,cn=users," + self.base_dn + self.samdb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1 = objLive1["objectGUID"][0] + self.samdb.delete(usr1) + self.samdb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objDeleted1 = self.search_guid(guid1) + try: + self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1) + self.fail() + except LdbError as e3: + (num, _) = e3.args + self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS) + + def test_undelete_cross_nc(self): + print("Cross NC undelete") + c1 = "cn=ldaptestcontainer," + self.base_dn + c2 = "cn=ldaptestcontainer2," + self.configuration_dn + c3 = "cn=ldaptestcontainer," + self.configuration_dn + c4 = "cn=ldaptestcontainer2," + self.base_dn + samba.tests.delete_force(self.samdb, c1) + samba.tests.delete_force(self.samdb, c2) + samba.tests.delete_force(self.samdb, c3) + samba.tests.delete_force(self.samdb, c4) + self.samdb.add({ + "dn": c1, + "objectclass": "container"}) + self.samdb.add({ + "dn": c2, + "objectclass": "container"}) + objLive1 = self.search_dn(c1) + objLive2 = self.search_dn(c2) + guid1 = objLive1["objectGUID"][0] + guid2 = objLive2["objectGUID"][0] + self.samdb.delete(c1) + self.samdb.delete(c2) + objDeleted1 = self.search_guid(guid1) + objDeleted2 = self.search_guid(guid2) + # try to undelete from base dn to config + try: + self.restore_deleted_object(self.samdb, objDeleted1.dn, c3) + self.fail() + except LdbError as e4: + (num, _) = e4.args + self.assertEqual(num, ERR_OPERATIONS_ERROR) + # try to undelete from config to base dn + try: + self.restore_deleted_object(self.samdb, objDeleted2.dn, c4) + self.fail() + except LdbError as e5: + (num, _) = e5.args + self.assertEqual(num, ERR_OPERATIONS_ERROR) + # assert undeletion will work in same nc + self.restore_deleted_object(self.samdb, objDeleted1.dn, c4) + self.restore_deleted_object(self.samdb, objDeleted2.dn, c3) + + +class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase): + """Test cases for delete/reanimate user objects""" + + def _expected_user_add_attributes(self, username, user_dn, category): + return {'dn': user_dn, + 'objectClass': '**', + 'cn': username, + 'distinguishedName': user_dn, + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': username, + 'objectGUID': '**', + 'userAccountControl': '546', + 'badPwdCount': '0', + 'badPasswordTime': '0', + 'codePage': '0', + 'countryCode': '0', + 'lastLogon': '0', + 'lastLogoff': '0', + 'pwdLastSet': '0', + 'primaryGroupID': '513', + 'objectSid': '**', + 'accountExpires': '9223372036854775807', + 'logonCount': '0', + 'sAMAccountName': username, + 'sAMAccountType': '805306368', + 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) + } + + def _expected_user_add_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 1), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 1), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 1), + (DRSUAPI_ATTID_countryCode, 1), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 1), + (DRSUAPI_ATTID_ntPwdHistory, 1), + (DRSUAPI_ATTID_pwdLastSet, 1), + (DRSUAPI_ATTID_primaryGroupID, 1), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_accountExpires, 1), + (DRSUAPI_ATTID_lmPwdHistory, 1), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 1), + (DRSUAPI_ATTID_objectCategory, 1)] + + def _expected_user_del_attributes(self, username, _guid, _sid): + guid = ndr_unpack(misc.GUID, _guid) + dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn) + cn = "%s\nDEL:%s" % (username, guid) + return {'dn': dn, + 'objectClass': '**', + 'cn': cn, + 'distinguishedName': dn, + 'isDeleted': 'TRUE', + 'isRecycled': 'TRUE', + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': cn, + 'objectGUID': _guid, + 'userAccountControl': '546', + 'objectSid': _sid, + 'sAMAccountName': username, + 'lastKnownParent': 'CN=Users,%s' % self.base_dn, + } + + def _expected_user_del_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 2), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_isDeleted, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 2), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 2), + (DRSUAPI_ATTID_countryCode, 2), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 1), + (DRSUAPI_ATTID_ntPwdHistory, 1), + (DRSUAPI_ATTID_pwdLastSet, 2), + (DRSUAPI_ATTID_primaryGroupID, 2), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_accountExpires, 2), + (DRSUAPI_ATTID_lmPwdHistory, 1), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 2), + (DRSUAPI_ATTID_lastKnownParent, 1), + (DRSUAPI_ATTID_objectCategory, 2), + (DRSUAPI_ATTID_isRecycled, 1)] + + def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category): + return {'dn': user_dn, + 'objectClass': '**', + 'cn': username, + 'distinguishedName': user_dn, + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': username, + 'objectGUID': guid, + 'userAccountControl': '546', + 'badPwdCount': '0', + 'badPasswordTime': '0', + 'codePage': '0', + 'countryCode': '0', + 'lastLogon': '0', + 'lastLogoff': '0', + 'pwdLastSet': '0', + 'primaryGroupID': '513', + 'operatorCount': '0', + 'objectSid': sid, + 'adminCount': '0', + 'accountExpires': '0', + 'logonCount': '0', + 'sAMAccountName': username, + 'sAMAccountType': '805306368', + 'lastKnownParent': 'CN=Users,%s' % self.base_dn, + 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) + } + + def _expected_user_restore_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 3), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_isDeleted, 2), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 3), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 3), + (DRSUAPI_ATTID_countryCode, 3), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 1), + (DRSUAPI_ATTID_ntPwdHistory, 1), + (DRSUAPI_ATTID_pwdLastSet, 3), + (DRSUAPI_ATTID_primaryGroupID, 3), + (DRSUAPI_ATTID_operatorCount, 1), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_adminCount, 1), + (DRSUAPI_ATTID_accountExpires, 3), + (DRSUAPI_ATTID_lmPwdHistory, 1), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 3), + (DRSUAPI_ATTID_lastKnownParent, 1), + (DRSUAPI_ATTID_objectCategory, 3), + (DRSUAPI_ATTID_isRecycled, 2)] + + def test_restore_user(self): + print("Test restored user attributes") + username = "restore_user" + usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn) + samba.tests.delete_force(self.samdb, usr_dn) + self.samdb.add({ + "dn": usr_dn, + "objectClass": "user", + "sAMAccountName": username}) + obj = self.search_dn(usr_dn) + guid = obj["objectGUID"][0] + sid = obj["objectSID"][0] + obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj) + self._check_metadata(obj_rmd["replPropertyMetaData"], + self._expected_user_add_metadata()) + self.samdb.delete(usr_dn) + obj_del = self.search_guid(guid) + obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + orig_attrs = set(obj.keys()) + del_attrs = set(obj_del.keys()) + self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del) + self._check_metadata(obj_del_rmd["replPropertyMetaData"], + self._expected_user_del_metadata()) + # restore the user and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn) + obj_restore = self.search_guid(guid) + obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + # check original attributes and restored one are same + orig_attrs = set(obj.keys()) + # windows restore more attributes that originally we have + orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent']) + rest_attrs = set(obj_restore.keys()) + self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore) + self._check_metadata(obj_restore_rmd["replPropertyMetaData"], + self._expected_user_restore_metadata()) + + +class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase): + """Test cases for delete/reanimate user objects with password""" + + def _expected_userpw_add_attributes(self, username, user_dn, category): + return {'dn': user_dn, + 'objectClass': '**', + 'cn': username, + 'distinguishedName': user_dn, + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': username, + 'objectGUID': '**', + 'userAccountControl': '546', + 'badPwdCount': '0', + 'badPasswordTime': '0', + 'codePage': '0', + 'countryCode': '0', + 'lastLogon': '0', + 'lastLogoff': '0', + 'pwdLastSet': '**', + 'primaryGroupID': '513', + 'objectSid': '**', + 'accountExpires': '9223372036854775807', + 'logonCount': '0', + 'sAMAccountName': username, + 'sAMAccountType': '805306368', + 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) + } + + def _expected_userpw_add_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 1), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 1), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 1), + (DRSUAPI_ATTID_countryCode, 1), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 1), + (DRSUAPI_ATTID_ntPwdHistory, 1), + (DRSUAPI_ATTID_pwdLastSet, 1), + (DRSUAPI_ATTID_primaryGroupID, 1), + (DRSUAPI_ATTID_supplementalCredentials, 1), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_accountExpires, 1), + (DRSUAPI_ATTID_lmPwdHistory, 1), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 1), + (DRSUAPI_ATTID_objectCategory, 1)] + + def _expected_userpw_del_attributes(self, username, _guid, _sid): + guid = ndr_unpack(misc.GUID, _guid) + dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn) + cn = "%s\nDEL:%s" % (username, guid) + return {'dn': dn, + 'objectClass': '**', + 'cn': cn, + 'distinguishedName': dn, + 'isDeleted': 'TRUE', + 'isRecycled': 'TRUE', + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': cn, + 'objectGUID': _guid, + 'userAccountControl': '546', + 'objectSid': _sid, + 'sAMAccountName': username, + 'lastKnownParent': 'CN=Users,%s' % self.base_dn, + } + + def _expected_userpw_del_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 2), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_isDeleted, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 2), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 2), + (DRSUAPI_ATTID_countryCode, 2), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 2), + (DRSUAPI_ATTID_ntPwdHistory, 2), + (DRSUAPI_ATTID_pwdLastSet, 2), + (DRSUAPI_ATTID_primaryGroupID, 2), + (DRSUAPI_ATTID_supplementalCredentials, 2), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_accountExpires, 2), + (DRSUAPI_ATTID_lmPwdHistory, None), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 2), + (DRSUAPI_ATTID_lastKnownParent, 1), + (DRSUAPI_ATTID_objectCategory, 2), + (DRSUAPI_ATTID_isRecycled, 1)] + + def _expected_userpw_restore_attributes(self, username, guid, sid, user_dn, category): + return {'dn': user_dn, + 'objectClass': '**', + 'cn': username, + 'distinguishedName': user_dn, + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': username, + 'objectGUID': guid, + 'userAccountControl': '546', + 'badPwdCount': '0', + 'badPasswordTime': '0', + 'codePage': '0', + 'countryCode': '0', + 'lastLogon': '0', + 'lastLogoff': '0', + 'pwdLastSet': '**', + 'primaryGroupID': '513', + 'operatorCount': '0', + 'objectSid': sid, + 'adminCount': '0', + 'accountExpires': '0', + 'logonCount': '0', + 'sAMAccountName': username, + 'sAMAccountType': '805306368', + 'lastKnownParent': 'CN=Users,%s' % self.base_dn, + 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) + } + + def _expected_userpw_restore_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 3), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_isDeleted, 2), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 3), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 3), + (DRSUAPI_ATTID_countryCode, 3), + (DRSUAPI_ATTID_dBCSPwd, 2), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 3), + (DRSUAPI_ATTID_ntPwdHistory, 3), + (DRSUAPI_ATTID_pwdLastSet, 4), + (DRSUAPI_ATTID_primaryGroupID, 3), + (DRSUAPI_ATTID_supplementalCredentials, 3), + (DRSUAPI_ATTID_operatorCount, 1), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_adminCount, 1), + (DRSUAPI_ATTID_accountExpires, 3), + (DRSUAPI_ATTID_lmPwdHistory, None), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 3), + (DRSUAPI_ATTID_lastKnownParent, 1), + (DRSUAPI_ATTID_objectCategory, 3), + (DRSUAPI_ATTID_isRecycled, 2)] + + def test_restorepw_user(self): + print("Test restored user attributes") + username = "restorepw_user" + usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn) + samba.tests.delete_force(self.samdb, usr_dn) + self.samdb.add({ + "dn": usr_dn, + "objectClass": "user", + "userPassword": "thatsAcomplPASS0", + "sAMAccountName": username}) + obj = self.search_dn(usr_dn) + guid = obj["objectGUID"][0] + sid = obj["objectSID"][0] + obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + self.assertAttributesExists(self._expected_userpw_add_attributes(username, usr_dn, "Person"), obj) + self._check_metadata(obj_rmd["replPropertyMetaData"], + self._expected_userpw_add_metadata()) + self.samdb.delete(usr_dn) + obj_del = self.search_guid(guid) + obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + orig_attrs = set(obj.keys()) + del_attrs = set(obj_del.keys()) + self.assertAttributesExists(self._expected_userpw_del_attributes(username, guid, sid), obj_del) + self._check_metadata(obj_del_rmd["replPropertyMetaData"], + self._expected_userpw_del_metadata()) + # restore the user and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn, {"userPassword": ["thatsAcomplPASS1"]}) + obj_restore = self.search_guid(guid) + obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + # check original attributes and restored one are same + orig_attrs = set(obj.keys()) + # windows restore more attributes that originally we have + orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent']) + rest_attrs = set(obj_restore.keys()) + self.assertAttributesExists(self._expected_userpw_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore) + self._check_metadata(obj_restore_rmd["replPropertyMetaData"], + self._expected_userpw_restore_metadata()) + + +class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase): + """Test different scenarios for delete/reanimate group objects""" + + def _make_object_dn(self, name): + return "CN=%s,CN=Users,%s" % (name, self.base_dn) + + def _create_test_user(self, user_name): + user_dn = self._make_object_dn(user_name) + ldif = { + "dn": user_dn, + "objectClass": "user", + "sAMAccountName": user_name, + } + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, user_dn) + # finally, create the group + self.samdb.add(ldif) + return self.search_dn(user_dn) + + def _create_test_group(self, group_name, members=None): + group_dn = self._make_object_dn(group_name) + ldif = { + "dn": group_dn, + "objectClass": "group", + "sAMAccountName": group_name, + } + try: + ldif["member"] = [str(usr_dn) for usr_dn in members] + except TypeError: + pass + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, group_dn) + # finally, create the group + self.samdb.add(ldif) + return self.search_dn(group_dn) + + def _expected_group_attributes(self, groupname, group_dn, category): + return {'dn': group_dn, + 'groupType': '-2147483646', + 'distinguishedName': group_dn, + 'sAMAccountName': groupname, + 'name': groupname, + 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn), + 'objectClass': '**', + 'objectGUID': '**', + 'lastKnownParent': 'CN=Users,%s' % self.base_dn, + 'whenChanged': '**', + 'sAMAccountType': '268435456', + 'objectSid': '**', + 'whenCreated': '**', + 'uSNCreated': '**', + 'operatorCount': '0', + 'uSNChanged': '**', + 'instanceType': '4', + 'adminCount': '0', + 'cn': groupname} + + def test_plain_group(self): + print("Test restored Group attributes") + # create test group + obj = self._create_test_group("r_group") + guid = obj["objectGUID"][0] + # delete the group + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Group and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + # windows restore more attributes that originally we have + attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent']) + attr_rest = set(obj_restore.keys()) + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore) + + def test_group_with_members(self): + print("Test restored Group with members attributes") + # create test group + usr1 = self._create_test_user("r_user_1") + usr2 = self._create_test_user("r_user_2") + obj = self._create_test_group("r_group", [usr1.dn, usr2.dn]) + guid = obj["objectGUID"][0] + # delete the group + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Group and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + # windows restore more attributes that originally we have + attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent']) + # and does not restore following attributes + attr_orig.remove("member") + attr_rest = set(obj_restore.keys()) + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore) + + +class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase): + """Test different scenarios for delete/reanimate OU/container objects""" + + def _expected_container_attributes(self, rdn, name, dn, category): + if rdn == 'OU': + lastKnownParent = '%s' % self.base_dn + else: + lastKnownParent = 'CN=Users,%s' % self.base_dn + return {'dn': dn, + 'distinguishedName': dn, + 'name': name, + 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn), + 'objectClass': '**', + 'objectGUID': '**', + 'lastKnownParent': lastKnownParent, + 'whenChanged': '**', + 'whenCreated': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'instanceType': '4', + rdn.lower(): name} + + def _create_test_ou(self, rdn, name=None, description=None): + ou_dn = "OU=%s,%s" % (rdn, self.base_dn) + # delete an object if leftover from previous test + samba.tests.delete_force(self.samdb, ou_dn) + # create ou and return created object + self.samdb.create_ou(ou_dn, name=name, description=description) + return self.search_dn(ou_dn) + + def test_ou_with_name_description(self): + print("Test OU reanimation") + # create OU to test with + obj = self._create_test_ou(rdn="r_ou", + name="r_ou name", + description="r_ou description") + guid = obj["objectGUID"][0] + # delete the object + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Object and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + attr_rest = set(obj_restore.keys()) + # windows restore more attributes that originally we have + attr_orig.update(["lastKnownParent"]) + # and does not restore following attributes + attr_orig -= set(["description"]) + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + expected_attrs = self._expected_container_attributes("OU", "r_ou", str(obj.dn), "Organizational-Unit") + self.assertAttributesExists(expected_attrs, obj_restore) + + def test_container(self): + print("Test Container reanimation") + # create test Container + obj = self._create_object({ + "dn": "CN=r_container,CN=Users,%s" % self.base_dn, + "objectClass": "container" + }) + guid = obj["objectGUID"][0] + # delete the object + self.samdb.delete(str(obj.dn)) + obj_del = self.search_guid(guid) + # restore the Object and fetch what's restored + self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn) + obj_restore = self.search_guid(guid) + # check original attributes and restored one are same + attr_orig = set(obj.keys()) + attr_rest = set(obj_restore.keys()) + # windows restore more attributes that originally we have + attr_orig.update(["lastKnownParent"]) + # and does not restore following attributes + attr_orig -= set(["showInAdvancedViewOnly"]) + self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest) + expected_attrs = self._expected_container_attributes("CN", "r_container", + str(obj.dn), "Container") + self.assertAttributesExists(expected_attrs, obj_restore) + + +if __name__ == '__main__': + unittest.main() |