#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This is a port of the original in testprogs/ejs/ldap.js

import optparse
import sys
import os
import time

sys.path.insert(0, "bin/python")
import samba
from samba.tests.subunitrun import SubunitOptions, TestProgram

import samba.getopt as options

from samba.credentials import Credentials, DONT_USE_KERBEROS
from samba.auth import system_session
from samba.common import get_string

from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM
from ldb import ERR_OTHER, ERR_NO_SUCH_ATTRIBUTE
from ldb import ERR_OBJECT_CLASS_VIOLATION
from ldb import ERR_CONSTRAINT_VIOLATION
from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
from ldb import ERR_INVALID_CREDENTIALS
from ldb import ERR_STRONG_AUTH_REQUIRED
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
from samba.samdb import SamDB
from samba.dsdb import (UF_NORMAL_ACCOUNT, UF_ACCOUNTDISABLE,
                        UF_WORKSTATION_TRUST_ACCOUNT, UF_SERVER_TRUST_ACCOUNT,
                        UF_PARTIAL_SECRETS_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT,
                        UF_INTERDOMAIN_TRUST_ACCOUNT, UF_SMARTCARD_REQUIRED,
                        UF_PASSWD_NOTREQD, UF_LOCKOUT, UF_PASSWORD_EXPIRED,
                        ATYPE_NORMAL_ACCOUNT,
                        GTYPE_SECURITY_BUILTIN_LOCAL_GROUP,
                        GTYPE_SECURITY_DOMAIN_LOCAL_GROUP,
                        GTYPE_SECURITY_GLOBAL_GROUP,
                        GTYPE_SECURITY_UNIVERSAL_GROUP,
                        GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP,
                        GTYPE_DISTRIBUTION_GLOBAL_GROUP,
                        GTYPE_DISTRIBUTION_UNIVERSAL_GROUP,
                        ATYPE_SECURITY_GLOBAL_GROUP,
                        ATYPE_SECURITY_UNIVERSAL_GROUP,
                        ATYPE_SECURITY_LOCAL_GROUP,
                        ATYPE_DISTRIBUTION_GLOBAL_GROUP,
                        ATYPE_DISTRIBUTION_UNIVERSAL_GROUP,
                        ATYPE_DISTRIBUTION_LOCAL_GROUP,
                        ATYPE_WORKSTATION_TRUST)
from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_ADMINS,
                                   DOMAIN_RID_DOMAIN_MEMBERS, DOMAIN_RID_DCS,
                                   DOMAIN_RID_READONLY_DCS)

from samba.ndr import ndr_unpack
from samba.dcerpc import drsblobs
from samba.dcerpc import drsuapi
from samba.dcerpc import security
from samba.tests import delete_force
from samba import gensec
from samba import werror

