diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
commit | 4f5791ebd03eaec1c7da0865a383175b05102712 (patch) | |
tree | 8ce7b00f7a76baa386372422adebbe64510812d4 /source4/dsdb/tests/python/token_group.py | |
parent | Initial commit. (diff) | |
download | samba-upstream.tar.xz samba-upstream.zip |
Adding upstream version 2:4.17.12+dfsg.upstream/2%4.17.12+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'source4/dsdb/tests/python/token_group.py')
-rwxr-xr-x | source4/dsdb/tests/python/token_group.py | 736 |
1 files changed, 736 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/token_group.py b/source4/dsdb/tests/python/token_group.py new file mode 100755 index 0000000..3d0cd5d --- /dev/null +++ b/source4/dsdb/tests/python/token_group.py @@ -0,0 +1,736 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# test tokengroups attribute against internal token calculation + +import optparse +import sys +import os + +sys.path.insert(0, "bin/python") +import samba + +from samba.tests.subunitrun import SubunitOptions, TestProgram + +import samba.getopt as options + +from samba.auth import system_session +from samba import ldb, dsdb +from samba.samdb import SamDB +from samba.auth import AuthContext +from samba.ndr import ndr_unpack +from samba import gensec +from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS, AUTO_USE_KERBEROS +from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP +import samba.tests +from samba.tests import delete_force +from samba.dcerpc import samr, security +from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES, AUTH_SESSION_INFO_NTLM + + +parser = optparse.OptionParser("token_group.py [options] <host>") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) +parser.add_option_group(options.VersionOptions(parser)) +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +subunitopts = SubunitOptions(parser) +parser.add_option_group(subunitopts) +opts, args = parser.parse_args() + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +url = args[0] + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) +creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + + +def closure(vSet, wSet, aSet): + for edge in aSet: + start, end = edge + if start in wSet: + if end not in wSet and end in vSet: + wSet.add(end) + closure(vSet, wSet, aSet) + + +class StaticTokenTest(samba.tests.TestCase): + + def setUp(self): + super(StaticTokenTest, self).setUp() + + self.assertNotEqual(creds.get_kerberos_state(), AUTO_USE_KERBEROS) + + self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp) + self.base_dn = self.ldb.domain_dn() + + res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) + self.assertEqual(len(res), 1) + + self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0])) + + session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS | + AUTH_SESSION_INFO_AUTHENTICATED | + AUTH_SESSION_INFO_SIMPLE_PRIVILEGES) + if creds.get_kerberos_state() == DONT_USE_KERBEROS: + session_info_flags |= AUTH_SESSION_INFO_NTLM + + session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn, + session_info_flags=session_info_flags) + + token = session.security_token + self.user_sids = [] + for s in token.sids: + self.user_sids.append(str(s)) + + # Add asserted identity for Kerberos + if creds.get_kerberos_state() == MUST_USE_KERBEROS: + self.user_sids.append(str(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)) + + + def test_rootDSE_tokenGroups(self): + """Testing rootDSE tokengroups against internal calculation""" + if not url.startswith("ldap"): + self.fail(msg="This test is only valid on ldap") + + res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) + self.assertEqual(len(res), 1) + + print("Getting tokenGroups from rootDSE") + tokengroups = [] + for sid in res[0]['tokenGroups']: + tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))) + + sidset1 = set(tokengroups) + sidset2 = set(self.user_sids) + if len(sidset1.symmetric_difference(sidset2)): + print("token sids don't match") + print("tokengroups: %s" % tokengroups) + print("calculated : %s" % self.user_sids) + print("difference : %s" % sidset1.symmetric_difference(sidset2)) + self.fail(msg="calculated groups don't match against rootDSE tokenGroups") + + def test_dn_tokenGroups(self): + print("Getting tokenGroups from user DN") + res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) + self.assertEqual(len(res), 1) + + dn_tokengroups = [] + for sid in res[0]['tokenGroups']: + dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))) + + sidset1 = set(dn_tokengroups) + sidset2 = set(self.user_sids) + + # The tokenGroups is just a subset of the user_sids + # so we don't check symmetric_difference() here. + if len(sidset1.difference(sidset2)): + print("dn token sids no subset of user token") + print("tokengroups: %s" % dn_tokengroups) + print("user sids : %s" % self.user_sids) + print("difference : %s" % sidset1.difference(sidset2)) + self.fail(msg="DN tokenGroups no subset of full user token") + + missing_sidset = sidset2.difference(sidset1) + + extra_sids = [] + extra_sids.append(self.user_sids[0]) + extra_sids.append(security.SID_WORLD) + extra_sids.append(security.SID_NT_NETWORK) + extra_sids.append(security.SID_NT_AUTHENTICATED_USERS) + extra_sids.append(security.SID_BUILTIN_PREW2K) + if creds.get_kerberos_state() == MUST_USE_KERBEROS: + extra_sids.append(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY) + if creds.get_kerberos_state() == DONT_USE_KERBEROS: + extra_sids.append(security.SID_NT_NTLM_AUTHENTICATION) + + extra_sidset = set(extra_sids) + + if len(missing_sidset.symmetric_difference(extra_sidset)): + print("dn token sids unexpected") + print("tokengroups: %s" % dn_tokengroups) + print("user sids: %s" % self.user_sids) + print("actual difference: %s" % missing_sidset) + print("expected difference: %s" % extra_sidset) + print("unexpected difference : %s" % + missing_sidset.symmetric_difference(extra_sidset)) + self.fail(msg="DN tokenGroups unexpected difference to full user token") + + def test_pac_groups(self): + if creds.get_kerberos_state() != MUST_USE_KERBEROS: + self.skipTest("Kerberos disabled, skipping PAC test") + + settings = {} + settings["lp_ctx"] = lp + settings["target_hostname"] = lp.get("netbios name") + + gensec_client = gensec.Security.start_client(settings) + gensec_client.set_credentials(creds) + gensec_client.want_feature(gensec.FEATURE_SEAL) + gensec_client.start_mech_by_sasl_name("GSSAPI") + + auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[]) + + gensec_server = gensec.Security.start_server(settings, auth_context) + machine_creds = Credentials() + machine_creds.guess(lp) + machine_creds.set_machine_account(lp) + gensec_server.set_credentials(machine_creds) + + gensec_server.want_feature(gensec.FEATURE_SEAL) + gensec_server.start_mech_by_sasl_name("GSSAPI") + + client_finished = False + server_finished = False + server_to_client = b"" + + # Run the actual call loop. + while client_finished == False and server_finished == False: + if not client_finished: + print("running client gensec_update") + (client_finished, client_to_server) = gensec_client.update(server_to_client) + if not server_finished: + print("running server gensec_update") + (server_finished, server_to_client) = gensec_server.update(client_to_server) + + session = gensec_server.session_info() + + token = session.security_token + pac_sids = [] + for s in token.sids: + pac_sids.append(str(s)) + + sidset1 = set(pac_sids) + sidset2 = set(self.user_sids) + if len(sidset1.symmetric_difference(sidset2)): + print("token sids don't match") + print("pac sids: %s" % pac_sids) + print("user sids : %s" % self.user_sids) + print("difference : %s" % sidset1.symmetric_difference(sidset2)) + self.fail(msg="calculated groups don't match against user PAC tokenGroups") + + +class DynamicTokenTest(samba.tests.TestCase): + + def get_creds(self, target_username, target_password): + creds_tmp = Credentials() + creds_tmp.set_username(target_username) + creds_tmp.set_password(target_password) + creds_tmp.set_domain(creds.get_domain()) + creds_tmp.set_realm(creds.get_realm()) + creds_tmp.set_kerberos_state(creds.get_kerberos_state()) + creds_tmp.set_workstation(creds.get_workstation()) + creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() + | gensec.FEATURE_SEAL) + return creds_tmp + + def get_ldb_connection(self, target_username, target_password): + creds_tmp = self.get_creds(target_username, target_password) + ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp) + return ldb_target + + def setUp(self): + super(DynamicTokenTest, self).setUp() + + self.assertNotEqual(creds.get_kerberos_state(), AUTO_USE_KERBEROS) + + self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp) + + self.base_dn = self.admin_ldb.domain_dn() + + self.test_user = "tokengroups_user1" + self.test_user_pass = "samba123@" + self.admin_ldb.newuser(self.test_user, self.test_user_pass) + self.test_group0 = "tokengroups_group0" + self.admin_ldb.newgroup(self.test_group0, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP) + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group0, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + self.test_group0_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + self.admin_ldb.add_remove_group_members(self.test_group0, [self.test_user], + add_members_operation=True) + + self.test_group1 = "tokengroups_group1" + self.admin_ldb.newgroup(self.test_group1, grouptype=dsdb.GTYPE_SECURITY_GLOBAL_GROUP) + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group1, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + self.test_group1_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + self.admin_ldb.add_remove_group_members(self.test_group1, [self.test_user], + add_members_operation=True) + + self.test_group2 = "tokengroups_group2" + self.admin_ldb.newgroup(self.test_group2, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP) + + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group2, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + self.test_group2_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + self.admin_ldb.add_remove_group_members(self.test_group2, [self.test_user], + add_members_operation=True) + + self.test_group3 = "tokengroups_group3" + self.admin_ldb.newgroup(self.test_group3, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP) + + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group3, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + self.test_group3_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + self.admin_ldb.add_remove_group_members(self.test_group3, [self.test_group1], + add_members_operation=True) + + self.test_group4 = "tokengroups_group4" + self.admin_ldb.newgroup(self.test_group4, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP) + + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group4, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + self.test_group4_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + self.admin_ldb.add_remove_group_members(self.test_group4, [self.test_group3], + add_members_operation=True) + + self.test_group5 = "tokengroups_group5" + self.admin_ldb.newgroup(self.test_group5, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP) + + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group5, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + self.test_group5_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + self.admin_ldb.add_remove_group_members(self.test_group5, [self.test_group4], + add_members_operation=True) + + self.test_group6 = "tokengroups_group6" + self.admin_ldb.newgroup(self.test_group6, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP) + + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group6, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + self.test_group6_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + self.admin_ldb.add_remove_group_members(self.test_group6, [self.test_user], + add_members_operation=True) + + self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass) + + res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) + self.assertEqual(len(res), 1) + + self.user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]) + self.user_sid_dn = "<SID=%s>" % str(self.user_sid) + + res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[]) + self.assertEqual(len(res), 1) + + self.test_user_dn = res[0].dn + + session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS | + AUTH_SESSION_INFO_AUTHENTICATED | + AUTH_SESSION_INFO_SIMPLE_PRIVILEGES) + + if creds.get_kerberos_state() == DONT_USE_KERBEROS: + session_info_flags |= AUTH_SESSION_INFO_NTLM + + session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn, + session_info_flags=session_info_flags) + + token = session.security_token + self.user_sids = [] + for s in token.sids: + self.user_sids.append(str(s)) + + # Add asserted identity for Kerberos + if creds.get_kerberos_state() == MUST_USE_KERBEROS: + self.user_sids.append(str(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)) + + def tearDown(self): + super(DynamicTokenTest, self).tearDown() + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_user, "cn=users", self.base_dn)) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_group0, "cn=users", self.base_dn)) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_group1, "cn=users", self.base_dn)) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_group2, "cn=users", self.base_dn)) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_group3, "cn=users", self.base_dn)) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_group4, "cn=users", self.base_dn)) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_group5, "cn=users", self.base_dn)) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (self.test_group6, "cn=users", self.base_dn)) + + def test_rootDSE_tokenGroups(self): + """Testing rootDSE tokengroups against internal calculation""" + if not url.startswith("ldap"): + self.fail(msg="This test is only valid on ldap") + + res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) + self.assertEqual(len(res), 1) + + print("Getting tokenGroups from rootDSE") + tokengroups = [] + for sid in res[0]['tokenGroups']: + tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))) + + sidset1 = set(tokengroups) + sidset2 = set(self.user_sids) + if len(sidset1.symmetric_difference(sidset2)): + print("token sids don't match") + print("tokengroups: %s" % tokengroups) + print("calculated : %s" % self.user_sids) + print("difference : %s" % sidset1.symmetric_difference(sidset2)) + self.fail(msg="calculated groups don't match against rootDSE tokenGroups") + + def test_dn_tokenGroups(self): + print("Getting tokenGroups from user DN") + res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) + self.assertEqual(len(res), 1) + + dn_tokengroups = [] + for sid in res[0]['tokenGroups']: + dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))) + + sidset1 = set(dn_tokengroups) + sidset2 = set(self.user_sids) + + # The tokenGroups is just a subset of the user_sids + # so we don't check symmetric_difference() here. + if len(sidset1.difference(sidset2)): + print("dn token sids no subset of user token") + print("tokengroups: %s" % dn_tokengroups) + print("user sids : %s" % self.user_sids) + print("difference : %s" % sidset1.difference(sidset2)) + self.fail(msg="DN tokenGroups no subset of full user token") + + missing_sidset = sidset2.difference(sidset1) + + extra_sids = [] + extra_sids.append(self.user_sids[0]) + extra_sids.append(security.SID_WORLD) + extra_sids.append(security.SID_NT_NETWORK) + extra_sids.append(security.SID_NT_AUTHENTICATED_USERS) + extra_sids.append(security.SID_BUILTIN_PREW2K) + if creds.get_kerberos_state() == MUST_USE_KERBEROS: + extra_sids.append(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY) + if creds.get_kerberos_state() == DONT_USE_KERBEROS: + extra_sids.append(security.SID_NT_NTLM_AUTHENTICATION) + + extra_sidset = set(extra_sids) + + if len(missing_sidset.symmetric_difference(extra_sidset)): + print("dn token sids unexpected") + print("tokengroups: %s" % dn_tokengroups) + print("user sids: %s" % self.user_sids) + print("actual difference: %s" % missing_sidset) + print("expected difference: %s" % extra_sidset) + print("unexpected difference : %s" % + missing_sidset.symmetric_difference(extra_sidset)) + self.fail(msg="DN tokenGroups unexpected difference to full user token") + + def test_pac_groups(self): + if creds.get_kerberos_state() != MUST_USE_KERBEROS: + self.skipTest("Kerberos disabled, skipping PAC test") + + settings = {} + settings["lp_ctx"] = lp + settings["target_hostname"] = lp.get("netbios name") + + gensec_client = gensec.Security.start_client(settings) + gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass)) + gensec_client.want_feature(gensec.FEATURE_SEAL) + gensec_client.start_mech_by_sasl_name("GSSAPI") + + auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[]) + + gensec_server = gensec.Security.start_server(settings, auth_context) + machine_creds = Credentials() + machine_creds.guess(lp) + machine_creds.set_machine_account(lp) + gensec_server.set_credentials(machine_creds) + + gensec_server.want_feature(gensec.FEATURE_SEAL) + gensec_server.start_mech_by_sasl_name("GSSAPI") + + client_finished = False + server_finished = False + server_to_client = b"" + + # Run the actual call loop. + while client_finished == False and server_finished == False: + if not client_finished: + print("running client gensec_update") + (client_finished, client_to_server) = gensec_client.update(server_to_client) + if not server_finished: + print("running server gensec_update") + (server_finished, server_to_client) = gensec_server.update(client_to_server) + + session = gensec_server.session_info() + + token = session.security_token + pac_sids = [] + for s in token.sids: + pac_sids.append(str(s)) + + sidset1 = set(pac_sids) + sidset2 = set(self.user_sids) + if len(sidset1.symmetric_difference(sidset2)): + print("token sids don't match") + print("pac sids: %s" % pac_sids) + print("user sids : %s" % self.user_sids) + print("difference : %s" % sidset1.symmetric_difference(sidset2)) + self.fail(msg="calculated groups don't match against user PAC tokenGroups") + + def test_tokenGroups_manual(self): + # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3 + # and compare the result + res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, + expression="(|(objectclass=user)(objectclass=group))", + attrs=["memberOf"]) + aSet = set() + aSetR = set() + vSet = set() + for obj in res: + if "memberOf" in obj: + for dn in obj["memberOf"]: + first = obj.dn.get_casefold() + second = ldb.Dn(self.admin_ldb, dn.decode('utf8')).get_casefold() + aSet.add((first, second)) + aSetR.add((second, first)) + vSet.add(first) + vSet.add(second) + + res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, + expression="(objectclass=user)", + attrs=["primaryGroupID"]) + for obj in res: + if "primaryGroupID" in obj: + sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0])) + res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, + attrs=[]) + first = obj.dn.get_casefold() + second = res2[0].dn.get_casefold() + + aSet.add((first, second)) + aSetR.add((second, first)) + vSet.add(first) + vSet.add(second) + + wSet = set() + wSet.add(self.test_user_dn.get_casefold()) + closure(vSet, wSet, aSet) + wSet.remove(self.test_user_dn.get_casefold()) + + tokenGroupsSet = set() + + res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) + self.assertEqual(len(res), 1) + + dn_tokengroups = [] + for sid in res[0]['tokenGroups']: + sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid) + res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, + attrs=[]) + tokenGroupsSet.add(res3[0].dn.get_casefold()) + + if len(wSet.difference(tokenGroupsSet)): + self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet)) + + if len(tokenGroupsSet.difference(wSet)): + self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet)) + + def filtered_closure(self, wSet, filter_grouptype): + res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, + expression="(|(objectclass=user)(objectclass=group))", + attrs=["memberOf"]) + aSet = set() + aSetR = set() + vSet = set() + for obj in res: + vSet.add(obj.dn.get_casefold()) + if "memberOf" in obj: + for dn in obj["memberOf"]: + first = obj.dn.get_casefold() + second = ldb.Dn(self.admin_ldb, dn.decode('utf8')).get_casefold() + aSet.add((first, second)) + aSetR.add((second, first)) + vSet.add(first) + vSet.add(second) + + res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, + expression="(objectclass=user)", + attrs=["primaryGroupID"]) + for obj in res: + if "primaryGroupID" in obj: + sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0])) + res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, + attrs=[]) + first = obj.dn.get_casefold() + second = res2[0].dn.get_casefold() + + aSet.add((first, second)) + aSetR.add((second, first)) + vSet.add(first) + vSet.add(second) + + uSet = set() + for v in vSet: + res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE, + attrs=["groupType"], + expression="objectClass=group") + if len(res_group) == 1: + if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype): + uSet.add(v) + else: + uSet.add(v) + + closure(uSet, wSet, aSet) + + def test_tokenGroupsGlobalAndUniversal_manual(self): + # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3 + # and compare the result + + # The variable names come from MS-ADTS May 15, 2014 + + S = set() + S.add(self.test_user_dn.get_casefold()) + + self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP) + + T = set() + # Not really a SID, we do this on DNs... + for sid in S: + X = set() + X.add(sid) + self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP) + + T = T.union(X) + + T.remove(self.test_user_dn.get_casefold()) + + tokenGroupsSet = set() + + res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"]) + self.assertEqual(len(res), 1) + + dn_tokengroups = [] + for sid in res[0]['tokenGroupsGlobalAndUniversal']: + sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid) + res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, + attrs=[]) + tokenGroupsSet.add(res3[0].dn.get_casefold()) + + if len(T.difference(tokenGroupsSet)): + self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet)) + + if len(tokenGroupsSet.difference(T)): + self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T)) + + def test_samr_GetGroupsForUser(self): + # Confirm that we get the correct results against SAMR also + if not url.startswith("ldap://"): + self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)") + host = url.split("://")[1] + (domain_sid, user_rid) = self.user_sid.split() + samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds) + samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED) + samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, + domain_sid) + user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid) + rids = samr_conn.GetGroupsForUser(user_handle) + samr_dns = set() + for rid in rids.rids: + self.assertEqual(rid.attributes, security.SE_GROUP_MANDATORY | security.SE_GROUP_ENABLED_BY_DEFAULT | security.SE_GROUP_ENABLED) + sid = "%s-%d" % (domain_sid, rid.rid) + res = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, + attrs=[]) + samr_dns.add(res[0].dn.get_casefold()) + + user_info = samr_conn.QueryUserInfo(user_handle, 1) + self.assertEqual(rids.rids[0].rid, user_info.primary_gid) + + tokenGroupsSet = set() + res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"]) + for sid in res[0]['tokenGroupsGlobalAndUniversal']: + sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid) + res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, + attrs=[], + expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))" + % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP)) + if len(res) == 1: + tokenGroupsSet.add(res3[0].dn.get_casefold()) + + if len(samr_dns.difference(tokenGroupsSet)): + self.fail(msg="additional samr_GetUserGroups over tokenGroups: %s" % samr_dns.difference(tokenGroupsSet)) + + memberOf = set() + # Add the primary group + primary_group_sid = "%s-%d" % (domain_sid, user_info.primary_gid) + res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, + attrs=[]) + + memberOf.add(res2[0].dn.get_casefold()) + res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["memberOf"]) + for dn in res[0]['memberOf']: + res3 = self.admin_ldb.search(base=dn, scope=ldb.SCOPE_BASE, + attrs=[], + expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))" + % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP)) + if len(res3) == 1: + memberOf.add(res3[0].dn.get_casefold()) + + if len(memberOf.difference(samr_dns)): + self.fail(msg="additional memberOf over samr_GetUserGroups: %s" % memberOf.difference(samr_dns)) + + if len(samr_dns.difference(memberOf)): + self.fail(msg="additional samr_GetUserGroups over memberOf: %s" % samr_dns.difference(memberOf)) + + S = set() + S.add(self.test_user_dn.get_casefold()) + + self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP) + self.filtered_closure(S, GTYPE_SECURITY_UNIVERSAL_GROUP) + + # Now remove the user DN and primary group + S.remove(self.test_user_dn.get_casefold()) + + if len(samr_dns.difference(S)): + self.fail(msg="additional samr_GetUserGroups over filtered_closure: %s" % samr_dns.difference(S)) + + def test_samr_GetGroupsForUser_nomember(self): + # Confirm that we get the correct results against SAMR also + if not url.startswith("ldap://"): + self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)") + host = url.split("://")[1] + + test_user = "tokengroups_user2" + self.admin_ldb.newuser(test_user, self.test_user_pass) + res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (test_user, self.base_dn), + attrs=["objectSid"], scope=ldb.SCOPE_BASE) + user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) + + (domain_sid, user_rid) = user_sid.split() + samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds) + samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED) + samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, + domain_sid) + user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid) + rids = samr_conn.GetGroupsForUser(user_handle) + user_info = samr_conn.QueryUserInfo(user_handle, 1) + delete_force(self.admin_ldb, "CN=%s,%s,%s" % + (test_user, "cn=users", self.base_dn)) + self.assertEqual(len(rids.rids), 1) + self.assertEqual(rids.rids[0].rid, user_info.primary_gid) + + +if "://" not in url: + if os.path.isfile(url): + url = "tdb://%s" % url + else: + url = "ldap://%s" % url + +TestProgram(module=__name__, opts=subunitopts) |