diff options
Diffstat (limited to '')
-rwxr-xr-x | source4/dsdb/tests/python/sam.py | 3877 |
1 files changed, 3877 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/sam.py b/source4/dsdb/tests/python/sam.py new file mode 100755 index 0000000..abd6bb7 --- /dev/null +++ b/source4/dsdb/tests/python/sam.py @@ -0,0 +1,3877 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# This is a port of the original in testprogs/ejs/ldap.js + +import optparse +import sys +import os +import time + +sys.path.insert(0, "bin/python") +import samba +from samba.tests.subunitrun import SubunitOptions, TestProgram + +import samba.getopt as options + +from samba.credentials import Credentials, DONT_USE_KERBEROS +from samba.auth import system_session +from samba.common import get_string + +from ldb import SCOPE_BASE, LdbError +from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS +from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM +from ldb import ERR_OTHER, ERR_NO_SUCH_ATTRIBUTE +from ldb import ERR_OBJECT_CLASS_VIOLATION +from ldb import ERR_CONSTRAINT_VIOLATION +from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE +from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS +from ldb import ERR_INVALID_CREDENTIALS +from ldb import ERR_STRONG_AUTH_REQUIRED +from ldb import Message, MessageElement, Dn +from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE +from samba.samdb import SamDB +from samba.dsdb import (UF_NORMAL_ACCOUNT, UF_ACCOUNTDISABLE, + UF_WORKSTATION_TRUST_ACCOUNT, UF_SERVER_TRUST_ACCOUNT, + UF_PARTIAL_SECRETS_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT, + UF_INTERDOMAIN_TRUST_ACCOUNT, UF_SMARTCARD_REQUIRED, + UF_PASSWD_NOTREQD, UF_LOCKOUT, UF_PASSWORD_EXPIRED, ATYPE_NORMAL_ACCOUNT, + GTYPE_SECURITY_BUILTIN_LOCAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP, + GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP, + GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP, GTYPE_DISTRIBUTION_GLOBAL_GROUP, + GTYPE_DISTRIBUTION_UNIVERSAL_GROUP, + ATYPE_SECURITY_GLOBAL_GROUP, ATYPE_SECURITY_UNIVERSAL_GROUP, + ATYPE_SECURITY_LOCAL_GROUP, ATYPE_DISTRIBUTION_GLOBAL_GROUP, + ATYPE_DISTRIBUTION_UNIVERSAL_GROUP, ATYPE_DISTRIBUTION_LOCAL_GROUP, + ATYPE_WORKSTATION_TRUST) +from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_ADMINS, + DOMAIN_RID_DOMAIN_MEMBERS, DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS) + +from samba.ndr import ndr_unpack +from samba.dcerpc import drsblobs +from samba.dcerpc import drsuapi +from samba.dcerpc import security +from samba.tests import delete_force +from samba import gensec +from samba import werror + +parser = optparse.OptionParser("sam.py [options] <host>") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) +parser.add_option_group(options.VersionOptions(parser)) +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +subunitopts = SubunitOptions(parser) +parser.add_option_group(subunitopts) +opts, args = parser.parse_args() + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +host = args[0] + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) +creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + + +class SamTests(samba.tests.TestCase): + + def setUp(self): + super(SamTests, self).setUp() + self.ldb = ldb + self.base_dn = ldb.domain_dn() + + print("baseDN: %s\n" % self.base_dn) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptest\,specialuser,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestcomputer2,cn=computers," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + + def test_users_groups(self): + """This tests the SAM users and groups behaviour""" + print("Testing users and groups behaviour\n") + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group"}) + + ldb.add({ + "dn": "cn=ldaptestgroup2,cn=users," + self.base_dn, + "objectclass": "group"}) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["objectSID"]) + self.assertTrue(len(res1) == 1) + obj_sid = get_string(ldb.schema_format_value("objectSID", + res1[0]["objectSID"][0])) + group_rid_1 = security.dom_sid(obj_sid).split()[1] + + res1 = ldb.search("cn=ldaptestgroup2,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["objectSID"]) + self.assertTrue(len(res1) == 1) + obj_sid = get_string(ldb.schema_format_value("objectSID", + res1[0]["objectSID"][0])) + group_rid_2 = security.dom_sid(obj_sid).split()[1] + + # Try to create a user with an invalid account name + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "sAMAccountName": "administrator"}) + self.fail() + except LdbError as e9: + (num, _) = e9.args + self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Try to create a user with an invalid account name + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "sAMAccountName": []}) + self.fail() + except LdbError as e10: + (num, _) = e10.args + self.assertEqual(num, ERR_CONSTRAINT_VIOLATION) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Try to create a user with an invalid primary group + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "primaryGroupID": "0"}) + self.fail() + except LdbError as e11: + (num, _) = e11.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Try to Create a user with a valid primary group + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "primaryGroupID": str(group_rid_1)}) + self.fail() + except LdbError as e12: + (num, _) = e12.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Test to see how we should behave when the user account doesn't + # exist + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE, + "primaryGroupID") + try: + ldb.modify(m) + self.fail() + except LdbError as e13: + (num, _) = e13.args + self.assertEqual(num, ERR_NO_SUCH_OBJECT) + + # Test to see how we should behave when the account isn't a user + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE, + "primaryGroupID") + try: + ldb.modify(m) + self.fail() + except LdbError as e14: + (num, _) = e14.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + + # Test default primary groups on add operations + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # unfortunately the INTERDOMAIN_TRUST_ACCOUNT case cannot be tested + # since such accounts aren't directly creatable (ACCESS_DENIED) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | + UF_PASSWD_NOTREQD)}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), + DOMAIN_RID_DOMAIN_MEMBERS) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT | + UF_PASSWD_NOTREQD)}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DCS) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Read-only DC accounts are only creatable by + # UF_WORKSTATION_TRUST_ACCOUNT and work only on DCs >= 2008 (therefore + # we have a fallback in the assertion) + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_PARTIAL_SECRETS_ACCOUNT | + UF_WORKSTATION_TRUST_ACCOUNT | + UF_PASSWD_NOTREQD)}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertTrue(int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_READONLY_DCS or + int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_DOMAIN_MEMBERS) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Test default primary groups on modify operations + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | + UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, + "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS) + + # unfortunately the INTERDOMAIN_TRUST_ACCOUNT case cannot be tested + # since such accounts aren't directly creatable (ACCESS_DENIED) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_NORMAL_ACCOUNT | + UF_PASSWD_NOTREQD)}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT | + UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, + "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DOMAIN_MEMBERS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_SERVER_TRUST_ACCOUNT | + UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, + "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DCS) + + # Read-only DC accounts are only creatable by + # UF_WORKSTATION_TRUST_ACCOUNT and work only on DCs >= 2008 (therefore + # we have a fallback in the assertion) + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_PARTIAL_SECRETS_ACCOUNT | + UF_WORKSTATION_TRUST_ACCOUNT | + UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, + "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertTrue(int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_READONLY_DCS or + int(res1[0]["primaryGroupID"][0]) == DOMAIN_RID_DOMAIN_MEMBERS) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Recreate account for further tests + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + # Try to set an invalid account name + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["sAMAccountName"] = MessageElement("administrator", FLAG_MOD_REPLACE, + "sAMAccountName") + try: + ldb.modify(m) + self.fail() + except LdbError as e15: + (num, _) = e15.args + self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS) + + # But to reset the actual "sAMAccountName" should still be possible + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountName"]) + self.assertTrue(len(res1) == 1) + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["sAMAccountName"] = MessageElement(res1[0]["sAMAccountName"][0], FLAG_MOD_REPLACE, + "sAMAccountName") + ldb.modify(m) + + # And another (free) name should be possible as well + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["sAMAccountName"] = MessageElement("xxx_ldaptestuser_xxx", FLAG_MOD_REPLACE, + "sAMAccountName") + ldb.modify(m) + + # We should be able to reset our actual primary group + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS), FLAG_MOD_REPLACE, + "primaryGroupID") + ldb.modify(m) + + # Try to add invalid primary group + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE, + "primaryGroupID") + try: + ldb.modify(m) + self.fail() + except LdbError as e16: + (num, _) = e16.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Try to make group 1 primary - should be denied since it is not yet + # secondary + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement(str(group_rid_1), + FLAG_MOD_REPLACE, "primaryGroupID") + try: + ldb.modify(m) + self.fail() + except LdbError as e17: + (num, _) = e17.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Make group 1 secondary + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, + FLAG_MOD_REPLACE, "member") + ldb.modify(m) + + # Make group 1 primary + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement(str(group_rid_1), + FLAG_MOD_REPLACE, "primaryGroupID") + ldb.modify(m) + + # Try to delete group 1 - should be denied + try: + ldb.delete("cn=ldaptestgroup,cn=users," + self.base_dn) + self.fail() + except LdbError as e18: + (num, _) = e18.args + self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS) + + # Try to add group 1 also as secondary - should be denied + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, + FLAG_MOD_ADD, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e19: + (num, _) = e19.args + self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS) + + # Try to add invalid member to group 1 - should be denied + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["member"] = MessageElement( + "cn=ldaptestuser3,cn=users," + self.base_dn, + FLAG_MOD_ADD, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e20: + (num, _) = e20.args + self.assertEqual(num, ERR_NO_SUCH_OBJECT) + + # Make group 2 secondary + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, + FLAG_MOD_ADD, "member") + ldb.modify(m) + + # Swap the groups + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement(str(group_rid_2), + FLAG_MOD_REPLACE, "primaryGroupID") + ldb.modify(m) + + # Swap the groups (does not really make sense but does the same) + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement(str(group_rid_1), + FLAG_MOD_REPLACE, "primaryGroupID") + m["primaryGroupID"] = MessageElement(str(group_rid_2), + FLAG_MOD_REPLACE, "primaryGroupID") + ldb.modify(m) + + # Old primary group should contain a "member" attribute for the user, + # the new shouldn't contain anymore one + res1 = ldb.search("cn=ldaptestgroup, cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["member"]) + self.assertTrue(len(res1) == 1) + self.assertTrue(len(res1[0]["member"]) == 1) + self.assertEqual(str(res1[0]["member"][0]).lower(), + ("cn=ldaptestuser,cn=users," + self.base_dn).lower()) + + res1 = ldb.search("cn=ldaptestgroup2, cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["member"]) + self.assertTrue(len(res1) == 1) + self.assertFalse("member" in res1[0]) + + # Primary group member + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, + FLAG_MOD_DELETE, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e21: + (num, _) = e21.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Delete invalid group member + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser1,cn=users," + self.base_dn, + FLAG_MOD_DELETE, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e22: + (num, _) = e22.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Also this should be denied + try: + ldb.add({ + "dn": "cn=ldaptestuser2,cn=users," + self.base_dn, + "objectclass": "user", + "primaryGroupID": "0"}) + self.fail() + except LdbError as e23: + (num, _) = e23.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Recreate user accounts + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + ldb.add({ + "dn": "cn=ldaptestuser2,cn=users," + self.base_dn, + "objectclass": "user"}) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, + FLAG_MOD_ADD, "member") + ldb.modify(m) + + # Already added + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, + FLAG_MOD_ADD, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e24: + (num, _) = e24.args + self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS) + + # Already added, but as <SID=...> + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["objectSid"]) + self.assertTrue(len(res1) == 1) + sid_bin = res1[0]["objectSid"][0] + sid_str = ("<SID=" + get_string(ldb.schema_format_value("objectSid", sid_bin)) + ">").upper() + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement(sid_str, FLAG_MOD_ADD, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e25: + (num, _) = e25.args + self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS) + + # Invalid member + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser1,cn=users," + self.base_dn, + FLAG_MOD_REPLACE, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e26: + (num, _) = e26.args + self.assertEqual(num, ERR_NO_SUCH_OBJECT) + + # Invalid member + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement(["cn=ldaptestuser,cn=users," + self.base_dn, + "cn=ldaptestuser1,cn=users," + self.base_dn], + FLAG_MOD_REPLACE, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e27: + (num, _) = e27.args + self.assertEqual(num, ERR_NO_SUCH_OBJECT) + + # Invalid member + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, + FLAG_MOD_REPLACE, "member") + m["member"] = MessageElement("cn=ldaptestuser1,cn=users," + self.base_dn, + FLAG_MOD_ADD, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e28: + (num, _) = e28.args + self.assertEqual(num, ERR_NO_SUCH_OBJECT) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + m["member"] = MessageElement(["cn=ldaptestuser,cn=users," + self.base_dn, + "cn=ldaptestuser2,cn=users," + self.base_dn], + FLAG_MOD_REPLACE, "member") + ldb.modify(m) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) + + # Make also a small test for accounts with special DNs ("," in this case) + ldb.add({ + "dn": "cn=ldaptest\,specialuser,cn=users," + self.base_dn, + "objectclass": "user"}) + delete_force(self.ldb, "cn=ldaptest\,specialuser,cn=users," + self.base_dn) + + def test_sam_attributes(self): + """Test the behaviour of special attributes of SAM objects""" + print("Testing the behaviour of special attributes of SAM objects\n") + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group"}) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement(str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_ADD, + "groupType") + try: + ldb.modify(m) + self.fail() + except LdbError as e29: + (num, _) = e29.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + # Delete protection tests + + for attr in ["nTSecurityDescriptor", "objectSid", "sAMAccountType", + "sAMAccountName", "groupType"]: + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m[attr] = MessageElement([], FLAG_MOD_REPLACE, attr) + try: + ldb.modify(m) + self.fail() + except LdbError as e: + (num, _) = e.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m[attr] = MessageElement([], FLAG_MOD_DELETE, attr) + try: + ldb.modify(m) + self.fail() + except LdbError as e1: + (num, _) = e1.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement("513", FLAG_MOD_ADD, + "primaryGroupID") + try: + ldb.modify(m) + self.fail() + except LdbError as e30: + (num, _) = e30.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | + UF_PASSWD_NOTREQD), + FLAG_MOD_ADD, + "userAccountControl") + try: + ldb.modify(m) + self.fail() + except LdbError as e31: + (num, _) = e31.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["objectSid"] = MessageElement("xxxxxxxxxxxxxxxx", FLAG_MOD_ADD, + "objectSid") + try: + ldb.modify(m) + self.fail() + except LdbError as e32: + (num, _) = e32.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["sAMAccountType"] = MessageElement("0", FLAG_MOD_ADD, + "sAMAccountType") + try: + ldb.modify(m) + self.fail() + except LdbError as e33: + (num, _) = e33.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["sAMAccountName"] = MessageElement("test", FLAG_MOD_ADD, + "sAMAccountName") + try: + ldb.modify(m) + self.fail() + except LdbError as e34: + (num, _) = e34.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + # Delete protection tests + + for attr in ["nTSecurityDescriptor", "objectSid", "sAMAccountType", + "sAMAccountName", "primaryGroupID", "userAccountControl", + "accountExpires", "badPasswordTime", "badPwdCount", + "codePage", "countryCode", "lastLogoff", "lastLogon", + "logonCount", "pwdLastSet"]: + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m[attr] = MessageElement([], FLAG_MOD_REPLACE, attr) + try: + ldb.modify(m) + self.fail() + except LdbError as e2: + (num, _) = e2.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m[attr] = MessageElement([], FLAG_MOD_DELETE, attr) + try: + ldb.modify(m) + self.fail() + except LdbError as e3: + (num, _) = e3.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + def test_primary_group_token_constructed(self): + """Test the primary group token behaviour (hidden-generated-readonly attribute on groups) and some other constructed attributes""" + print("Testing primary group token behaviour and other constructed attributes\n") + + try: + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "primaryGroupToken": "100"}) + self.fail() + except LdbError as e35: + (num, _) = e35.args + self.assertEqual(num, ERR_UNDEFINED_ATTRIBUTE_TYPE) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group"}) + + # Testing for one invalid, and one valid operational attribute, but also the things they are built from + res1 = ldb.search(self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupToken", "canonicalName", "objectClass", "objectSid"]) + self.assertTrue(len(res1) == 1) + self.assertFalse("primaryGroupToken" in res1[0]) + self.assertTrue("canonicalName" in res1[0]) + self.assertTrue("objectClass" in res1[0]) + self.assertTrue("objectSid" in res1[0]) + + res1 = ldb.search(self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupToken", "canonicalName"]) + self.assertTrue(len(res1) == 1) + self.assertFalse("primaryGroupToken" in res1[0]) + self.assertFalse("objectSid" in res1[0]) + self.assertFalse("objectClass" in res1[0]) + self.assertTrue("canonicalName" in res1[0]) + + res1 = ldb.search("cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupToken"]) + self.assertTrue(len(res1) == 1) + self.assertFalse("primaryGroupToken" in res1[0]) + + res1 = ldb.search("cn=ldaptestuser, cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupToken"]) + self.assertTrue(len(res1) == 1) + self.assertFalse("primaryGroupToken" in res1[0]) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE) + self.assertTrue(len(res1) == 1) + self.assertFalse("primaryGroupToken" in res1[0]) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["primaryGroupToken", "objectSID"]) + self.assertTrue(len(res1) == 1) + primary_group_token = int(res1[0]["primaryGroupToken"][0]) + + obj_sid = get_string(ldb.schema_format_value("objectSID", res1[0]["objectSID"][0])) + rid = security.dom_sid(obj_sid).split()[1] + self.assertEqual(primary_group_token, rid) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["primaryGroupToken"] = "100" + try: + ldb.modify(m) + self.fail() + except LdbError as e36: + (num, _) = e36.args + self.assertEqual(num, ERR_CONSTRAINT_VIOLATION) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + def test_tokenGroups(self): + """Test the tokenGroups behaviour (hidden-generated-readonly attribute on SAM objects)""" + print("Testing tokenGroups behaviour\n") + + # The domain object shouldn't contain any "tokenGroups" entry + res = ldb.search(self.base_dn, scope=SCOPE_BASE, attrs=["tokenGroups"]) + self.assertTrue(len(res) == 1) + self.assertFalse("tokenGroups" in res[0]) + + # The domain administrator should contain "tokenGroups" entries + # (the exact number depends on the domain/forest function level and the + # DC software versions) + res = ldb.search("cn=Administrator,cn=Users," + self.base_dn, + scope=SCOPE_BASE, attrs=["tokenGroups"]) + self.assertTrue(len(res) == 1) + self.assertTrue("tokenGroups" in res[0]) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + # This testuser should contain at least two "tokenGroups" entries + # (exactly two on an unmodified "Domain Users" and "Users" group) + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["tokenGroups"]) + self.assertTrue(len(res) == 1) + self.assertTrue(len(res[0]["tokenGroups"]) >= 2) + + # one entry which we need to find should point to domains "Domain Users" + # group and another entry should point to the builtin "Users"group + domain_users_group_found = False + users_group_found = False + for sid in res[0]["tokenGroups"]: + obj_sid = get_string(ldb.schema_format_value("objectSID", sid)) + rid = security.dom_sid(obj_sid).split()[1] + if rid == 513: + domain_users_group_found = True + if rid == 545: + users_group_found = True + + self.assertTrue(domain_users_group_found) + self.assertTrue(users_group_found) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_groupType(self): + """Test the groupType behaviour""" + print("Testing groupType behaviour\n") + + # You can never create or change to a + # "GTYPE_SECURITY_BUILTIN_LOCAL_GROUP" + + # Add operation + + # Invalid attribute + try: + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": "0"}) + self.fail() + except LdbError as e37: + (num, _) = e37.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + try: + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP)}) + self.fail() + except LdbError as e38: + (num, _) = e38.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": str(GTYPE_SECURITY_GLOBAL_GROUP)}) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": str(GTYPE_SECURITY_UNIVERSAL_GROUP)}) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_UNIVERSAL_GROUP) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)}) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_LOCAL_GROUP) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": str(GTYPE_DISTRIBUTION_GLOBAL_GROUP)}) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_GLOBAL_GROUP) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP)}) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "groupType": str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)}) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_LOCAL_GROUP) + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + # Modify operation + + ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group"}) + + # We can change in this direction: global <-> universal <-> local + # On each step also the group type itself (security/distribution) is + # variable. + + # After creation we should have a "security global group" + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + + # Invalid attribute + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement("0", + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e39: + (num, _) = e39.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Security groups + + # Default is "global group" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + + # Change to "local" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e40: + (num, _) = e40.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Change to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_UNIVERSAL_GROUP) + + # Change back to "global" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + + # Change back to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_UNIVERSAL_GROUP) + + # Change to "local" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_LOCAL_GROUP) + + # Change to "global" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e41: + (num, _) = e41.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Change to "builtin local" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e42: + (num, _) = e42.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + # Change back to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_UNIVERSAL_GROUP) + + # Change to "builtin local" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e43: + (num, _) = e43.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Change back to "global" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + + # Change to "builtin local" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e44: + (num, _) = e44.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Distribution groups + + # Default is "global group" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_GLOBAL_GROUP) + + # Change to local (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e45: + (num, _) = e45.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Change to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) + + # Change back to "global" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_GLOBAL_GROUP) + + # Change back to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) + + # Change to "local" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_LOCAL_GROUP) + + # Change to "global" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e46: + (num, _) = e46.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Change back to "universal" + + # Try to add invalid member to group 1 - should be denied + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["member"] = MessageElement( + "cn=ldaptestuser3,cn=users," + self.base_dn, + FLAG_MOD_ADD, "member") + try: + ldb.modify(m) + self.fail() + except LdbError as e47: + (num, _) = e47.args + self.assertEqual(num, ERR_NO_SUCH_OBJECT) + + # Make group 2 secondary + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) + + # Change back to "global" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_GLOBAL_GROUP) + + # Both group types: this performs only random checks - all possibilities + # would require too much code. + + # Default is "global group" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + + # Change to "local" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e48: + (num, _) = e48.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Change to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) + + # Change back to "global" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + + # Change back to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_UNIVERSAL_GROUP) + + # Change to "local" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_DISTRIBUTION_LOCAL_GROUP) + + # Change to "global" (shouldn't work) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + self.fail() + except LdbError as e49: + (num, _) = e49.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # Change back to "universal" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_UNIVERSAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_UNIVERSAL_GROUP) + + # Change back to "global" + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["groupType"] = MessageElement( + str(GTYPE_SECURITY_GLOBAL_GROUP), + FLAG_MOD_REPLACE, "groupType") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_SECURITY_GLOBAL_GROUP) + + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + def test_pwdLastSet(self): + """Test the pwdLastSet behaviour""" + print("Testing pwdLastSet behaviour\n") + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "pwdLastSet": "0"}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) + self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "pwdLastSet": "-1"}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) + self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0) + lastset = int(res1[0]["pwdLastSet"][0]) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "pwdLastSet": str(1)}) + self.fail() + except LdbError as e50: + (num, msg) = e50.args + self.assertEqual(num, ERR_OTHER) + self.assertTrue('00000057' in msg) + + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "pwdLastSet": str(lastset)}) + self.fail() + except LdbError as e51: + (num, msg) = e51.args + self.assertEqual(num, ERR_OTHER) + self.assertTrue('00000057' in msg) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) + self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(0), + FLAG_MOD_REPLACE, + "pwdLastSet") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(0), + FLAG_MOD_DELETE, + "pwdLastSet") + m["pls2"] = MessageElement(str(0), + FLAG_MOD_ADD, + "pwdLastSet") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(-1), + FLAG_MOD_REPLACE, + "pwdLastSet") + ldb.modify(m) + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) + self.assertGreater(int(res1[0]["pwdLastSet"][0]), lastset) + lastset = int(res1[0]["pwdLastSet"][0]) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(0), + FLAG_MOD_DELETE, + "pwdLastSet") + m["pls2"] = MessageElement(str(0), + FLAG_MOD_ADD, + "pwdLastSet") + ldb.modify(m) + self.fail() + except LdbError as e52: + (num, msg) = e52.args + self.assertEqual(num, ERR_NO_SUCH_ATTRIBUTE) + self.assertTrue('00002085' in msg) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(-1), + FLAG_MOD_DELETE, + "pwdLastSet") + m["pls2"] = MessageElement(str(0), + FLAG_MOD_ADD, + "pwdLastSet") + ldb.modify(m) + self.fail() + except LdbError as e53: + (num, msg) = e53.args + self.assertEqual(num, ERR_NO_SUCH_ATTRIBUTE) + self.assertTrue('00002085' in msg) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(lastset), + FLAG_MOD_DELETE, + "pwdLastSet") + m["pls2"] = MessageElement(str(-1), + FLAG_MOD_ADD, + "pwdLastSet") + time.sleep(0.2) + ldb.modify(m) + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) + self.assertEqual(int(res1[0]["pwdLastSet"][0]), lastset) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(lastset), + FLAG_MOD_DELETE, + "pwdLastSet") + m["pls2"] = MessageElement(str(lastset), + FLAG_MOD_ADD, + "pwdLastSet") + ldb.modify(m) + self.fail() + except LdbError as e54: + (num, msg) = e54.args + self.assertEqual(num, ERR_OTHER) + self.assertTrue('00000057' in msg) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(lastset), + FLAG_MOD_DELETE, + "pwdLastSet") + m["pls2"] = MessageElement(str(0), + FLAG_MOD_ADD, + "pwdLastSet") + ldb.modify(m) + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) + uac = int(res1[0]["userAccountControl"][0]) + self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["uac1"] = MessageElement(str(uac |UF_PASSWORD_EXPIRED), + FLAG_MOD_REPLACE, + "userAccountControl") + ldb.modify(m) + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD) + self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_ldap_bind_must_change_pwd(self): + """Test the error messages for failing LDAP binds""" + print("Test the error messages for failing LDAP binds\n") + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def format_error_msg(hresult_v, dsid_v, werror_v): + # + # There are 4 lower case hex digits following 'v' at the end, + # but different Windows Versions return different values: + # + # Windows 2008R2 uses 'v1db1' + # Windows 2012R2 uses 'v2580' + # + return "%08X: LdapErr: DSID-%08X, comment: AcceptSecurityContext error, data %x, v" % ( + hresult_v, dsid_v, werror_v) + + HRES_SEC_E_LOGON_DENIED = 0x8009030C + HRES_SEC_E_INVALID_TOKEN = 0x80090308 + + sasl_bind_dsid = 0x0C0904DC + simple_bind_dsid = 0x0C0903A9 + + error_msg_sasl_wrong_pw = format_error_msg( + HRES_SEC_E_LOGON_DENIED, + sasl_bind_dsid, + werror.WERR_LOGON_FAILURE) + error_msg_sasl_must_change = format_error_msg( + HRES_SEC_E_LOGON_DENIED, + sasl_bind_dsid, + werror.WERR_PASSWORD_MUST_CHANGE) + error_msg_simple_wrong_pw = format_error_msg( + HRES_SEC_E_INVALID_TOKEN, + simple_bind_dsid, + werror.WERR_LOGON_FAILURE) + error_msg_simple_must_change = format_error_msg( + HRES_SEC_E_INVALID_TOKEN, + simple_bind_dsid, + werror.WERR_PASSWORD_MUST_CHANGE) + + username = "ldaptestuser" + password = "thatsAcomplPASS2" + utf16pw = ('"' + password + '"').encode('utf-16-le') + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "sAMAccountName": username, + "userAccountControl": str(UF_NORMAL_ACCOUNT), + "unicodePwd": utf16pw, + }) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountName", "sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["sAMAccountName"][0]), username) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT) + self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0) + + # Open a second LDB connection with the user credentials. Use the + # command line credentials for information like the domain, the realm + # and the workstation. + sasl_creds = Credentials() + sasl_creds.set_username(username) + sasl_creds.set_password(password) + sasl_creds.set_domain(creds.get_domain()) + sasl_creds.set_workstation(creds.get_workstation()) + sasl_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + sasl_creds.set_kerberos_state(DONT_USE_KERBEROS) + + sasl_wrong_creds = Credentials() + sasl_wrong_creds.set_username(username) + sasl_wrong_creds.set_password("wrong") + sasl_wrong_creds.set_domain(creds.get_domain()) + sasl_wrong_creds.set_workstation(creds.get_workstation()) + sasl_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + sasl_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS) + + simple_creds = Credentials() + simple_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn) + simple_creds.set_username(username) + simple_creds.set_password(password) + simple_creds.set_domain(creds.get_domain()) + simple_creds.set_workstation(creds.get_workstation()) + simple_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + simple_creds.set_kerberos_state(DONT_USE_KERBEROS) + + simple_wrong_creds = Credentials() + simple_wrong_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn) + simple_wrong_creds.set_username(username) + simple_wrong_creds.set_password("wrong") + simple_wrong_creds.set_domain(creds.get_domain()) + simple_wrong_creds.set_workstation(creds.get_workstation()) + simple_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + simple_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS) + + sasl_ldb = SamDB(url=host, credentials=sasl_creds, lp=lp) + self.assertIsNotNone(sasl_ldb) + sasl_ldb = None + + requires_strong_auth = False + try: + simple_ldb = SamDB(url=host, credentials=simple_creds, lp=lp) + self.assertIsNotNone(simple_ldb) + simple_ldb = None + except LdbError as e55: + (num, msg) = e55.args + if num != ERR_STRONG_AUTH_REQUIRED: + raise + requires_strong_auth = True + + def assertLDAPErrorMsg(msg, expected_msg): + self.assertTrue(expected_msg in msg, + "msg[%s] does not contain expected[%s]" % ( + msg, expected_msg)) + + try: + ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp) + self.fail() + except LdbError as e56: + (num, msg) = e56.args + self.assertEqual(num, ERR_INVALID_CREDENTIALS) + self.assertTrue(error_msg_sasl_wrong_pw in msg) + + if not requires_strong_auth: + try: + ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp) + self.fail() + except LdbError as e4: + (num, msg) = e4.args + self.assertEqual(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(0), + FLAG_MOD_REPLACE, + "pwdLastSet") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["pwdLastSet"]) + self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) + + try: + ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp) + self.fail() + except LdbError as e57: + (num, msg) = e57.args + self.assertEqual(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_sasl_wrong_pw) + + try: + ldb_fail = SamDB(url=host, credentials=sasl_creds, lp=lp) + self.fail() + except LdbError as e58: + (num, msg) = e58.args + self.assertEqual(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_sasl_must_change) + + if not requires_strong_auth: + try: + ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp) + self.fail() + except LdbError as e5: + (num, msg) = e5.args + self.assertEqual(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw) + + try: + ldb_fail = SamDB(url=host, credentials=simple_creds, lp=lp) + self.fail() + except LdbError as e6: + (num, msg) = e6.args + self.assertEqual(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_simple_must_change) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_0_uac(self): + """Test the userAccountControl behaviour""" + print("Testing userAccountControl behaviour\n") + + # With a user object + + # Add operation + + # As user you can only set a normal account. + # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a + # password yet. + # With SYSTEM rights you can set a interdomain trust account. + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": "0"}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_PASSWD_NOTREQD == 0) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_normal(self): + """Test the userAccountControl behaviour""" + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT)}) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_normal_pwnotreq(self): + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_normal_pwnotreq_lockout_expired(self): + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT | + UF_PASSWD_NOTREQD | + UF_LOCKOUT | + UF_PASSWORD_EXPIRED)}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) + self.assertFalse("lockoutTime" in res1[0]) + self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_temp_dup(self): + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_TEMP_DUPLICATE_ACCOUNT)}) + self.fail() + except LdbError as e59: + (num, _) = e59.args + self.assertEqual(num, ERR_OTHER) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_server(self): + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) + self.fail() + except LdbError as e60: + (num, _) = e60.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_workstation(self): + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) + except LdbError as e61: + (num, _) = e61.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_rodc(self): + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)}) + except LdbError as e62: + (num, _) = e62.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_userAccountControl_user_add_trust(self): + try: + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) + self.fail() + except LdbError as e63: + (num, _) = e63.args + self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + # Modify operation + + def test_userAccountControl_user_modify(self): + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + # After creation we should have a normal account + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) + + # As user you can only switch from a normal account to a workstation + # trust account and back. + # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a + # password yet. + # With SYSTEM rights you can switch to a interdomain trust account. + + # Invalid attribute + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement("0", + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + except LdbError as e64: + (num, _) = e64.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + except LdbError as e65: + (num, _) = e65.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_ACCOUNTDISABLE), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["lockoutTime"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "lockoutTime") + m["pwdLastSet"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "pwdLastSet") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_LOCKOUT | UF_PASSWORD_EXPIRED), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) + self.assertTrue(int(res1[0]["lockoutTime"][0]) == 0) + self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_TEMP_DUPLICATE_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + self.fail() + except LdbError as e66: + (num, _) = e66.args + self.assertEqual(num, ERR_OTHER) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_SERVER_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + self.fail() + except LdbError as e67: + (num, _) = e67.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_WORKSTATION_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + self.fail() + except LdbError as e68: + (num, _) = e68.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_INTERDOMAIN_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + self.fail() + except LdbError as e69: + (num, _) = e69.args + self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) + + def test_userAccountControl_computer_add_0_uac(self): + # With a computer object + + # Add operation + + # As computer you can set a normal account and a server trust account. + # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a + # password yet. + # With SYSTEM rights you can set a interdomain trust account. + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": "0"}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_PASSWD_NOTREQD == 0) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_add_normal(self): + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_NORMAL_ACCOUNT)}) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_add_normal_pwnotreqd(self): + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_add_normal_pwnotreqd_lockout_expired(self): + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_NORMAL_ACCOUNT | + UF_PASSWD_NOTREQD | + UF_LOCKOUT | + UF_PASSWORD_EXPIRED)}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) + self.assertFalse("lockoutTime" in res1[0]) + self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_add_temp_dup(self): + try: + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_TEMP_DUPLICATE_ACCOUNT)}) + self.fail() + except LdbError as e70: + (num, _) = e70.args + self.assertEqual(num, ERR_OTHER) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_add_server(self): + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_add_workstation(self): + try: + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) + except LdbError as e71: + (num, _) = e71.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_add_trust(self): + try: + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) + self.fail() + except LdbError as e72: + (num, _) = e72.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_userAccountControl_computer_modify(self): + # Modify operation + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer"}) + + # After creation we should have a normal account + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) + + # As computer you can switch from a normal account to a workstation + # or server trust account and back (also swapping between trust + # accounts is allowed). + # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a + # password yet. + # With SYSTEM rights you can switch to a interdomain trust account. + + # Invalid attribute + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement("0", + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + except LdbError as e73: + (num, _) = e73.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + except LdbError as e74: + (num, _) = e74.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_ACCOUNTDISABLE), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["lockoutTime"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "lockoutTime") + m["pwdLastSet"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "pwdLastSet") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_LOCKOUT | UF_PASSWORD_EXPIRED), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) + self.assertTrue(int(res1[0]["lockoutTime"][0]) == 0) + self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_TEMP_DUPLICATE_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + self.fail() + except LdbError as e75: + (num, _) = e75.args + self.assertEqual(num, ERR_OTHER) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_SERVER_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_WORKSTATION_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_SERVER_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_WORKSTATION_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountType"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), + ATYPE_WORKSTATION_TRUST) + + try: + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_INTERDOMAIN_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + self.fail() + except LdbError as e76: + (num, _) = e76.args + self.assertEqual(num, ERR_OBJECT_CLASS_VIOLATION) + + # "primaryGroupID" does not change if account type remains the same + + # For a user account + + ldb.add({ + "dn": "cn=ldaptestuser2,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT | + UF_PASSWD_NOTREQD | + UF_ACCOUNTDISABLE)}) + + res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["userAccountControl"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE) + + m = Message() + m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_ADMINS) + ">") + m["member"] = MessageElement( + "cn=ldaptestuser2,cn=users," + self.base_dn, FLAG_MOD_ADD, "member") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) + m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_ADMINS), + FLAG_MOD_REPLACE, "primaryGroupID") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["userAccountControl", "primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_ADMINS) + + # For a workstation account + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DOMAIN_MEMBERS) + + m = Message() + m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_USERS) + ">") + m["member"] = MessageElement( + "cn=ldaptestcomputer,cn=computers," + self.base_dn, FLAG_MOD_ADD, "member") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS), + FLAG_MOD_REPLACE, "primaryGroupID") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_WORKSTATION_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["primaryGroupID"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def find_repl_meta_data(self, rpmd, attid): + for i in range(0, rpmd.ctr.count): + m = rpmd.ctr.array[i] + if m.attid == attid: + return m + return None + + def test_smartcard_required1(self): + """Test the UF_SMARTCARD_REQUIRED behaviour""" + print("Testing UF_SMARTCARD_REQUIRED behaviour\n") + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT), + "unicodePwd": "\"thatsAcomplPASS2\"".encode('utf-16-le') + }) + + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", + "pwdLastSet", "msDS-KeyVersionNumber", + "replPropertyMetaData"]) + self.assertTrue(len(res) == 1) + self.assertEqual(int(res[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT) + self.assertNotEqual(int(res[0]["pwdLastSet"][0]), 0) + lastset = int(res[0]["pwdLastSet"][0]) + self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) + self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) + rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + lastsetmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_pwdLastSet) + self.assertIsNotNone(lastsetmd) + self.assertEqual(lastsetmd.version, 1) + nthashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_unicodePwd) + self.assertIsNotNone(nthashmd) + self.assertEqual(nthashmd.version, 1) + nthistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_ntPwdHistory) + self.assertIsNotNone(nthistmd) + self.assertEqual(nthistmd.version, 1) + lmhashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_dBCSPwd) + self.assertIsNotNone(lmhashmd) + self.assertEqual(lmhashmd.version, 1) + lmhistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_lmPwdHistory) + self.assertIsNotNone(lmhistmd) + self.assertEqual(lmhistmd.version, 1) + spcbmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_supplementalCredentials) + self.assertIsNotNone(spcbmd) + self.assertEqual(spcbmd.version, 1) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", + "pwdLastSet", "msDS-KeyVersionNumber", + "replPropertyMetaData"]) + self.assertTrue(len(res) == 1) + self.assertEqual(int(res[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED) + self.assertEqual(int(res[0]["pwdLastSet"][0]), lastset) + lastset1 = int(res[0]["pwdLastSet"][0]) + self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 2) + self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) + rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + lastsetmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_pwdLastSet) + self.assertIsNotNone(lastsetmd) + self.assertEqual(lastsetmd.version, 1) + nthashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_unicodePwd) + self.assertIsNotNone(nthashmd) + self.assertEqual(nthashmd.version, 2) + nthistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_ntPwdHistory) + self.assertIsNotNone(nthistmd) + self.assertEqual(nthistmd.version, 2) + lmhashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_dBCSPwd) + self.assertIsNotNone(lmhashmd) + self.assertEqual(lmhashmd.version, 2) + lmhistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_lmPwdHistory) + self.assertIsNotNone(lmhistmd) + self.assertEqual(lmhistmd.version, 2) + spcbmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_supplementalCredentials) + self.assertIsNotNone(spcbmd) + self.assertEqual(spcbmd.version, 2) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_smartcard_required2(self): + """Test the UF_SMARTCARD_REQUIRED behaviour""" + print("Testing UF_SMARTCARD_REQUIRED behaviour\n") + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT |UF_ACCOUNTDISABLE), + }) + + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", + "pwdLastSet", "msDS-KeyVersionNumber", + "replPropertyMetaData"]) + self.assertTrue(len(res) == 1) + self.assertEqual(int(res[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT |UF_ACCOUNTDISABLE) + self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) + self.assertTrue("msDS-KeyVersionNumber" in res[0]) + self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) + self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) + rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + lastsetmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_pwdLastSet) + self.assertIsNotNone(lastsetmd) + self.assertEqual(lastsetmd.version, 1) + nthashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_unicodePwd) + self.assertIsNotNone(nthashmd) + self.assertEqual(nthashmd.version, 1) + nthistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_ntPwdHistory) + self.assertIsNotNone(nthistmd) + self.assertEqual(nthistmd.version, 1) + lmhashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_dBCSPwd) + self.assertIsNotNone(lmhashmd) + self.assertEqual(lmhashmd.version, 1) + lmhistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_lmPwdHistory) + self.assertIsNotNone(lmhistmd) + self.assertEqual(lmhistmd.version, 1) + spcbmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_supplementalCredentials) + self.assertIsNone(spcbmd) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT |UF_ACCOUNTDISABLE |UF_SMARTCARD_REQUIRED), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", + "pwdLastSet", "msDS-KeyVersionNumber", + "replPropertyMetaData"]) + self.assertTrue(len(res) == 1) + self.assertEqual(int(res[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT |UF_ACCOUNTDISABLE |UF_SMARTCARD_REQUIRED) + self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) + self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 2) + self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) + rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + lastsetmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_pwdLastSet) + self.assertIsNotNone(lastsetmd) + self.assertEqual(lastsetmd.version, 1) + nthashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_unicodePwd) + self.assertIsNotNone(nthashmd) + self.assertEqual(nthashmd.version, 2) + nthistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_ntPwdHistory) + self.assertIsNotNone(nthistmd) + self.assertEqual(nthistmd.version, 2) + lmhashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_dBCSPwd) + self.assertIsNotNone(lmhashmd) + self.assertEqual(lmhashmd.version, 2) + lmhistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_lmPwdHistory) + self.assertIsNotNone(lmhistmd) + self.assertEqual(lmhistmd.version, 2) + spcbmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_supplementalCredentials) + self.assertIsNotNone(spcbmd) + self.assertEqual(spcbmd.version, 1) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", + "pwdLastSet", "msDS-KeyVersionNumber", + "replPropertyMetaData"]) + self.assertTrue(len(res) == 1) + self.assertEqual(int(res[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED) + self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) + self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 2) + self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) + rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + lastsetmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_pwdLastSet) + self.assertIsNotNone(lastsetmd) + self.assertEqual(lastsetmd.version, 1) + nthashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_unicodePwd) + self.assertIsNotNone(nthashmd) + self.assertEqual(nthashmd.version, 2) + nthistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_ntPwdHistory) + self.assertIsNotNone(nthistmd) + self.assertEqual(nthistmd.version, 2) + lmhashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_dBCSPwd) + self.assertIsNotNone(lmhashmd) + self.assertEqual(lmhashmd.version, 2) + lmhistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_lmPwdHistory) + self.assertIsNotNone(lmhistmd) + self.assertEqual(lmhistmd.version, 2) + spcbmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_supplementalCredentials) + self.assertIsNotNone(spcbmd) + self.assertEqual(spcbmd.version, 1) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_smartcard_required3(self): + """Test the UF_SMARTCARD_REQUIRED behaviour""" + print("Testing UF_SMARTCARD_REQUIRED behaviour\n") + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "userAccountControl": str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED |UF_ACCOUNTDISABLE), + }) + + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", + "pwdLastSet", "msDS-KeyVersionNumber", + "replPropertyMetaData"]) + self.assertTrue(len(res) == 1) + self.assertEqual(int(res[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED |UF_ACCOUNTDISABLE) + self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) + self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) + self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) + rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + lastsetmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_pwdLastSet) + self.assertIsNotNone(lastsetmd) + self.assertEqual(lastsetmd.version, 1) + nthashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_unicodePwd) + self.assertIsNotNone(nthashmd) + self.assertEqual(nthashmd.version, 1) + nthistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_ntPwdHistory) + self.assertIsNotNone(nthistmd) + self.assertEqual(nthistmd.version, 1) + lmhashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_dBCSPwd) + self.assertIsNotNone(lmhashmd) + self.assertEqual(lmhashmd.version, 1) + lmhistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_lmPwdHistory) + self.assertIsNotNone(lmhistmd) + self.assertEqual(lmhistmd.version, 1) + spcbmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_supplementalCredentials) + self.assertIsNotNone(spcbmd) + self.assertEqual(spcbmd.version, 1) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountType", "userAccountControl", + "pwdLastSet", "msDS-KeyVersionNumber", + "replPropertyMetaData"]) + self.assertTrue(len(res) == 1) + self.assertEqual(int(res[0]["sAMAccountType"][0]), + ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res[0]["userAccountControl"][0]), + UF_NORMAL_ACCOUNT |UF_SMARTCARD_REQUIRED) + self.assertEqual(int(res[0]["pwdLastSet"][0]), 0) + self.assertEqual(int(res[0]["msDS-KeyVersionNumber"][0]), 1) + self.assertTrue(len(res[0]["replPropertyMetaData"]) == 1) + rpmd = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + res[0]["replPropertyMetaData"][0]) + lastsetmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_pwdLastSet) + self.assertIsNotNone(lastsetmd) + self.assertEqual(lastsetmd.version, 1) + nthashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_unicodePwd) + self.assertIsNotNone(nthashmd) + self.assertEqual(nthashmd.version, 1) + nthistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_ntPwdHistory) + self.assertIsNotNone(nthistmd) + self.assertEqual(nthistmd.version, 1) + lmhashmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_dBCSPwd) + self.assertIsNotNone(lmhashmd) + self.assertEqual(lmhashmd.version, 1) + lmhistmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_lmPwdHistory) + self.assertIsNotNone(lmhistmd) + self.assertEqual(lmhistmd.version, 1) + spcbmd = self.find_repl_meta_data(rpmd, + drsuapi.DRSUAPI_ATTID_supplementalCredentials) + self.assertIsNotNone(spcbmd) + self.assertEqual(spcbmd.version, 1) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_isCriticalSystemObject_user(self): + """Test the isCriticalSystemObject behaviour""" + print("Testing isCriticalSystemObject behaviour\n") + + # Add tests (of a user) + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user"}) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertTrue("isCriticalSystemObject" not in res1[0]) + + # Modification tests + m = Message() + + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertTrue("isCriticalSystemObject" in res1[0]) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "FALSE") + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def test_isCriticalSystemObject(self): + """Test the isCriticalSystemObject behaviour""" + print("Testing isCriticalSystemObject behaviour\n") + + # Add tests + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer"}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertTrue("isCriticalSystemObject" in res1[0]) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "FALSE") + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "FALSE") + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "TRUE") + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "TRUE") + + # Modification tests + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "TRUE") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "FALSE") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement( + str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "TRUE") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "TRUE") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_SERVER_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "TRUE") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT), + FLAG_MOD_REPLACE, "userAccountControl") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, + attrs=["isCriticalSystemObject"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(str(res1[0]["isCriticalSystemObject"][0]), "FALSE") + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_service_principal_name_updates(self): + """Test the servicePrincipalNames update behaviour""" + print("Testing servicePrincipalNames update behaviour\n") + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "dNSHostName": "testname.testdom"}) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertFalse("servicePrincipalName" in res[0]) + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "servicePrincipalName": "HOST/testname.testdom"}) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["dNSHostName"]) + self.assertTrue(len(res) == 1) + self.assertFalse("dNSHostName" in res[0]) + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "dNSHostName": "testname2.testdom", + "servicePrincipalName": "HOST/testname.testdom"}) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["dNSHostName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname.testdom") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname.testdoM", + FLAG_MOD_REPLACE, "dNSHostName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname.testdom") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname2.testdom2", + FLAG_MOD_REPLACE, "dNSHostName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname2.testdom2") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement([], + FLAG_MOD_DELETE, "dNSHostName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname2.testdom2") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname.testdom3", + FLAG_MOD_REPLACE, "dNSHostName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname2.testdom2") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname2.testdom2", + FLAG_MOD_REPLACE, "dNSHostName") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname3.testdom3", + FLAG_MOD_REPLACE, "dNSHostName") + m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom2", + FLAG_MOD_REPLACE, + "servicePrincipalName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname3.testdom3") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom2", + FLAG_MOD_REPLACE, + "servicePrincipalName") + m["dNSHostName"] = MessageElement("testname4.testdom4", + FLAG_MOD_REPLACE, "dNSHostName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname2.testdom2") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["servicePrincipalName"] = MessageElement([], + FLAG_MOD_DELETE, + "servicePrincipalName") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname2.testdom2", + FLAG_MOD_REPLACE, "dNSHostName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertFalse("servicePrincipalName" in res[0]) + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "sAMAccountName": "testname$"}) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertFalse("servicePrincipalName" in res[0]) + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "servicePrincipalName": "HOST/testname"}) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountName"]) + self.assertTrue(len(res) == 1) + self.assertTrue("sAMAccountName" in res[0]) + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "sAMAccountName": "testname$", + "servicePrincipalName": "HOST/testname"}) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["sAMAccountName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname$") + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["sAMAccountName"] = MessageElement("testnamE$", + FLAG_MOD_REPLACE, "sAMAccountName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["sAMAccountName"] = MessageElement("testname", + FLAG_MOD_REPLACE, "sAMAccountName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["sAMAccountName"] = MessageElement("test$name$", + FLAG_MOD_REPLACE, "sAMAccountName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/test$name") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["sAMAccountName"] = MessageElement("testname2", + FLAG_MOD_REPLACE, "sAMAccountName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname2") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["sAMAccountName"] = MessageElement("testname3", + FLAG_MOD_REPLACE, "sAMAccountName") + m["servicePrincipalName"] = MessageElement("HOST/testname2", + FLAG_MOD_REPLACE, + "servicePrincipalName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname3") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["servicePrincipalName"] = MessageElement("HOST/testname2", + FLAG_MOD_REPLACE, + "servicePrincipalName") + m["sAMAccountName"] = MessageElement("testname4", + FLAG_MOD_REPLACE, "sAMAccountName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["servicePrincipalName"][0]), + "HOST/testname2") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["servicePrincipalName"] = MessageElement([], + FLAG_MOD_DELETE, + "servicePrincipalName") + ldb.modify(m) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["sAMAccountName"] = MessageElement("testname2", + FLAG_MOD_REPLACE, "sAMAccountName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertFalse("servicePrincipalName" in res[0]) + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "dNSHostName": "testname.testdom", + "sAMAccountName": "testname$", + "servicePrincipalName": ["HOST/testname.testdom", "HOST/testname"] + }) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname2.testdom", + FLAG_MOD_REPLACE, "dNSHostName") + m["sAMAccountName"] = MessageElement("testname2$", + FLAG_MOD_REPLACE, "sAMAccountName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") + self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") + self.assertTrue(str(res[0]["servicePrincipalName"][0]) == "HOST/testname2" or + str(res[0]["servicePrincipalName"][1]) == "HOST/testname2") + self.assertTrue(str(res[0]["servicePrincipalName"][0]) == "HOST/testname2.testdom" or + str(res[0]["servicePrincipalName"][1]) == "HOST/testname2.testdom") + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "dNSHostName": "testname.testdom", + "sAMAccountName": "testname$", + "servicePrincipalName": ["HOST/testname.testdom", "HOST/testname"] + }) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["sAMAccountName"] = MessageElement("testname2$", + FLAG_MOD_REPLACE, "sAMAccountName") + m["dNSHostName"] = MessageElement("testname2.testdom", + FLAG_MOD_REPLACE, "dNSHostName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") + self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") + self.assertTrue(len(res[0]["servicePrincipalName"]) == 2) + self.assertTrue("HOST/testname2" in [str(x) for x in res[0]["servicePrincipalName"]]) + self.assertTrue("HOST/testname2.testdom" in [str(x) for x in res[0]["servicePrincipalName"]]) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom", + FLAG_MOD_ADD, "servicePrincipalName") + try: + ldb.modify(m) + self.fail() + except LdbError as e77: + (num, _) = e77.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["servicePrincipalName"] = MessageElement("HOST/testname3", + FLAG_MOD_ADD, "servicePrincipalName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["dNSHostName"][0]), "testname2.testdom") + self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") + self.assertTrue(len(res[0]["servicePrincipalName"]) == 3) + self.assertTrue("HOST/testname2" in [str(x) for x in res[0]["servicePrincipalName"]]) + self.assertTrue("HOST/testname3" in [str(x) for x in res[0]["servicePrincipalName"]]) + self.assertTrue("HOST/testname2.testdom" in [str(x) for x in res[0]["servicePrincipalName"]]) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + m["dNSHostName"] = MessageElement("testname3.testdom", + FLAG_MOD_REPLACE, "dNSHostName") + m["servicePrincipalName"] = MessageElement("HOST/testname3.testdom", + FLAG_MOD_ADD, "servicePrincipalName") + ldb.modify(m) + + res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, + scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) + self.assertTrue(len(res) == 1) + self.assertEqual(str(res[0]["dNSHostName"][0]), "testname3.testdom") + self.assertEqual(str(res[0]["sAMAccountName"][0]), "testname2$") + self.assertTrue(len(res[0]["servicePrincipalName"]) == 3) + self.assertTrue("HOST/testname2" in [str(x) for x in res[0]["servicePrincipalName"]]) + self.assertTrue("HOST/testname3" in [str(x) for x in res[0]["servicePrincipalName"]]) + self.assertTrue("HOST/testname3.testdom" in [str(x) for x in res[0]["servicePrincipalName"]]) + + delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) + + def test_service_principal_name_uniqueness(self): + """Test the servicePrincipalName uniqueness behaviour""" + print("Testing servicePrincipalName uniqueness behaviour") + + ldb.add({ + "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, + "objectclass": "computer", + "servicePrincipalName": "HOST/testname.testdom"}) + + try: + ldb.add({ + "dn": "cn=ldaptestcomputer2,cn=computers," + self.base_dn, + "objectclass": "computer", + "servicePrincipalName": "HOST/testname.testdom"}) + except LdbError as e: + num, _ = e.args + self.assertEqual(num, ERR_CONSTRAINT_VIOLATION) + else: + self.fail() + + def test_sam_description_attribute(self): + """Test SAM description attribute""" + print("Test SAM description attribute") + + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "description": "desc1" + }) + + res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["description"]) + self.assertTrue(len(res) == 1) + self.assertTrue("description" in res[0]) + self.assertTrue(len(res[0]["description"]) == 1) + self.assertEqual(str(res[0]["description"][0]), "desc1") + + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "description": ["desc1", "desc2"]}) + + res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["description"]) + self.assertTrue(len(res) == 1) + self.assertTrue("description" in res[0]) + self.assertTrue(len(res[0]["description"]) == 2) + self.assertTrue(str(res[0]["description"][0]) == "desc1" or + str(res[0]["description"][1]) == "desc1") + self.assertTrue(str(res[0]["description"][0]) == "desc2" or + str(res[0]["description"][1]) == "desc2") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_REPLACE, + "description") + try: + ldb.modify(m) + self.fail() + except LdbError as e78: + (num, _) = e78.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_DELETE, + "description") + ldb.modify(m) + + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group"}) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement("desc1", FLAG_MOD_REPLACE, + "description") + ldb.modify(m) + + res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["description"]) + self.assertTrue(len(res) == 1) + self.assertTrue("description" in res[0]) + self.assertTrue(len(res[0]["description"]) == 1) + self.assertEqual(str(res[0]["description"][0]), "desc1") + + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "description": ["desc1", "desc2"]}) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement("desc1", FLAG_MOD_REPLACE, + "description") + ldb.modify(m) + + res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["description"]) + self.assertTrue(len(res) == 1) + self.assertTrue("description" in res[0]) + self.assertTrue(len(res[0]["description"]) == 1) + self.assertEqual(str(res[0]["description"][0]), "desc1") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement("desc3", FLAG_MOD_ADD, + "description") + try: + ldb.modify(m) + self.fail() + except LdbError as e79: + (num, _) = e79.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_DELETE, + "description") + try: + ldb.modify(m) + self.fail() + except LdbError as e80: + (num, _) = e80.args + self.assertEqual(num, ERR_NO_SUCH_ATTRIBUTE) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement("desc1", FLAG_MOD_DELETE, + "description") + ldb.modify(m) + res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["description"]) + self.assertTrue(len(res) == 1) + self.assertFalse("description" in res[0]) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_REPLACE, + "description") + try: + ldb.modify(m) + self.fail() + except LdbError as e81: + (num, _) = e81.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement(["desc3", "desc4"], FLAG_MOD_ADD, + "description") + try: + ldb.modify(m) + self.fail() + except LdbError as e82: + (num, _) = e82.args + self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m["description"] = MessageElement("desc1", FLAG_MOD_ADD, + "description") + ldb.modify(m) + + res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["description"]) + self.assertTrue(len(res) == 1) + self.assertTrue("description" in res[0]) + self.assertTrue(len(res[0]["description"]) == 1) + self.assertEqual(str(res[0]["description"][0]), "desc1") + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m.add(MessageElement("desc1", FLAG_MOD_DELETE, "description")) + m.add(MessageElement("desc2", FLAG_MOD_ADD, "description")) + ldb.modify(m) + + res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["description"]) + self.assertTrue(len(res) == 1) + self.assertTrue("description" in res[0]) + self.assertTrue(len(res[0]["description"]) == 1) + self.assertEqual(str(res[0]["description"][0]), "desc2") + + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + def test_fSMORoleOwner_attribute(self): + """Test fSMORoleOwner attribute""" + print("Test fSMORoleOwner attribute") + + ds_service_name = self.ldb.get_dsServiceName() + + # The "fSMORoleOwner" attribute can only be set to "nTDSDSA" entries, + # invalid DNs return ERR_UNWILLING_TO_PERFORM + + try: + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "fSMORoleOwner": self.base_dn}) + self.fail() + except LdbError as e83: + (num, _) = e83.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + try: + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "fSMORoleOwner": []}) + self.fail() + except LdbError as e84: + (num, _) = e84.args + self.assertEqual(num, ERR_CONSTRAINT_VIOLATION) + + # We are able to set it to a valid "nTDSDSA" entry if the server is + # capable of handling the role + + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group", + "fSMORoleOwner": ds_service_name}) + + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + self.ldb.add({ + "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, + "objectclass": "group"}) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m.add(MessageElement(self.base_dn, FLAG_MOD_REPLACE, "fSMORoleOwner")) + try: + ldb.modify(m) + self.fail() + except LdbError as e85: + (num, _) = e85.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m.add(MessageElement([], FLAG_MOD_REPLACE, "fSMORoleOwner")) + try: + ldb.modify(m) + self.fail() + except LdbError as e86: + (num, _) = e86.args + self.assertEqual(num, ERR_UNWILLING_TO_PERFORM) + + # We are able to set it to a valid "nTDSDSA" entry if the server is + # capable of handling the role + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m.add(MessageElement(ds_service_name, FLAG_MOD_REPLACE, "fSMORoleOwner")) + ldb.modify(m) + + # A clean-out works on plain entries, not master (schema, PDC...) DNs + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + m.add(MessageElement([], FLAG_MOD_DELETE, "fSMORoleOwner")) + ldb.modify(m) + + delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) + + def test_protected_sid_objects(self): + """Test deletion of objects with RID < 1000""" + # a list of some well-known sids + # objects in Builtin are already covered by objectclass + protected_list = [ + ["CN=Domain Admins", "CN=Users,"], + ["CN=Schema Admins", "CN=Users,"], + ["CN=Enterprise Admins", "CN=Users,"], + ["CN=Administrator", "CN=Users,"], + ["CN=Domain Controllers", "CN=Users,"], + ["CN=Protected Users", "CN=Users,"], + ] + + for pr_object in protected_list: + try: + self.ldb.delete(pr_object[0] + "," + pr_object[1] + self.base_dn) + except LdbError as e7: + (num, _) = e7.args + self.assertEqual(num, ERR_OTHER) + else: + self.fail("Deleted " + pr_object[0]) + + try: + self.ldb.rename(pr_object[0] + "," + pr_object[1] + self.base_dn, + pr_object[0] + "2," + pr_object[1] + self.base_dn) + except LdbError as e8: + (num, _) = e8.args + self.fail("Could not rename " + pr_object[0]) + + self.ldb.rename(pr_object[0] + "2," + pr_object[1] + self.base_dn, + pr_object[0] + "," + pr_object[1] + self.base_dn) + + def test_new_user_default_attributes(self): + """Test default attributes for new user objects""" + print("Test default attributes for new User objects\n") + + user_name = "ldaptestuser" + user_dn = "CN=%s,CN=Users,%s" % (user_name, self.base_dn) + ldb.add({ + "dn": user_dn, + "objectclass": "user", + "sAMAccountName": user_name}) + + res = ldb.search(user_dn, scope=SCOPE_BASE) + self.assertTrue(len(res) == 1) + user_obj = res[0] + + expected_attrs = {"primaryGroupID": MessageElement(["513"]), + "logonCount": MessageElement(["0"]), + "cn": MessageElement([user_name]), + "countryCode": MessageElement(["0"]), + "objectClass": MessageElement(["top", "person", "organizationalPerson", "user"]), + "instanceType": MessageElement(["4"]), + "distinguishedName": MessageElement([user_dn]), + "sAMAccountType": MessageElement(["805306368"]), + "objectSid": "**SKIP**", + "whenCreated": "**SKIP**", + "uSNCreated": "**SKIP**", + "badPasswordTime": MessageElement(["0"]), + "dn": Dn(ldb, user_dn), + "pwdLastSet": MessageElement(["0"]), + "sAMAccountName": MessageElement([user_name]), + "objectCategory": MessageElement(["CN=Person,%s" % ldb.get_schema_basedn().get_linearized()]), + "objectGUID": "**SKIP**", + "whenChanged": "**SKIP**", + "badPwdCount": MessageElement(["0"]), + "accountExpires": MessageElement(["9223372036854775807"]), + "name": MessageElement([user_name]), + "codePage": MessageElement(["0"]), + "userAccountControl": MessageElement(["546"]), + "lastLogon": MessageElement(["0"]), + "uSNChanged": "**SKIP**", + "lastLogoff": MessageElement(["0"])} + # assert we have expected attribute names + actual_names = set(user_obj.keys()) + # Samba does not use 'dSCorePropagationData', so skip it + actual_names -= set(['dSCorePropagationData']) + self.assertEqual(set(expected_attrs.keys()), actual_names, "Actual object does not have expected attributes") + # check attribute values + for name in expected_attrs.keys(): + actual_val = user_obj.get(name) + self.assertFalse(actual_val is None, "No value for attribute '%s'" % name) + expected_val = expected_attrs[name] + if expected_val == "**SKIP**": + # "**ANY**" values means "any" + continue + self.assertEqual(expected_val, actual_val, + "Unexpected value[%r] for '%s' expected[%r]" % + (actual_val, name, expected_val)) + # clean up + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + +if "://" not in host: + if os.path.isfile(host): + host = "tdb://%s" % host + else: + host = "ldap://%s" % host + +ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) + +TestProgram(module=__name__, opts=subunitopts) |