# The single positional argument is the host to test against (see
# "host = args[0]" below), so it is named in the usage string.
parser = optparse.OptionParser("sam.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)


class SamTests(samba.tests.TestCase):
    """Tests for the behaviour of SAM objects (users, groups, computers).

    NOTE(review): the methods use a module-level name ``ldb`` for the
    directory connection.  Nothing in the import section binds that name,
    so it is presumably assigned further down in this file (e.g. a SamDB
    opened against ``host``) -- confirm before running the class in
    isolation.
    """

    def setUp(self):
        """Bind the connection/base DN and remove leftovers of earlier runs."""
        super(SamTests, self).setUp()
        self.ldb = ldb
        self.base_dn = ldb.domain_dn()

        print("baseDN: %s\n" % self.base_dn)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
        # Raw string: "\," is the DN escape for a literal comma, not a Python
        # escape sequence.
        delete_force(self.ldb, r"cn=ldaptest\,specialuser,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestcomputer2,cn=computers," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)

    def test_users_groups(self):
        """This tests the SAM users and groups behaviour"""
        print("Testing users and groups behaviour\n")

        ldb.add({
            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
            "objectclass": "group"})

        ldb.add({
            "dn": "cn=ldaptestgroup2,cn=users," + self.base_dn,
            "objectclass": "group"})

        res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["objectSID"])
        self.assertTrue(len(res1) == 1)
        obj_sid = get_string(ldb.schema_format_value("objectSID",
                                                     res1[0]["objectSID"][0]))
        group_rid_1 = security.dom_sid(obj_sid).split()[1]

        res1 = ldb.search("cn=ldaptestgroup2,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["objectSID"])
        self.assertTrue(len(res1) == 1)
        obj_sid = get_string(ldb.schema_format_value("objectSID",
                                                     res1[0]["objectSID"][0]))
        group_rid_2 = security.dom_sid(obj_sid).split()[1]

        # Try to create a user with an invalid account name
        try:
            ldb.add({
                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
                "objectclass": "user",
                "sAMAccountName": "administrator"})
            self.fail()
        except LdbError as e9:
            (num, _) = e9.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)
        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # Try to create a user with an invalid account name
        try:
            ldb.add({
                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
                "objectclass": "user",
                "sAMAccountName": []})
            self.fail()
        except LdbError as e10:
            (num, _) = e10.args
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # Try to create a user with an invalid primary group
        try:
            ldb.add({
                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
                "objectclass": "user",
                "primaryGroupID": "0"})
            self.fail()
        except LdbError as e11:
            (num, _) = e11.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # Try to Create a user with a valid primary group
        try:
            ldb.add({
                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
                "objectclass": "user",
                "primaryGroupID": str(group_rid_1)})
            self.fail()
        except LdbError as e12:
            (num, _) = e12.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # Test to see how we should behave when the user account doesn't
        # exist
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e13:
            (num, _) = e13.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

        # Test to see how we should behave when the account isn't a user
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e14:
            (num, _) = e14.args
            self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION)

        # Test default primary groups on add operations

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user"})

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user",
            "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)})

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # unfortunately the INTERDOMAIN_TRUST_ACCOUNT case cannot be tested
        # since such accounts aren't directly creatable (ACCESS_DENIED)

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "computer",
            "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT |
                                      UF_PASSWD_NOTREQD)})

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]),
                         DOMAIN_RID_DOMAIN_MEMBERS)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "computer",
            "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT |
                                      UF_PASSWD_NOTREQD)})

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DCS)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # Read-only DC accounts are only creatable by
        # UF_WORKSTATION_TRUST_ACCOUNT and work only on DCs >= 2008 (therefore
        # we have a fallback in the assertion)
        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "computer",
            "userAccountControl": str(UF_PARTIAL_SECRETS_ACCOUNT |
                                      UF_WORKSTATION_TRUST_ACCOUNT |
                                      UF_PASSWD_NOTREQD)})

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertTrue(int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_READONLY_DCS or
                        int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_DOMAIN_MEMBERS)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # Test default primary groups on modify operations

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user"})

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["userAccountControl"] = MessageElement(
            str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
            FLAG_MOD_REPLACE, "userAccountControl")
        ldb.modify(m)

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS)

        # unfortunately the INTERDOMAIN_TRUST_ACCOUNT case cannot be tested
        # since such accounts aren't directly creatable (ACCESS_DENIED)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "computer",
            "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)})

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["userAccountControl"] = MessageElement(
            str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            FLAG_MOD_REPLACE, "userAccountControl")
        ldb.modify(m)

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]),
                         DOMAIN_RID_DOMAIN_MEMBERS)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["userAccountControl"] = MessageElement(
            str(UF_SERVER_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            FLAG_MOD_REPLACE, "userAccountControl")
        ldb.modify(m)

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DCS)

        # Read-only DC accounts are only creatable by
        # UF_WORKSTATION_TRUST_ACCOUNT and work only on DCs >= 2008 (therefore
        # we have a fallback in the assertion)
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["userAccountControl"] = MessageElement(
            str(UF_PARTIAL_SECRETS_ACCOUNT |
                UF_WORKSTATION_TRUST_ACCOUNT |
                UF_PASSWD_NOTREQD),
            FLAG_MOD_REPLACE, "userAccountControl")
        ldb.modify(m)

        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupID"])
        self.assertTrue(len(res1) == 1)
        self.assertTrue(int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_READONLY_DCS or
                        int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_DOMAIN_MEMBERS)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        # Recreate account for further tests

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user"})

        # Try to set an invalid account name
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["sAMAccountName"] = MessageElement("administrator", FLAG_MOD_REPLACE,
                                             "sAMAccountName")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e15:
            (num, _) = e15.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)

        # But to reset the actual "sAMAccountName" should still be possible
        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["sAMAccountName"])
        self.assertTrue(len(res1) == 1)
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["sAMAccountName"] = MessageElement(res1[0]["sAMAccountName"][0],
                                             FLAG_MOD_REPLACE,
                                             "sAMAccountName")
        ldb.modify(m)

        # And another (free) name should be possible as well
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["sAMAccountName"] = MessageElement("xxx_ldaptestuser_xxx",
                                             FLAG_MOD_REPLACE,
                                             "sAMAccountName")
        ldb.modify(m)

        # We should be able to reset our actual primary group
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS),
                                             FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        ldb.modify(m)

        # Try to add invalid primary group
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e16:
            (num, _) = e16.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        # Try to make group 1 primary - should be denied since it is not yet
        # secondary
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement(str(group_rid_1),
                                             FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e17:
            (num, _) = e17.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        # Make group 1 secondary
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn,
                                     FLAG_MOD_REPLACE, "member")
        ldb.modify(m)

        # Make group 1 primary
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement(str(group_rid_1),
                                             FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        ldb.modify(m)

        # Try to delete group 1 - should be denied
        try:
            ldb.delete("cn=ldaptestgroup,cn=users," + self.base_dn)
            self.fail()
        except LdbError as e18:
            (num, _) = e18.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)

        # Try to add group 1 also as secondary - should be denied
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn,
                                     FLAG_MOD_ADD, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e19:
            (num, _) = e19.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)

        # Try to add invalid member to group 1 - should be denied
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        m["member"] = MessageElement(
            "cn=ldaptestuser3,cn=users," + self.base_dn,
            FLAG_MOD_ADD, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e20:
            (num, _) = e20.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

        # Make group 2 secondary
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn,
                                     FLAG_MOD_ADD, "member")
        ldb.modify(m)

        # Swap the groups
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement(str(group_rid_2),
                                             FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        ldb.modify(m)

        # Swap the groups (does not really make sense but does the same)
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement(str(group_rid_1),
                                             FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        m["primaryGroupID"] = MessageElement(str(group_rid_2),
                                             FLAG_MOD_REPLACE,
                                             "primaryGroupID")
        ldb.modify(m)

        # Old primary group should contain a "member" attribute for the user,
        # the new shouldn't contain anymore one
        res1 = ldb.search("cn=ldaptestgroup, cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["member"])
        self.assertTrue(len(res1) == 1)
        self.assertTrue(len(res1[0]["member"]) == 1)
        self.assertEqual(str(res1[0]["member"][0]).lower(),
                         ("cn=ldaptestuser,cn=users," + self.base_dn).lower())

        res1 = ldb.search("cn=ldaptestgroup2, cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["member"])
        self.assertTrue(len(res1) == 1)
        self.assertFalse("member" in res1[0])

        # Primary group member
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn,
                                     FLAG_MOD_DELETE, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e21:
            (num, _) = e21.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        # Delete invalid group member
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser1,cn=users," + self.base_dn,
                                     FLAG_MOD_DELETE, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e22:
            (num, _) = e22.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        # Also this should be denied
        try:
            ldb.add({
                "dn": "cn=ldaptestuser2,cn=users," + self.base_dn,
                "objectclass": "user",
                "primaryGroupID": "0"})
            self.fail()
        except LdbError as e23:
            (num, _) = e23.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        # Recreate user accounts

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user"})

        ldb.add({
            "dn": "cn=ldaptestuser2,cn=users," + self.base_dn,
            "objectclass": "user"})

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn,
                                     FLAG_MOD_ADD, "member")
        ldb.modify(m)

        # Already added
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn,
                                     FLAG_MOD_ADD, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e24:
            (num, _) = e24.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)

        # Already added, but as <SID=...>
        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["objectSid"])
        self.assertTrue(len(res1) == 1)
        sid_bin = res1[0]["objectSid"][0]
        # Refer to the same user through the "<SID=S-1-...>" extended DN
        # form, so the server has to recognise it as the existing member.
        sid_str = ("<SID=" +
                   get_string(ldb.schema_format_value("objectSid", sid_bin)) +
                   ">").upper()

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement(sid_str, FLAG_MOD_ADD, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e25:
            (num, _) = e25.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)

        # Invalid member
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser1,cn=users," + self.base_dn,
                                     FLAG_MOD_REPLACE, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e26:
            (num, _) = e26.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

        # Invalid member
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement(["cn=ldaptestuser,cn=users," + self.base_dn,
                                      "cn=ldaptestuser1,cn=users," + self.base_dn],
                                     FLAG_MOD_REPLACE, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e27:
            (num, _) = e27.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

        # Invalid member
        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn,
                                     FLAG_MOD_REPLACE, "member")
        m["member"] = MessageElement("cn=ldaptestuser1,cn=users," + self.base_dn,
                                     FLAG_MOD_ADD, "member")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e28:
            (num, _) = e28.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
        m["member"] = MessageElement(["cn=ldaptestuser,cn=users," + self.base_dn,
                                      "cn=ldaptestuser2,cn=users," + self.base_dn],
                                     FLAG_MOD_REPLACE, "member")
        ldb.modify(m)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)

        # Make also a small test for accounts with special DNs ("," in this case)
        # (raw strings: "\," is the DN escape, not a Python escape sequence)
        ldb.add({
            "dn": r"cn=ldaptest\,specialuser,cn=users," + self.base_dn,
            "objectclass": "user"})
        delete_force(self.ldb, r"cn=ldaptest\,specialuser,cn=users," + self.base_dn)

    def test_sam_attributes(self):
        """Test the behaviour of special attributes of SAM objects"""
        print("Testing the behaviour of special attributes of SAM objects\n")

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user"})
        ldb.add({
            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
            "objectclass": "group"})

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        m["groupType"] = MessageElement(str(GTYPE_SECURITY_GLOBAL_GROUP),
                                        FLAG_MOD_ADD, "groupType")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e29:
            (num, _) = e29.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

        # Delete protection tests

        for attr in ["nTSecurityDescriptor", "objectSid", "sAMAccountType",
                     "sAMAccountName", "groupType"]:

            # Replacing with an empty value list ...
            m = Message()
            m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
            m[attr] = MessageElement([], FLAG_MOD_REPLACE, attr)
            try:
                ldb.modify(m)
                self.fail()
            except LdbError as e:
                (num, _) = e.args
                self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

            # ... and an explicit delete must both be refused.
            m = Message()
            m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
            m[attr] = MessageElement([], FLAG_MOD_DELETE, attr)
            try:
                ldb.modify(m)
                self.fail()
            except LdbError as e1:
                (num, _) = e1.args
                self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["primaryGroupID"] = MessageElement("513", FLAG_MOD_ADD,
                                             "primaryGroupID")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e30:
            (num, _) = e30.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["userAccountControl"] = MessageElement(
            str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
            FLAG_MOD_ADD, "userAccountControl")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e31:
            (num, _) = e31.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["objectSid"] = MessageElement("xxxxxxxxxxxxxxxx", FLAG_MOD_ADD,
                                        "objectSid")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e32:
            (num, _) = e32.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["sAMAccountType"] = MessageElement("0", FLAG_MOD_ADD,
                                             "sAMAccountType")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e33:
            (num, _) = e33.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        m["sAMAccountName"] = MessageElement("test", FLAG_MOD_ADD,
                                             "sAMAccountName")
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e34:
            (num, _) = e34.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

        # Delete protection tests

        for attr in ["nTSecurityDescriptor", "objectSid", "sAMAccountType",
                     "sAMAccountName", "primaryGroupID", "userAccountControl",
                     "accountExpires", "badPasswordTime", "badPwdCount",
                     "codePage", "countryCode", "lastLogoff", "lastLogon",
                     "logonCount", "pwdLastSet"]:

            m = Message()
            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
            m[attr] = MessageElement([], FLAG_MOD_REPLACE, attr)
            try:
                ldb.modify(m)
                self.fail()
            except LdbError as e2:
                (num, _) = e2.args
                self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

            m = Message()
            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
            m[attr] = MessageElement([], FLAG_MOD_DELETE, attr)
            try:
                ldb.modify(m)
                self.fail()
            except LdbError as e3:
                (num, _) = e3.args
                self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)

    def test_primary_group_token_constructed(self):
        """Test the primary group token behaviour (hidden-generated-readonly attribute on groups) and some other constructed attributes"""
        print("Testing primary group token behaviour and other constructed attributes\n")

        try:
            ldb.add({
                "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
                "objectclass": "group",
                "primaryGroupToken": "100"})
            self.fail()
        except LdbError as e35:
            (num, _) = e35.args
            self.assertEqual(num, ERR_UNDEFINED_ATTRIBUTE_TYPE)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user"})

        ldb.add({
            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
            "objectclass": "group"})

        # Testing for one invalid, and one valid operational attribute, but
        # also the things they are built from
        res1 = ldb.search(self.base_dn,
                          scope=SCOPE_BASE,
                          attrs=["primaryGroupToken", "canonicalName",
                                 "objectClass", "objectSid"])
        self.assertTrue(len(res1) == 1)
        self.assertFalse("primaryGroupToken" in res1[0])
        self.assertTrue("canonicalName" in res1[0])
        self.assertTrue("objectClass" in res1[0])
        self.assertTrue("objectSid" in res1[0])

        # Attributes that were not requested must not be returned.
        res1 = ldb.search(self.base_dn,
                          scope=SCOPE_BASE,
                          attrs=["primaryGroupToken", "canonicalName"])
        self.assertTrue(len(res1) == 1)
        self.assertFalse("primaryGroupToken" in res1[0])
        self.assertFalse("objectSid" in res1[0])
        self.assertFalse("objectClass" in res1[0])
        self.assertTrue("canonicalName" in res1[0])

        res1 = ldb.search("cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupToken"])
        self.assertTrue(len(res1) == 1)
        self.assertFalse("primaryGroupToken" in res1[0])

        res1 = ldb.search("cn=ldaptestuser, cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["primaryGroupToken"])
        self.assertTrue(len(res1) == 1)
        self.assertFalse("primaryGroupToken" in res1[0])

        # Without an explicit attribute list the token is not returned.
        res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn,
                          scope=SCOPE_BASE)
        self.assertTrue(len(res1) == 1)
        self.assertFalse("primaryGroupToken" in res1[0])

        res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn,
                          scope=SCOPE_BASE,
                          attrs=["primaryGroupToken", "objectSID"])
        self.assertTrue(len(res1) == 1)
        primary_group_token = int(res1[0]["primaryGroupToken"][0])

        # The token is expected to equal the RID part of the group's SID.
        obj_sid = get_string(ldb.schema_format_value("objectSID",
                                                     res1[0]["objectSID"][0]))
        rid = security.dom_sid(obj_sid).split()[1]
        self.assertEqual(primary_group_token, rid)

        m = Message()
        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
        m["primaryGroupToken"] = "100"
        try:
            ldb.modify(m)
            self.fail()
        except LdbError as e36:
            (num, _) = e36.args
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)

    def test_tokenGroups(self):
        """Test the tokenGroups behaviour (hidden-generated-readonly attribute on SAM objects)"""
        print("Testing tokenGroups behaviour\n")

        # The domain object shouldn't contain any "tokenGroups" entry
        res = ldb.search(self.base_dn, scope=SCOPE_BASE, attrs=["tokenGroups"])
        self.assertTrue(len(res) == 1)
        self.assertFalse("tokenGroups" in res[0])

        # The domain administrator should contain "tokenGroups" entries
        # (the exact number depends on the domain/forest function level and the
        # DC software versions)
        res = ldb.search("cn=Administrator,cn=Users," + self.base_dn,
                         scope=SCOPE_BASE, attrs=["tokenGroups"])
        self.assertTrue(len(res) == 1)
        self.assertTrue("tokenGroups" in res[0])

        ldb.add({
            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
            "objectclass": "user"})

        # This testuser should contain at least two "tokenGroups" entries
        # (exactly two on an unmodified "Domain Users" and "Users" group)
        res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
                         scope=SCOPE_BASE, attrs=["tokenGroups"])
        self.assertTrue(len(res) == 1)
        self.assertTrue(len(res[0]["tokenGroups"]) >= 2)

        # one entry which we need to find should point to domains "Domain Users"
        # group and another entry should point to the builtin "Users"group
        domain_users_group_found = False
        users_group_found = False
        for sid in res[0]["tokenGroups"]:
            obj_sid = get_string(ldb.schema_format_value("objectSID", sid))
            rid = security.dom_sid(obj_sid).split()[1]
            if rid == 513:
                domain_users_group_found = True
            if rid == 545:
                users_group_found = True

        self.assertTrue(domain_users_group_found)
        self.assertTrue(users_group_found)

        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)

    def test_groupType(self):
        """Test the groupType behaviour"""
        print("Testing groupType behaviour\n")

        # You can never create or change to a
        # "GTYPE_SECURITY_BUILTIN_LOCAL_GROUP"

        # Add operation

        # Invalid attribute
        try:
            ldb.add({
                "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
                "objectclass": "group",
                "groupType": "0"})
            self.fail()
        except LdbError as e37:
            (num, _) = e37.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)

        try:
            ldb.add({
                "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
                "objectclass": "group",
                "groupType": str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP)})
            self.fail()
        except LdbError as e38:
            (num, _) = e38.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)

        ldb.add({
            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
            "objectclass": "group",
            "groupType": str(GTYPE_SECURITY_GLOBAL_GROUP)})

        res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["sAMAccountType"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["sAMAccountType"][0]),
                         ATYPE_SECURITY_GLOBAL_GROUP)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)

        ldb.add({
            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
            "objectclass": "group",
            "groupType": str(GTYPE_SECURITY_UNIVERSAL_GROUP)})

        res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn,
                          scope=SCOPE_BASE, attrs=["sAMAccountType"])
        self.assertTrue(len(res1) == 1)
        self.assertEqual(int(res1[0]["sAMAccountType"][0]),
                         ATYPE_SECURITY_UNIVERSAL_GROUP)
        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_LOCAL_GROUP) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_DISTRIBUTION_GLOBAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_LOCAL_GROUP) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) # Modify operation ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group"}) # We can change in this direction: global <-> universal <-> local # On each step also the group type itself (security/distribution) is # variable. 
