#!/usr/bin/env python3 # -*- coding: utf-8 -*- # This tests the restrictions on userAccountControl that apply even if write access is permitted # # Copyright Samuel Cabrero 2014 # Copyright Andrew Bartlett 2014 # # Licenced under the GPLv3 # import optparse import sys import unittest import samba import samba.getopt as options import samba.tests import ldb sys.path.insert(0, "bin/python") from samba.tests import DynamicTestCase from samba.subunit.run import SubunitTestRunner from samba.samdb import SamDB from samba.dcerpc import security from samba.credentials import Credentials from samba.ndr import ndr_pack from samba.tests import delete_force from samba import gensec, sd_utils from samba.credentials import DONT_USE_KERBEROS from ldb import SCOPE_BASE, LdbError from samba.dsdb import ( UF_NORMAL_ACCOUNT, UF_PARTIAL_SECRETS_ACCOUNT, UF_PASSWD_NOTREQD, UF_SERVER_TRUST_ACCOUNT, UF_TRUSTED_FOR_DELEGATION, UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION, UF_WORKSTATION_TRUST_ACCOUNT, ) parser = optparse.OptionParser("priv_attrs.py [options] ") sambaopts = options.SambaOptions(parser) parser.add_option_group(sambaopts) parser.add_option_group(options.VersionOptions(parser)) # use command line creds if available credopts = options.CredentialsOptions(parser) parser.add_option_group(credopts) opts, args = parser.parse_args() if len(args) < 1: parser.print_usage() sys.exit(1) host = args[0] if "://" not in host: ldaphost = "ldap://%s" % host else: ldaphost = host start = host.rindex("://") host = host.lstrip(start + 3) lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) """ Check the combinations of: rodc kdc a2d2 useraccountcontrol (trusted for delegation) sidHistory x add modify(replace) modify(add) x sd WP on add cc default perms admin created, WP to user x computer user """ attrs = {"sidHistory": {"value": ndr_pack(security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)), "priv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS}, "msDS-AllowedToDelegateTo": {"value": f"host/{host}", "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS}, "userAccountControl-a2d-user": {"attr": "userAccountControl", "value": str(UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION|UF_NORMAL_ACCOUNT|UF_PASSWD_NOTREQD), "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS}, "userAccountControl-a2d-computer": {"attr": "userAccountControl", "value": str(UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION|UF_WORKSTATION_TRUST_ACCOUNT), "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, "only-1": "computer"}, # This flag makes many legitimate authenticated clients # send a forwardable ticket-granting-ticket to the server "userAccountControl-t4d-user": {"attr": "userAccountControl", "value": str(UF_TRUSTED_FOR_DELEGATION|UF_NORMAL_ACCOUNT|UF_PASSWD_NOTREQD), "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS}, "userAccountControl-t4d-computer": {"attr": "userAccountControl", "value": str(UF_TRUSTED_FOR_DELEGATION|UF_WORKSTATION_TRUST_ACCOUNT), "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, "only-1": "computer"}, "userAccountControl-DC": {"attr": "userAccountControl", "value": str(UF_SERVER_TRUST_ACCOUNT), "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, "only-2": "computer"}, "userAccountControl-RODC": {"attr": "userAccountControl", "value": str(UF_PARTIAL_SECRETS_ACCOUNT|UF_WORKSTATION_TRUST_ACCOUNT), "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, "only-1": "computer"}, "msDS-SecondaryKrbTgtNumber": {"value": "65536", "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS}, "primaryGroupID": {"value": str(security.DOMAIN_RID_ADMINS), "priv-error": ldb.ERR_UNWILLING_TO_PERFORM, "unpriv-add-error": ldb.ERR_UNWILLING_TO_PERFORM, "unpriv-error": ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS} } @DynamicTestCase class PrivAttrsTests(samba.tests.TestCase): def get_creds(self, target_username, target_password): creds_tmp = Credentials() creds_tmp.set_username(target_username) creds_tmp.set_password(target_password) creds_tmp.set_domain(creds.get_domain()) creds_tmp.set_realm(creds.get_realm()) creds_tmp.set_workstation(creds.get_workstation()) creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL) creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop return creds_tmp def assertGotLdbError(self, wanted, got): if not self.strict_checking: self.assertNotEqual(got, ldb.SUCCESS) else: self.assertEqual(got, wanted) def setUp(self): super().setUp() strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True) if strict_checking is None: strict_checking = '1' self.strict_checking = bool(int(strict_checking)) self.admin_creds = creds self.admin_samdb = SamDB(url=ldaphost, credentials=self.admin_creds, lp=lp) self.domain_sid = security.dom_sid(self.admin_samdb.get_domain_sid()) self.base_dn = self.admin_samdb.domain_dn() self.unpriv_user = "testuser1" self.unpriv_user_pw = "samba123@" self.unpriv_creds = self.get_creds(self.unpriv_user, self.unpriv_user_pw) self.admin_sd_utils = sd_utils.SDUtils(self.admin_samdb) self.test_ou_name = "OU=test_priv_attrs" self.test_ou = self.test_ou_name + "," + self.base_dn delete_force(self.admin_samdb, self.test_ou, controls=["tree_delete:0"]) self.admin_samdb.create_ou(self.test_ou) expected_user_dn = f"CN={self.unpriv_user},{self.test_ou_name},{self.base_dn}" self.admin_samdb.newuser(self.unpriv_user, self.unpriv_user_pw, userou=self.test_ou_name) res = self.admin_samdb.search(expected_user_dn, scope=SCOPE_BASE, attrs=["objectSid"]) self.assertEqual(1, len(res)) self.unpriv_user_dn = res[0].dn self.addCleanup(delete_force, self.admin_samdb, self.unpriv_user_dn, controls=["tree_delete:0"]) self.unpriv_user_sid = self.admin_sd_utils.get_object_sid(self.unpriv_user_dn) self.unpriv_samdb = SamDB(url=ldaphost, credentials=self.unpriv_creds, lp=lp) @classmethod def setUpDynamicTestCases(cls): for test_name in attrs.keys(): for add_or_mod in ["add", "mod-del-add", "mod-replace"]: for permission in ["admin-add", "CC"]: for sd in ["default", "WP"]: for objectclass in ["computer", "user"]: tname = f"{test_name}_{add_or_mod}_{permission}_{sd}_{objectclass}" targs = (test_name, add_or_mod, permission, sd, objectclass) cls.generate_dynamic_test("test_priv_attr", tname, *targs) def add_computer_ldap(self, computername, others=None, samdb=None): dn = "CN=%s,%s" % (computername, self.test_ou) samaccountname = "%s$" % computername msg_dict = { "dn": dn, "objectclass": "computer"} if others is not None: msg_dict = dict(list(msg_dict.items()) + list(others.items())) msg = ldb.Message.from_dict(samdb, msg_dict) msg["sAMAccountName"] = samaccountname print("Adding computer account %s" % computername) try: samdb.add(msg) except ldb.LdbError: print(msg) raise return msg.dn def add_user_ldap(self, username, others=None, samdb=None): dn = "CN=%s,%s" % (username, self.test_ou) samaccountname = "%s$" % username msg_dict = { "dn": dn, "objectclass": "user"} if others is not None: msg_dict = dict(list(msg_dict.items()) + list(others.items())) msg = ldb.Message.from_dict(samdb, msg_dict) msg["sAMAccountName"] = samaccountname print("Adding user account %s" % username) try: samdb.add(msg) except ldb.LdbError: print(msg) raise return msg.dn def add_thing_ldap(self, user, others, samdb, objectclass): if objectclass == "user": dn = self.add_user_ldap(user, others, samdb=samdb) elif objectclass == "computer": dn = self.add_computer_ldap(user, others, samdb=samdb) return dn def _test_priv_attr_with_args(self, test_name, add_or_mod, permission, sd, objectclass): user="privattrs" if "attr" in attrs[test_name]: attr = attrs[test_name]["attr"] else: attr = test_name if add_or_mod == "add": others = {attr: attrs[test_name]["value"]} else: others = {} if permission == "CC": samdb = self.unpriv_samdb # Set CC on container to allow user add mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.unpriv_user_sid) self.admin_sd_utils.dacl_add_ace(self.test_ou, mod) mod = "(OA;CI;CC;bf967a86-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.unpriv_user_sid) self.admin_sd_utils.dacl_add_ace(self.test_ou, mod) else: samdb = self.admin_samdb if sd == "WP": # Set SD to WP to the target user as part of add sd = "O:%sG:DUD:(OA;CIID;RPWP;;;%s)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;%s)" % (self.unpriv_user_sid, self.unpriv_user_sid, self.unpriv_user_sid) tmp_desc = security.descriptor.from_sddl(sd, self.domain_sid) others["ntSecurityDescriptor"] = ndr_pack(tmp_desc) if add_or_mod == "add": # only-1 and only-2 are due to windows behaviour if "only-1" in attrs[test_name] and \ attrs[test_name]["only-1"] != objectclass: try: dn = self.add_thing_ldap(user, others, samdb, objectclass) self.fail(f"{test_name}: Unexpectedly able to set {attr} on new {objectclass} as ADMIN (should fail LDAP_OBJECT_CLASS_VIOLATION)") except LdbError as e5: (enum, estr) = e5.args self.assertGotLdbError(ldb.ERR_OBJECT_CLASS_VIOLATION, enum) elif permission == "CC": try: dn = self.add_thing_ldap(user, others, samdb, objectclass) self.fail(f"{test_name}: Unexpectedly able to set {attr} on new {objectclass}") except LdbError as e5: (enum, estr) = e5.args if "unpriv-add-error" in attrs[test_name]: self.assertGotLdbError(attrs[test_name]["unpriv-add-error"], enum) else: self.assertGotLdbError(attrs[test_name]["unpriv-error"], enum) elif "only-2" in attrs[test_name] and \ attrs[test_name]["only-2"] != objectclass: try: dn = self.add_thing_ldap(user, others, samdb, objectclass) self.fail(f"{test_name}: Unexpectedly able to set {attr} on new {objectclass} as ADMIN (should fail LDAP_OBJECT_CLASS_VIOLATION)") except LdbError as e5: (enum, estr) = e5.args self.assertGotLdbError(ldb.ERR_OBJECT_CLASS_VIOLATION, enum) elif "priv-error" in attrs[test_name]: try: dn = self.add_thing_ldap(user, others, samdb, objectclass) self.fail(f"{test_name}: Unexpectedly able to set {attr} on new {objectclass} as ADMIN") except LdbError as e5: (enum, estr) = e5.args self.assertGotLdbError(attrs[test_name]["priv-error"], enum) else: try: dn = self.add_thing_ldap(user, others, samdb, objectclass) except LdbError as e5: (enum, estr) = e5.args self.fail(f"Failed to add account {user} as objectclass {objectclass}") else: try: dn = self.add_thing_ldap(user, others, samdb, objectclass) except LdbError as e5: (enum, estr) = e5.args self.fail(f"Failed to add account {user} as objectclass {objectclass}") if add_or_mod == "add": return m = ldb.Message() m.dn = dn # Do modify if add_or_mod == "mod-del-add": m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, attr) m["1"] = ldb.MessageElement(attrs[test_name]["value"], ldb.FLAG_MOD_ADD, attr) else: m["0"] = ldb.MessageElement(attrs[test_name]["value"], ldb.FLAG_MOD_REPLACE, attr) try: self.unpriv_samdb.modify(m) self.fail(f"{test_name}: Unexpectedly able to set {attr} on {m.dn}") except LdbError as e5: (enum, estr) = e5.args self.assertGotLdbError(attrs[test_name]["unpriv-error"], enum) runner = SubunitTestRunner() rc = 0 if not runner.run(unittest.TestLoader().loadTestsFromTestCase( PrivAttrsTests)).wasSuccessful(): rc = 1 sys.exit(rc)