# After creation we should have a "security global group" res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Invalid attribute try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement("0", FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e39: (num, _) = e39.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Security groups # Default is "global group" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change to "local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e40: (num, _) = e40.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Change to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") 
ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change to "local" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_LOCAL_GROUP) # Change to "global" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e41: (num, _) = e41.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Change to "builtin local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e42: (num, _) = e42.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( 
str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change to "builtin local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e43: (num, _) = e43.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change to "builtin local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e44: (num, _) = e44.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Distribution groups # Default is "global group" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) # Change to local (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + 
self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e45: (num, _) = e45.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Change to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change to "local" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), 
ATYPE_DISTRIBUTION_LOCAL_GROUP) # Change to "global" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e46: (num, _) = e46.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Change back to "universal" # Try to add invalid member to group 1 - should be denied m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["member"] = MessageElement( "cn=ldaptestuser3,cn=users," + self.base_dn, FLAG_MOD_ADD, "member") try: ldb.modify(m) self.fail() except LdbError as e47: (num, _) = e47.args self.assertEqual(num, ERR_NO_SUCH_OBJECT) # Make group 2 secondary m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) # Both group types: this performs only random checks - all possibilities # would require too much code. 
# Default is "global group" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change to "local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e48: (num, _) = e48.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Change to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) 
self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change to "local" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_LOCAL_GROUP) # Change to "global" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError as e49: (num, _) = e49.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) def test_pwdLastSet(self): """Test the pwdLastSet behaviour""" print("Testing pwdLastSet behaviour\n") ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 
"objectclass": "user", "pwdLastSet": "0"}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "pwdLastSet": "-1"}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0) lastset = int(res1[0]["pwdLastSet"][0]) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "pwdLastSet": str(1)}) self.fail() except LdbError as e50: (num, msg) = e50.args self.assertEqual(num, ERR_OTHER) self.assertTrue('00000057' in msg) try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "pwdLastSet": str(lastset)}) self.fail() except LdbError as e51: (num, msg) = e51.args self.assertEqual(num, ERR_OTHER) self.assertTrue('00000057' in msg) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user"}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) 
self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(0), FLAG_MOD_REPLACE, "pwdLastSet") ldb.modify(m) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(0), FLAG_MOD_DELETE, "pwdLastSet") m["pls2"] = MessageElement(str(0), FLAG_MOD_ADD, "pwdLastSet") ldb.modify(m) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(-1), FLAG_MOD_REPLACE, "pwdLastSet") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) self.assertGreater(int(res1[0]["pwdLastSet"][0]), lastset) lastset = int(res1[0]["pwdLastSet"][0]) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(0), FLAG_MOD_DELETE, "pwdLastSet") m["pls2"] = MessageElement(str(0), FLAG_MOD_ADD, "pwdLastSet") ldb.modify(m) self.fail() except LdbError as e52: (num, msg) = e52.args self.assertEqual(num, ERR_NO_SUCH_ATTRIBUTE) self.assertTrue('00002085' in msg) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(-1), FLAG_MOD_DELETE, "pwdLastSet") m["pls2"] = MessageElement(str(0), FLAG_MOD_ADD, "pwdLastSet") ldb.modify(m) self.fail() except LdbError as e53: (num, msg) = e53.args self.assertEqual(num, ERR_NO_SUCH_ATTRIBUTE) self.assertTrue('00002085' in msg) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(lastset), 
FLAG_MOD_DELETE, "pwdLastSet") m["pls2"] = MessageElement(str(-1), FLAG_MOD_ADD, "pwdLastSet") time.sleep(0.2) ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) self.assertEqual(int(res1[0]["pwdLastSet"][0]), lastset) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(lastset), FLAG_MOD_DELETE, "pwdLastSet") m["pls2"] = MessageElement(str(lastset), FLAG_MOD_ADD, "pwdLastSet") ldb.modify(m) self.fail() except LdbError as e54: (num, msg) = e54.args self.assertEqual(num, ERR_OTHER) self.assertTrue('00000057' in msg) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(lastset), FLAG_MOD_DELETE, "pwdLastSet") m["pls2"] = MessageElement(str(0), FLAG_MOD_ADD, "pwdLastSet") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) uac = int(res1[0]["userAccountControl"][0]) self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["uac1"] = MessageElement(str(uac |UF_PASSWORD_EXPIRED), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), 
ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_ldap_bind_must_change_pwd(self): """Test the error messages for failing LDAP binds""" print("Test the error messages for failing LDAP binds\n") delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def format_error_msg(hresult_v, dsid_v, werror_v): # # There are 4 lower case hex digits following 'v' at the end, # but different Windows Versions return different values: # # Windows 2008R2 uses 'v1db1' # Windows 2012R2 uses 'v2580' # return "%08X: LdapErr: DSID-%08X, comment: AcceptSecurityContext error, data %x, v" % ( hresult_v, dsid_v, werror_v) HRES_SEC_E_LOGON_DENIED = 0x8009030C HRES_SEC_E_INVALID_TOKEN = 0x80090308 sasl_bind_dsid = 0x0C0904DC simple_bind_dsid = 0x0C0903A9 error_msg_sasl_wrong_pw = format_error_msg( HRES_SEC_E_LOGON_DENIED, sasl_bind_dsid, werror.WERR_LOGON_FAILURE) error_msg_sasl_must_change = format_error_msg( HRES_SEC_E_LOGON_DENIED, sasl_bind_dsid, werror.WERR_PASSWORD_MUST_CHANGE) error_msg_simple_wrong_pw = format_error_msg( HRES_SEC_E_INVALID_TOKEN, simple_bind_dsid, werror.WERR_LOGON_FAILURE) error_msg_simple_must_change = format_error_msg( HRES_SEC_E_INVALID_TOKEN, simple_bind_dsid, werror.WERR_PASSWORD_MUST_CHANGE) username = "ldaptestuser" password = "thatsAcomplPASS2" utf16pw = ('"' + password + '"').encode('utf-16-le') ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "sAMAccountName": username, "userAccountControl": str(UF_NORMAL_ACCOUNT), "unicodePwd": utf16pw, }) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountName", "sAMAccountType", "userAccountControl", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(str(res1[0]["sAMAccountName"][0]), username) 
self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT) self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0) # Open a second LDB connection with the user credentials. Use the # command line credentials for information like the domain, the realm # and the workstation. sasl_creds = Credentials() sasl_creds.set_username(username) sasl_creds.set_password(password) sasl_creds.set_domain(creds.get_domain()) sasl_creds.set_workstation(creds.get_workstation()) sasl_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) sasl_creds.set_kerberos_state(DONT_USE_KERBEROS) sasl_wrong_creds = Credentials() sasl_wrong_creds.set_username(username) sasl_wrong_creds.set_password("wrong") sasl_wrong_creds.set_domain(creds.get_domain()) sasl_wrong_creds.set_workstation(creds.get_workstation()) sasl_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) sasl_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS) simple_creds = Credentials() simple_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn) simple_creds.set_username(username) simple_creds.set_password(password) simple_creds.set_domain(creds.get_domain()) simple_creds.set_workstation(creds.get_workstation()) simple_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) simple_creds.set_kerberos_state(DONT_USE_KERBEROS) simple_wrong_creds = Credentials() simple_wrong_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn) simple_wrong_creds.set_username(username) simple_wrong_creds.set_password("wrong") simple_wrong_creds.set_domain(creds.get_domain()) simple_wrong_creds.set_workstation(creds.get_workstation()) simple_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) simple_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS) sasl_ldb = SamDB(url=host, credentials=sasl_creds, lp=lp) self.assertIsNotNone(sasl_ldb) sasl_ldb = 
None requires_strong_auth = False try: simple_ldb = SamDB(url=host, credentials=simple_creds, lp=lp) self.assertIsNotNone(simple_ldb) simple_ldb = None except LdbError as e55: (num, msg) = e55.args if num != ERR_STRONG_AUTH_REQUIRED: raise requires_strong_auth = True def assertLDAPErrorMsg(msg, expected_msg): self.assertTrue(expected_msg in msg, "msg[%s] does not contain expected[%s]" % ( msg, expected_msg)) try: ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp) self.fail() except LdbError as e56: (num, msg) = e56.args self.assertEqual(num, ERR_INVALID_CREDENTIALS) self.assertTrue(error_msg_sasl_wrong_pw in msg) if not requires_strong_auth: try: ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp) self.fail() except LdbError as e4: (num, msg) = e4.args self.assertEqual(num, ERR_INVALID_CREDENTIALS) assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["pls1"] = MessageElement(str(0), FLAG_MOD_REPLACE, "pwdLastSet") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["pwdLastSet"]) self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) try: ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp) self.fail() except LdbError as e57: (num, msg) = e57.args self.assertEqual(num, ERR_INVALID_CREDENTIALS) assertLDAPErrorMsg(msg, error_msg_sasl_wrong_pw) try: ldb_fail = SamDB(url=host, credentials=sasl_creds, lp=lp) self.fail() except LdbError as e58: (num, msg) = e58.args self.assertEqual(num, ERR_INVALID_CREDENTIALS) assertLDAPErrorMsg(msg, error_msg_sasl_must_change) if not requires_strong_auth: try: ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp) self.fail() except LdbError as e5: (num, msg) = e5.args self.assertEqual(num, ERR_INVALID_CREDENTIALS) assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw) try: ldb_fail = SamDB(url=host, credentials=simple_creds, lp=lp) self.fail() except LdbError as e6: (num, 
msg) = e6.args self.assertEqual(num, ERR_INVALID_CREDENTIALS) assertLDAPErrorMsg(msg, error_msg_simple_must_change) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_0_uac(self): """Test the userAccountControl behaviour""" print("Testing userAccountControl behaviour\n") # With a user object # Add operation # As user you can only set a normal account. # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a # password yet. # With SYSTEM rights you can set a interdomain trust account. ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": "0"}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_PASSWD_NOTREQD == 0) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_normal(self): """Test the userAccountControl behaviour""" ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_NORMAL_ACCOUNT)}) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_normal_pwnotreq(self): ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + 
self.base_dn) def test_userAccountControl_user_add_normal_pwnotreq_lockout_expired(self): ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_LOCKOUT | UF_PASSWORD_EXPIRED)}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) self.assertFalse("lockoutTime" in res1[0]) self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_temp_dup(self): try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_TEMP_DUPLICATE_ACCOUNT)}) self.fail() except LdbError as e59: (num, _) = e59.args self.assertEqual(num, ERR_OTHER) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_server(self): try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) self.fail() except LdbError as e60: (num, _) = e60.args self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_workstation(self): try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) except LdbError as e61: (num, _) = e61.args self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_rodc(self): try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 
"objectclass": "user", "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)}) except LdbError as e62: (num, _) = e62.args self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl_user_add_trust(self): try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) self.fail() except LdbError as e63: (num, _) = e63.args self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # Modify operation def test_userAccountControl_user_modify(self): ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user"}) # After creation we should have a normal account res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) # As user you can only switch from a normal account to a workstation # trust account and back. # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a # password yet. # With SYSTEM rights you can switch to a interdomain trust account. 
# Invalid attribute try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) except LdbError as e64: (num, _) = e64.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) except LdbError as e65: (num, _) = e65.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_ACCOUNTDISABLE), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["lockoutTime"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "lockoutTime") m["pwdLastSet"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "pwdLastSet") ldb.modify(m) m = Message() m.dn = Dn(ldb, 
"cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_LOCKOUT | UF_PASSWORD_EXPIRED), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) self.assertTrue(int(res1[0]["lockoutTime"][0]) == 0) self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_TEMP_DUPLICATE_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) self.fail() except LdbError as e66: (num, _) = e66.args self.assertEqual(num, ERR_OTHER) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_SERVER_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) self.fail() except LdbError as e67: (num, _) = e67.args self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_WORKSTATION_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) self.fail() except LdbError as e68: (num, _) = e68.args self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) 
def test_userAccountControl_computer_add_0_uac(self):
    # Computer object, add operation.
    #
    # As computer you can set a normal account and a server trust account.
    # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a
    # password yet.
    # With SYSTEM rights you can set an interdomain trust account.
    #
    # This case adds the computer with userAccountControl "0" and checks
    # which account type and flags end up stored on the object.
    samdb = self.ldb  # setUp stores the module-level connection here
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn

    samdb.add({
        "dn": computer_dn,
        "objectclass": "computer",
        "userAccountControl": "0"})

    found = samdb.search(computer_dn,
                         scope=SCOPE_BASE,
                         attrs=["sAMAccountType", "userAccountControl"])
    self.assertEqual(len(found), 1)
    entry = found[0]
    self.assertEqual(int(entry["sAMAccountType"][0]),
                     ATYPE_WORKSTATION_TRUST)
    uac = int(entry["userAccountControl"][0])
    self.assertEqual(uac & UF_ACCOUNTDISABLE, 0)
    self.assertEqual(uac & UF_PASSWD_NOTREQD, 0)

    delete_force(samdb, computer_dn)
def test_userAccountControl_computer_add_normal(self):
    # Adding a computer with a bare UF_NORMAL_ACCOUNT only has to succeed;
    # nothing is read back.
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn
    self.ldb.add({
        "dn": computer_dn,
        "objectclass": "computer",
        "userAccountControl": str(UF_NORMAL_ACCOUNT)})
    delete_force(self.ldb, computer_dn)


def test_userAccountControl_computer_add_normal_pwnotreqd(self):
    # UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD on add: expect a normal account
    # type that is not disabled.
    samdb = self.ldb
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn

    samdb.add({
        "dn": computer_dn,
        "objectclass": "computer",
        "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)})

    found = samdb.search(computer_dn,
                         scope=SCOPE_BASE,
                         attrs=["sAMAccountType", "userAccountControl"])
    self.assertEqual(len(found), 1)
    entry = found[0]
    self.assertEqual(int(entry["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
    self.assertEqual(
        int(entry["userAccountControl"][0]) & UF_ACCOUNTDISABLE, 0)

    delete_force(samdb, computer_dn)


def test_userAccountControl_computer_add_normal_pwnotreqd_lockout_expired(self):
    # UF_LOCKOUT and UF_PASSWORD_EXPIRED are requested on add; the checks
    # below expect them not to be stored, no lockoutTime attribute, and
    # pwdLastSet equal to 0.
    samdb = self.ldb
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn
    requested = (UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD |
                 UF_LOCKOUT | UF_PASSWORD_EXPIRED)

    samdb.add({
        "dn": computer_dn,
        "objectclass": "computer",
        "userAccountControl": str(requested)})

    found = samdb.search(computer_dn,
                         scope=SCOPE_BASE,
                         attrs=["sAMAccountType", "userAccountControl",
                                "lockoutTime", "pwdLastSet"])
    self.assertEqual(len(found), 1)
    entry = found[0]
    self.assertEqual(int(entry["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
    self.assertEqual(
        int(entry["userAccountControl"][0]) &
        (UF_LOCKOUT | UF_PASSWORD_EXPIRED), 0)
    self.assertNotIn("lockoutTime", entry)
    self.assertEqual(int(entry["pwdLastSet"][0]), 0)

    delete_force(samdb, computer_dn)
def test_userAccountControl_computer_add_temp_dup(self):
    # Adding with UF_TEMP_DUPLICATE_ACCOUNT must be rejected with ERR_OTHER.
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn
    with self.assertRaises(LdbError) as caught:
        self.ldb.add({
            "dn": computer_dn,
            "objectclass": "computer",
            "userAccountControl": str(UF_TEMP_DUPLICATE_ACCOUNT)})
    (num, _) = caught.exception.args
    self.assertEqual(num, ERR_OTHER)
    delete_force(self.ldb, computer_dn)


def test_userAccountControl_computer_add_server(self):
    # A server trust account is accepted on add and is reported with the
    # ATYPE_WORKSTATION_TRUST account type.
    samdb = self.ldb
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn

    samdb.add({
        "dn": computer_dn,
        "objectclass": "computer",
        "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)})

    found = samdb.search(computer_dn,
                         scope=SCOPE_BASE,
                         attrs=["sAMAccountType"])
    self.assertEqual(len(found), 1)
    self.assertEqual(int(found[0]["sAMAccountType"][0]),
                     ATYPE_WORKSTATION_TRUST)

    delete_force(samdb, computer_dn)


def test_userAccountControl_computer_add_workstation(self):
    # The add is allowed to succeed; only if it is refused does the error
    # code have to be ERR_OBJECT_CLASS_VIOLATION.
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn
    try:
        self.ldb.add({
            "dn": computer_dn,
            "objectclass": "computer",
            "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)})
    except LdbError as refused:
        (num, _) = refused.args
        self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION)
    delete_force(self.ldb, computer_dn)


def test_userAccountControl_computer_add_trust(self):
    # An interdomain trust account must be refused on add with
    # ERR_OBJECT_CLASS_VIOLATION.
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn
    with self.assertRaises(LdbError) as caught:
        self.ldb.add({
            "dn": computer_dn,
            "objectclass": "computer",
            "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)})
    (num, _) = caught.exception.args
    self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION)
    delete_force(self.ldb, computer_dn)
def test_userAccountControl_computer_modify(self):
    # Modify operation on a computer object.
    #
    # As computer you can switch from a normal account to a workstation
    # or server trust account and back (also swapping between trust
    # accounts is allowed).
    # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a
    # password yet.
    # With SYSTEM rights you can switch to an interdomain trust account.
    samdb = self.ldb  # setUp stores the module-level connection here
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn
    user2_dn = "cn=ldaptestuser2,cn=users," + self.base_dn

    def replace(dn, *pairs):
        # Replace every (attribute, value) pair on dn in one modify call.
        msg = Message()
        msg.dn = Dn(samdb, dn)
        for attr, value in pairs:
            msg[attr] = MessageElement(str(value), FLAG_MOD_REPLACE, attr)
        samdb.modify(msg)

    def set_uac(value, dn=computer_dn):
        replace(dn, ("userAccountControl", value))

    def fetch(dn, *attrs):
        # Base-scope read that insists on exactly one result.
        found = samdb.search(dn, scope=SCOPE_BASE, attrs=list(attrs))
        self.assertEqual(len(found), 1)
        return found[0]

    def expect_uac_error(value, expected, strict):
        # strict=False keeps the tolerant checks: a successful modify is
        # accepted, only the error code of a refusal is verified.
        try:
            set_uac(value)
        except LdbError as refused:
            (num, _) = refused.args
            self.assertEqual(num, expected)
        else:
            if strict:
                self.fail()

    def add_to_group(group_rid, member_dn):
        # The group is addressed through its SID (<SID=domainsid-rid>) so
        # the member is added to the very group whose RID is written to
        # primaryGroupID afterwards; an empty DN would not name any group.
        msg = Message()
        msg.dn = Dn(samdb,
                    "<SID=%s-%d>" % (samdb.get_domain_sid(), group_rid))
        msg["member"] = MessageElement(member_dn, FLAG_MOD_ADD, "member")
        samdb.modify(msg)

    samdb.add({
        "dn": computer_dn,
        "objectclass": "computer"})

    # After creation we should have a disabled workstation trust account
    entry = fetch(computer_dn, "sAMAccountType", "userAccountControl")
    self.assertEqual(int(entry["sAMAccountType"][0]),
                     ATYPE_WORKSTATION_TRUST)
    self.assertNotEqual(
        int(entry["userAccountControl"][0]) & UF_ACCOUNTDISABLE, 0)

    # Invalid attribute
    expect_uac_error("0", ERR_UNWILLING_TO_PERFORM, strict=False)
    expect_uac_error(UF_NORMAL_ACCOUNT, ERR_UNWILLING_TO_PERFORM,
                     strict=False)

    set_uac(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)
    entry = fetch(computer_dn, "sAMAccountType", "userAccountControl")
    self.assertEqual(int(entry["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
    self.assertEqual(
        int(entry["userAccountControl"][0]) & UF_ACCOUNTDISABLE, 0)

    set_uac(UF_ACCOUNTDISABLE)
    entry = fetch(computer_dn, "sAMAccountType", "userAccountControl")
    self.assertEqual(int(entry["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
    uac = int(entry["userAccountControl"][0])
    self.assertNotEqual(uac & UF_NORMAL_ACCOUNT, 0)
    self.assertNotEqual(uac & UF_ACCOUNTDISABLE, 0)

    # Zero both timestamps first, then try to set the lockout/expired
    # flags directly: the checks expect neither flag nor timestamp to stick.
    replace(computer_dn,
            ("lockoutTime", samba.unix2nttime(0)),
            ("pwdLastSet", samba.unix2nttime(0)))
    set_uac(UF_LOCKOUT | UF_PASSWORD_EXPIRED)
    entry = fetch(computer_dn, "sAMAccountType", "userAccountControl",
                  "lockoutTime", "pwdLastSet")
    self.assertEqual(int(entry["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
    uac = int(entry["userAccountControl"][0])
    self.assertNotEqual(uac & UF_NORMAL_ACCOUNT, 0)
    self.assertEqual(uac & (UF_LOCKOUT | UF_PASSWORD_EXPIRED), 0)
    self.assertEqual(int(entry["lockoutTime"][0]), 0)
    self.assertEqual(int(entry["pwdLastSet"][0]), 0)

    expect_uac_error(UF_TEMP_DUPLICATE_ACCOUNT, ERR_OTHER, strict=True)

    # Switch back and forth between the account kinds a computer may have.
    for new_uac, expected_type in (
            (UF_SERVER_TRUST_ACCOUNT, ATYPE_WORKSTATION_TRUST),
            (UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD, ATYPE_NORMAL_ACCOUNT),
            (UF_WORKSTATION_TRUST_ACCOUNT, ATYPE_WORKSTATION_TRUST),
            (UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD, ATYPE_NORMAL_ACCOUNT),
            (UF_SERVER_TRUST_ACCOUNT, ATYPE_WORKSTATION_TRUST),
            (UF_WORKSTATION_TRUST_ACCOUNT, ATYPE_WORKSTATION_TRUST)):
        set_uac(new_uac)
        entry = fetch(computer_dn, "sAMAccountType")
        self.assertEqual(int(entry["sAMAccountType"][0]), expected_type)

    expect_uac_error(UF_INTERDOMAIN_TRUST_ACCOUNT,
                     ERR_OBJECT_CLASS_VIOLATION, strict=True)

    # "primaryGroupID" does not change if account type remains the same

    # For a user account
    initial_uac = UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE
    samdb.add({
        "dn": user2_dn,
        "objectclass": "user",
        "userAccountControl": str(initial_uac)})

    entry = fetch(user2_dn, "userAccountControl")
    self.assertEqual(int(entry["userAccountControl"][0]), initial_uac)

    add_to_group(DOMAIN_RID_ADMINS, user2_dn)
    replace(user2_dn, ("primaryGroupID", DOMAIN_RID_ADMINS))
    set_uac(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD, dn=user2_dn)

    entry = fetch(user2_dn, "userAccountControl", "primaryGroupID")
    self.assertEqual(
        int(entry["userAccountControl"][0]) & UF_ACCOUNTDISABLE, 0)
    self.assertEqual(int(entry["primaryGroupID"][0]), DOMAIN_RID_ADMINS)

    # For a workstation account
    entry = fetch(computer_dn, "primaryGroupID")
    self.assertEqual(int(entry["primaryGroupID"][0]),
                     DOMAIN_RID_DOMAIN_MEMBERS)

    add_to_group(DOMAIN_RID_USERS, computer_dn)
    replace(computer_dn, ("primaryGroupID", DOMAIN_RID_USERS))
    set_uac(UF_WORKSTATION_TRUST_ACCOUNT)

    entry = fetch(computer_dn, "primaryGroupID")
    self.assertEqual(int(entry["primaryGroupID"][0]), DOMAIN_RID_USERS)

    delete_force(samdb, "cn=ldaptestuser,cn=users," + self.base_dn)
    delete_force(samdb, user2_dn)
    delete_force(samdb, computer_dn)
ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["userAccountControl", "primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_ADMINS) # For a workstation account res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DOMAIN_MEMBERS) m = Message() m.dn = Dn(ldb, "") m["member"] = MessageElement( "cn=ldaptestcomputer,cn=computers," + self.base_dn, FLAG_MOD_ADD, "member") ldb.modify(m) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS), FLAG_MOD_REPLACE, "primaryGroupID") ldb.modify(m) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_WORKSTATION_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) def find_repl_meta_data(self, rpmd, attid): for i in range(0, rpmd.ctr.count): m = rpmd.ctr.array[i] if m.attid == attid: return m return None def test_smartcard_required1(self): """Test the UF_SMARTCARD_REQUIRED behaviour""" print("Testing UF_SMARTCARD_REQUIRED behaviour\n") delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": 
str(UF_NORMAL_ACCOUNT), "unicodePwd": "\"thatsAcomplPASS2\"".encode('utf-16-le') }) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet", "msDS-KeyVersionNumber", "replPropertyMetaData"]) self.assertTrue(len(res) == 1) self.assertEqual(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT) self.assertNotEqual(int(res[0]["pwdLastSet"][0]), 0) lastset = int(res[0]["pwdLastSet"][0]) self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, res[0]["replPropertyMetaData"][0]) lastsetmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_pwdLastSet) self.assertIsNotNone(lastsetmd) self.assertEqual(lastsetmd.version, 1) nthashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_unicodePwd) self.assertIsNotNone(nthashmd) self.assertEqual(nthashmd.version, 1) nthistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_ntPwdHistory) self.assertIsNotNone(nthistmd) self.assertEqual(nthistmd.version, 1) lmhashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_dBCSPwd) self.assertIsNotNone(lmhashmd) self.assertEqual(lmhashmd.version, 1) lmhistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_lmPwdHistory) self.assertIsNotNone(lmhistmd) self.assertEqual(lmhistmd.version, 1) spcbmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_supplementalCredentials) self.assertIsNotNone(spcbmd) self.assertEqual(spcbmd.version, 1) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet", 
"msDS-KeyVersionNumber", "replPropertyMetaData"]) self.assertTrue(len(res) == 1) self.assertEqual(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED) self.assertEqual(int(res[0]["pwdLastSet"][0]), lastset) lastset1 = int(res[0]["pwdLastSet"][0]) self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 2) self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, res[0]["replPropertyMetaData"][0]) lastsetmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_pwdLastSet) self.assertIsNotNone(lastsetmd) self.assertEqual(lastsetmd.version, 1) nthashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_unicodePwd) self.assertIsNotNone(nthashmd) self.assertEqual(nthashmd.version, 2) nthistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_ntPwdHistory) self.assertIsNotNone(nthistmd) self.assertEqual(nthistmd.version, 2) lmhashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_dBCSPwd) self.assertIsNotNone(lmhashmd) self.assertEqual(lmhashmd.version, 2) lmhistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_lmPwdHistory) self.assertIsNotNone(lmhistmd) self.assertEqual(lmhistmd.version, 2) spcbmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_supplementalCredentials) self.assertIsNotNone(spcbmd) self.assertEqual(spcbmd.version, 2) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_smartcard_required2(self): """Test the UF_SMARTCARD_REQUIRED behaviour""" print("Testing UF_SMARTCARD_REQUIRED behaviour\n") delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_NORMAL_ACCOUNT |UF_ACCOUNTDISABLE), }) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet", 
"msDS-KeyVersionNumber", "replPropertyMetaData"]) self.assertTrue(len(res) == 1) self.assertEqual(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT |UF_ACCOUNTDISABLE) self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) self.assertTrue("msDS-KeyVersionNumber" in res[0]) self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, res[0]["replPropertyMetaData"][0]) lastsetmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_pwdLastSet) self.assertIsNotNone(lastsetmd) self.assertEqual(lastsetmd.version, 1) nthashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_unicodePwd) self.assertIsNotNone(nthashmd) self.assertEqual(nthashmd.version, 1) nthistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_ntPwdHistory) self.assertIsNotNone(nthistmd) self.assertEqual(nthistmd.version, 1) lmhashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_dBCSPwd) self.assertIsNotNone(lmhashmd) self.assertEqual(lmhashmd.version, 1) lmhistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_lmPwdHistory) self.assertIsNotNone(lmhistmd) self.assertEqual(lmhistmd.version, 1) spcbmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_supplementalCredentials) self.assertIsNone(spcbmd) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT |UF_ACCOUNTDISABLE |UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet", "msDS-KeyVersionNumber", "replPropertyMetaData"]) self.assertTrue(len(res) == 1) self.assertEqual(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT 
|UF_ACCOUNTDISABLE |UF_SMARTCARD_REQUIRED) self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 2) self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, res[0]["replPropertyMetaData"][0]) lastsetmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_pwdLastSet) self.assertIsNotNone(lastsetmd) self.assertEqual(lastsetmd.version, 1) nthashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_unicodePwd) self.assertIsNotNone(nthashmd) self.assertEqual(nthashmd.version, 2) nthistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_ntPwdHistory) self.assertIsNotNone(nthistmd) self.assertEqual(nthistmd.version, 2) lmhashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_dBCSPwd) self.assertIsNotNone(lmhashmd) self.assertEqual(lmhashmd.version, 2) lmhistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_lmPwdHistory) self.assertIsNotNone(lmhistmd) self.assertEqual(lmhistmd.version, 2) spcbmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_supplementalCredentials) self.assertIsNotNone(spcbmd) self.assertEqual(spcbmd.version, 1) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet", "msDS-KeyVersionNumber", "replPropertyMetaData"]) self.assertTrue(len(res) == 1) self.assertEqual(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED) self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 2) self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) rpmd = 
ndr_unpack(drsblobs.replPropertyMetaDataBlob, res[0]["replPropertyMetaData"][0]) lastsetmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_pwdLastSet) self.assertIsNotNone(lastsetmd) self.assertEqual(lastsetmd.version, 1) nthashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_unicodePwd) self.assertIsNotNone(nthashmd) self.assertEqual(nthashmd.version, 2) nthistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_ntPwdHistory) self.assertIsNotNone(nthistmd) self.assertEqual(nthistmd.version, 2) lmhashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_dBCSPwd) self.assertIsNotNone(lmhashmd) self.assertEqual(lmhashmd.version, 2) lmhistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_lmPwdHistory) self.assertIsNotNone(lmhistmd) self.assertEqual(lmhistmd.version, 2) spcbmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_supplementalCredentials) self.assertIsNotNone(spcbmd) self.assertEqual(spcbmd.version, 1) delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_smartcard_required3(self): """Test the UF_SMARTCARD_REQUIRED behaviour""" print("Testing UF_SMARTCARD_REQUIRED behaviour\n") delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": "user", "userAccountControl": str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED |UF_ACCOUNTDISABLE), }) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet", "msDS-KeyVersionNumber", "replPropertyMetaData"]) self.assertTrue(len(res) == 1) self.assertEqual(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED |UF_ACCOUNTDISABLE) self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) rpmd = 
ndr_unpack(drsblobs.replPropertyMetaDataBlob, res[0]["replPropertyMetaData"][0]) lastsetmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_pwdLastSet) self.assertIsNotNone(lastsetmd) self.assertEqual(lastsetmd.version, 1) nthashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_unicodePwd) self.assertIsNotNone(nthashmd) self.assertEqual(nthashmd.version, 1) nthistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_ntPwdHistory) self.assertIsNotNone(nthistmd) self.assertEqual(nthistmd.version, 1) lmhashmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_dBCSPwd) self.assertIsNotNone(lmhashmd) self.assertEqual(lmhashmd.version, 1) lmhistmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_lmPwdHistory) self.assertIsNotNone(lmhistmd) self.assertEqual(lmhistmd.version, 1) spcbmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_supplementalCredentials) self.assertIsNotNone(spcbmd) self.assertEqual(spcbmd.version, 1) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType", "userAccountControl", "pwdLastSet", "msDS-KeyVersionNumber", "replPropertyMetaData"]) self.assertTrue(len(res) == 1) self.assertEqual(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED) self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, res[0]["replPropertyMetaData"][0]) lastsetmd = self.find_repl_meta_data(rpmd, drsuapi.DRSUAPI_ATTID_pwdLastSet) self.assertIsNotNone(lastsetmd) self.assertEqual(lastsetmd.version, 1) nthashmd = 
def test_isCriticalSystemObject_user(self):
    """Test the isCriticalSystemObject behaviour"""
    print("Testing isCriticalSystemObject behaviour\n")

    samdb = self.ldb
    user_dn = "cn=ldaptestuser,cn=users," + self.base_dn

    def lookup():
        # Base-scope read of the attribute under test, exactly one hit.
        found = samdb.search(user_dn,
                             scope=SCOPE_BASE,
                             attrs=["isCriticalSystemObject"])
        self.assertEqual(len(found), 1)
        return found[0]

    # Add tests (of a user)
    samdb.add({
        "dn": user_dn,
        "objectclass": "user"})
    self.assertNotIn("isCriticalSystemObject", lookup())

    # Modification tests
    change = Message()
    change.dn = Dn(samdb, user_dn)
    change["userAccountControl"] = MessageElement(
        str(UF_WORKSTATION_TRUST_ACCOUNT),
        FLAG_MOD_REPLACE, "userAccountControl")
    samdb.modify(change)

    entry = lookup()
    self.assertIn("isCriticalSystemObject", entry)
    self.assertEqual(str(entry["isCriticalSystemObject"][0]), "FALSE")

    delete_force(samdb, user_dn)


def test_isCriticalSystemObject(self):
    """Test the isCriticalSystemObject behaviour"""
    print("Testing isCriticalSystemObject behaviour\n")

    samdb = self.ldb
    computer_dn = "cn=ldaptestcomputer,cn=computers," + self.base_dn

    def lookup():
        # Base-scope read of the attribute under test, exactly one hit.
        found = samdb.search(computer_dn,
                             scope=SCOPE_BASE,
                             attrs=["isCriticalSystemObject"])
        self.assertEqual(len(found), 1)
        return found[0]

    def critical_flag():
        return str(lookup()["isCriticalSystemObject"][0])

    def add_computer(uac=None):
        record = {"dn": computer_dn, "objectclass": "computer"}
        if uac is not None:
            record["userAccountControl"] = str(uac)
        samdb.add(record)

    # Add tests
    add_computer()
    entry = lookup()
    self.assertIn("isCriticalSystemObject", entry)
    self.assertEqual(str(entry["isCriticalSystemObject"][0]), "FALSE")
    delete_force(samdb, computer_dn)

    for uac, flag in (
            (UF_WORKSTATION_TRUST_ACCOUNT, "FALSE"),
            (UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT,
             "TRUE")):
        add_computer(uac)
        self.assertEqual(critical_flag(), flag)
        delete_force(samdb, computer_dn)

    # The server trust account is kept: the modifications below act on it.
    add_computer(UF_SERVER_TRUST_ACCOUNT)
    self.assertEqual(critical_flag(), "TRUE")

    # Modification tests
    for uac, flag in (
            (UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD, "TRUE"),
            (UF_WORKSTATION_TRUST_ACCOUNT, "FALSE"),
            (UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT,
             "TRUE"),
            (UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD, "TRUE"),
            (UF_SERVER_TRUST_ACCOUNT, "TRUE"),
            (UF_WORKSTATION_TRUST_ACCOUNT, "FALSE")):
        change = Message()
        change.dn = Dn(samdb, computer_dn)
        change["userAccountControl"] = MessageElement(
            str(uac), FLAG_MOD_REPLACE, "userAccountControl")
        samdb.modify(change)
        self.assertEqual(critical_flag(), flag)

    delete_force(samdb, computer_dn)
m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["isCriticalSystemObject"]) self.assertTrue(len(res1) == 1) self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "FALSE") delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) def test_service_principal_name_updates(self): """Test the servicePrincipalNames update behaviour""" print("Testing servicePrincipalNames update behaviour\n") ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "dNSHostName": "testname.testdom"}) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertFalse("servicePrincipalName" in res[0]) delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "servicePrincipalName": "HOST/testname.testdom"}) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["dNSHostName"]) self.assertTrue(len(res) == 1) self.assertFalse("dNSHostName" in res[0]) delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "dNSHostName": "testname2.testdom", "servicePrincipalName": "HOST/testname.testdom"}) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["dNSHostName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) 
== 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname.testdom") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement("testname.testdoM", FLAG_MOD_REPLACE, "dNSHostName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname.testdom") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement("testname2.testdom2", FLAG_MOD_REPLACE, "dNSHostName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname2.testdom2") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement([], FLAG_MOD_DELETE, "dNSHostName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname2.testdom2") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement("testname.testdom3", FLAG_MOD_REPLACE, "dNSHostName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname2.testdom2") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement("testname2.testdom2", FLAG_MOD_REPLACE, "dNSHostName") ldb.modify(m) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + 
self.base_dn) m["dNSHostName"] = MessageElement("testname3.testdom3", FLAG_MOD_REPLACE, "dNSHostName") m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom2", FLAG_MOD_REPLACE, "servicePrincipalName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname3.testdom3") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom2", FLAG_MOD_REPLACE, "servicePrincipalName") m["dNSHostName"] = MessageElement("testname4.testdom4", FLAG_MOD_REPLACE, "dNSHostName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname2.testdom2") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["servicePrincipalName"] = MessageElement([], FLAG_MOD_DELETE, "servicePrincipalName") ldb.modify(m) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement("testname2.testdom2", FLAG_MOD_REPLACE, "dNSHostName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertFalse("servicePrincipalName" in res[0]) delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "sAMAccountName": "testname$"}) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertFalse("servicePrincipalName" in res[0]) delete_force(self.ldb, 
"cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "servicePrincipalName": "HOST/testname"}) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountName"]) self.assertTrue(len(res) == 1) self.assertTrue("sAMAccountName" in res[0]) delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "sAMAccountName": "testname$", "servicePrincipalName": "HOST/testname"}) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname$") res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["sAMAccountName"] = MessageElement("testnamE$", FLAG_MOD_REPLACE, "sAMAccountName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["sAMAccountName"] = MessageElement("testname", FLAG_MOD_REPLACE, "sAMAccountName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["sAMAccountName"] = MessageElement("test$name$", 
FLAG_MOD_REPLACE, "sAMAccountName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/test$name") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["sAMAccountName"] = MessageElement("testname2", FLAG_MOD_REPLACE, "sAMAccountName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname2") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["sAMAccountName"] = MessageElement("testname3", FLAG_MOD_REPLACE, "sAMAccountName") m["servicePrincipalName"] = MessageElement("HOST/testname2", FLAG_MOD_REPLACE, "servicePrincipalName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname3") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["servicePrincipalName"] = MessageElement("HOST/testname2", FLAG_MOD_REPLACE, "servicePrincipalName") m["sAMAccountName"] = MessageElement("testname4", FLAG_MOD_REPLACE, "sAMAccountName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["servicePrincipalName"][0]), "HOST/testname2") m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["servicePrincipalName"] = MessageElement([], FLAG_MOD_DELETE, "servicePrincipalName") ldb.modify(m) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["sAMAccountName"] = 
MessageElement("testname2", FLAG_MOD_REPLACE, "sAMAccountName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertFalse("servicePrincipalName" in res[0]) delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "dNSHostName": "testname.testdom", "sAMAccountName": "testname$", "servicePrincipalName": ["HOST/testname.testdom", "HOST/testname"] }) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement("testname2.testdom", FLAG_MOD_REPLACE, "dNSHostName") m["sAMAccountName"] = MessageElement("testname2$", FLAG_MOD_REPLACE, "sAMAccountName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") self.assertTrue(str(res[0]["servicePrincipalName"][0]) == "HOST/testname2" or str(res[0]["servicePrincipalName"][1]) == "HOST/testname2") self.assertTrue(str(res[0]["servicePrincipalName"][0]) == "HOST/testname2.testdom" or str(res[0]["servicePrincipalName"][1]) == "HOST/testname2.testdom") delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "dNSHostName": "testname.testdom", "sAMAccountName": "testname$", "servicePrincipalName": ["HOST/testname.testdom", "HOST/testname"] }) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["sAMAccountName"] = MessageElement("testname2$", FLAG_MOD_REPLACE, "sAMAccountName") m["dNSHostName"] = MessageElement("testname2.testdom", 
FLAG_MOD_REPLACE, "dNSHostName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") self.assertTrue(len(res[0]["servicePrincipalName"]) == 2) self.assertTrue("HOST/testname2" in [str(x) for x in res[0]["servicePrincipalName"]]) self.assertTrue("HOST/testname2.testdom" in [str(x) for x in res[0]["servicePrincipalName"]]) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom", FLAG_MOD_ADD, "servicePrincipalName") try: ldb.modify(m) self.fail() except LdbError as e77: (num, _) = e77.args self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["servicePrincipalName"] = MessageElement("HOST/testname3", FLAG_MOD_ADD, "servicePrincipalName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") self.assertTrue(len(res[0]["servicePrincipalName"]) == 3) self.assertTrue("HOST/testname2" in [str(x) for x in res[0]["servicePrincipalName"]]) self.assertTrue("HOST/testname3" in [str(x) for x in res[0]["servicePrincipalName"]]) self.assertTrue("HOST/testname2.testdom" in [str(x) for x in res[0]["servicePrincipalName"]]) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["dNSHostName"] = MessageElement("testname3.testdom", FLAG_MOD_REPLACE, "dNSHostName") m["servicePrincipalName"] = MessageElement("HOST/testname3.testdom", 
FLAG_MOD_ADD, "servicePrincipalName") ldb.modify(m) res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) self.assertTrue(len(res) == 1) self.assertEqual(str(res[0]["dNSHostName"][0]), "testname3.testdom") self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") self.assertTrue(len(res[0]["servicePrincipalName"]) == 3) self.assertTrue("HOST/testname2" in [str(x) for x in res[0]["servicePrincipalName"]]) self.assertTrue("HOST/testname3" in [str(x) for x in res[0]["servicePrincipalName"]]) self.assertTrue("HOST/testname3.testdom" in [str(x) for x in res[0]["servicePrincipalName"]]) delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) def test_service_principal_name_uniqueness(self): """Test the servicePrincipalName uniqueness behaviour""" print("Testing servicePrincipalName uniqueness behaviour") ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": "computer", "servicePrincipalName": "HOST/testname.testdom"}) try: ldb.add({ "dn": "cn=ldaptestcomputer2,cn=computers," + self.base_dn, "objectclass": "computer", "servicePrincipalName": "HOST/testname.testdom"}) except LdbError as e: num, _ = e.args self.assertEqual(num, ERR_CONSTRAINT_VIOLATION) else: self.fail() def test_sam_description_attribute(self): """Test SAM description attribute""" print("Test SAM description attribute") self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "description": "desc1" }) res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["description"]) self.assertTrue(len(res) == 1) self.assertTrue("description" in res[0]) self.assertTrue(len(res[0]["description"]) == 1) self.assertEqual(str(res[0]["description"][0]), "desc1") delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": 
"group", "description": ["desc1", "desc2"]}) res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["description"]) self.assertTrue(len(res) == 1) self.assertTrue("description" in res[0]) self.assertTrue(len(res[0]["description"]) == 2) self.assertTrue(str(res[0]["description"][0]) == "desc1" or str(res[0]["description"][1]) == "desc1") self.assertTrue(str(res[0]["description"][0]) == "desc2" or str(res[0]["description"][1]) == "desc2") m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_REPLACE, "description") try: ldb.modify(m) self.fail() except LdbError as e78: (num, _) = e78.args self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_DELETE, "description") ldb.modify(m) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group"}) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement("desc1", FLAG_MOD_REPLACE, "description") ldb.modify(m) res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["description"]) self.assertTrue(len(res) == 1) self.assertTrue("description" in res[0]) self.assertTrue(len(res[0]["description"]) == 1) self.assertEqual(str(res[0]["description"][0]), "desc1") delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "description": ["desc1", "desc2"]}) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement("desc1", FLAG_MOD_REPLACE, "description") ldb.modify(m) res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, 
attrs=["description"]) self.assertTrue(len(res) == 1) self.assertTrue("description" in res[0]) self.assertTrue(len(res[0]["description"]) == 1) self.assertEqual(str(res[0]["description"][0]), "desc1") m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement("desc3", FLAG_MOD_ADD, "description") try: ldb.modify(m) self.fail() except LdbError as e79: (num, _) = e79.args self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_DELETE, "description") try: ldb.modify(m) self.fail() except LdbError as e80: (num, _) = e80.args self.assertEqual(num, ERR_NO_SUCH_ATTRIBUTE) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement("desc1", FLAG_MOD_DELETE, "description") ldb.modify(m) res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["description"]) self.assertTrue(len(res) == 1) self.assertFalse("description" in res[0]) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_REPLACE, "description") try: ldb.modify(m) self.fail() except LdbError as e81: (num, _) = e81.args self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement(["desc3", "desc4"], FLAG_MOD_ADD, "description") try: ldb.modify(m) self.fail() except LdbError as e82: (num, _) = e82.args self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["description"] = MessageElement("desc1", FLAG_MOD_ADD, "description") ldb.modify(m) res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["description"]) self.assertTrue(len(res) == 1) self.assertTrue("description" in 
res[0]) self.assertTrue(len(res[0]["description"]) == 1) self.assertEqual(str(res[0]["description"][0]), "desc1") m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m.add(MessageElement("desc1", FLAG_MOD_DELETE, "description")) m.add(MessageElement("desc2", FLAG_MOD_ADD, "description")) ldb.modify(m) res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["description"]) self.assertTrue(len(res) == 1) self.assertTrue("description" in res[0]) self.assertTrue(len(res[0]["description"]) == 1) self.assertEqual(str(res[0]["description"][0]), "desc2") delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) def test_fSMORoleOwner_attribute(self): """Test fSMORoleOwner attribute""" print("Test fSMORoleOwner attribute") ds_service_name = self.ldb.get_dsServiceName() # The "fSMORoleOwner" attribute can only be set to "nTDSDSA" entries, # invalid DNs return ERR_UNWILLING_TO_PERFORM try: self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "fSMORoleOwner": self.base_dn}) self.fail() except LdbError as e83: (num, _) = e83.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) try: self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "fSMORoleOwner": []}) self.fail() except LdbError as e84: (num, _) = e84.args self.assertEqual(num, ERR_CONSTRAINT_VIOLATION) # We are able to set it to a valid "nTDSDSA" entry if the server is # capable of handling the role self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "fSMORoleOwner": ds_service_name}) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) self.ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group"}) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m.add(MessageElement(self.base_dn, FLAG_MOD_REPLACE, "fSMORoleOwner")) try: ldb.modify(m) self.fail() except LdbError as e85: (num, _) = 
e85.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m.add(MessageElement([], FLAG_MOD_REPLACE, "fSMORoleOwner")) try: ldb.modify(m) self.fail() except LdbError as e86: (num, _) = e86.args self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) # We are able to set it to a valid "nTDSDSA" entry if the server is # capable of handling the role m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m.add(MessageElement(ds_service_name, FLAG_MOD_REPLACE, "fSMORoleOwner")) ldb.modify(m) # A clean-out works on plain entries, not master (schema, PDC...) DNs m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m.add(MessageElement([], FLAG_MOD_DELETE, "fSMORoleOwner")) ldb.modify(m) delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) def test_protected_sid_objects(self): """Test deletion of objects with RID < 1000""" # a list of some well-known sids # objects in Builtin are already covered by objectclass protected_list = [ ["CN=Domain Admins", "CN=Users,"], ["CN=Schema Admins", "CN=Users,"], ["CN=Enterprise Admins", "CN=Users,"], ["CN=Administrator", "CN=Users,"], ["CN=Domain Controllers", "CN=Users,"], ["CN=Protected Users", "CN=Users,"], ] for pr_object in protected_list: try: self.ldb.delete(pr_object[0] + "," + pr_object[1] + self.base_dn) except LdbError as e7: (num, _) = e7.args self.assertEqual(num, ERR_OTHER) else: self.fail("Deleted " + pr_object[0]) try: self.ldb.rename(pr_object[0] + "," + pr_object[1] + self.base_dn, pr_object[0] + "2," + pr_object[1] + self.base_dn) except LdbError as e8: (num, _) = e8.args self.fail("Could not rename " + pr_object[0]) self.ldb.rename(pr_object[0] + "2," + pr_object[1] + self.base_dn, pr_object[0] + "," + pr_object[1] + self.base_dn) def test_new_user_default_attributes(self): """Test default attributes for new user objects""" print("Test default attributes for new User objects\n") user_name = 
"ldaptestuser" user_dn = "CN=%s,CN=Users,%s" % (user_name, self.base_dn) ldb.add({ "dn": user_dn, "objectclass": "user", "sAMAccountName": user_name}) res = ldb.search(user_dn, scope=SCOPE_BASE) self.assertTrue(len(res) == 1) user_obj = res[0] expected_attrs = {"primaryGroupID": MessageElement(["513"]), "logonCount": MessageElement(["0"]), "cn": MessageElement([user_name]), "countryCode": MessageElement(["0"]), "objectClass": MessageElement(["top", "person", "organizationalPerson", "user"]), "instanceType": MessageElement(["4"]), "distinguishedName": MessageElement([user_dn]), "sAMAccountType": MessageElement(["805306368"]), "objectSid": "**SKIP**", "whenCreated": "**SKIP**", "uSNCreated": "**SKIP**", "badPasswordTime": MessageElement(["0"]), "dn": Dn(ldb, user_dn), "pwdLastSet": MessageElement(["0"]), "sAMAccountName": MessageElement([user_name]), "objectCategory": MessageElement(["CN=Person,%s" % ldb.get_schema_basedn().get_linearized()]), "objectGUID": "**SKIP**", "whenChanged": "**SKIP**", "badPwdCount": MessageElement(["0"]), "accountExpires": MessageElement(["9223372036854775807"]), "name": MessageElement([user_name]), "codePage": MessageElement(["0"]), "userAccountControl": MessageElement(["546"]), "lastLogon": MessageElement(["0"]), "uSNChanged": "**SKIP**", "lastLogoff": MessageElement(["0"])} # assert we have expected attribute names actual_names = set(user_obj.keys()) # Samba does not use 'dSCorePropagationData', so skip it actual_names -= set(['dSCorePropagationData']) self.assertEqual(set(expected_attrs.keys()), actual_names, "Actual object does not have expected attributes") # check attribute values for name in expected_attrs.keys(): actual_val = user_obj.get(name) self.assertFalse(actual_val is None, "No value for attribute '%s'" % name) expected_val = expected_attrs[name] if expected_val == "**SKIP**": # "**ANY**" values means "any" continue self.assertEqual(expected_val, actual_val, "Unexpected value[%r] for '%s' expected[%r]" % (actual_val, name, 
expected_val)) # clean up delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) if "://" not in host: if os.path.isfile(host): host = "tdb://%s" % host else: host = "ldap://%s" % host ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) TestProgram(module=__name__, opts=subunitopts)