summaryrefslogtreecommitdiffstats
path: root/source4/torture/drs/python/repl_rodc.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
commit8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch)
tree4099e8021376c7d8c05bdf8503093d80e9c7bad0 /source4/torture/drs/python/repl_rodc.py
parentInitial commit. (diff)
downloadsamba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz
samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'source4/torture/drs/python/repl_rodc.py')
-rw-r--r--source4/torture/drs/python/repl_rodc.py735
1 files changed, 735 insertions, 0 deletions
diff --git a/source4/torture/drs/python/repl_rodc.py b/source4/torture/drs/python/repl_rodc.py
new file mode 100644
index 0000000..ab3d6fa
--- /dev/null
+++ b/source4/torture/drs/python/repl_rodc.py
@@ -0,0 +1,735 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Test replication scenarios involving an RODC
+#
+# Copyright (C) Catalyst.Net Ltd. 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# Usage:
+# export DC1=dc1_dns_name
+# export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
+# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
+# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+#
+
+import drs_base
+import samba.tests
+import ldb
+
+from samba import WERRORError
+from samba.join import DCJoinContext
+from samba.dcerpc import drsuapi, misc, drsblobs, security
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.samdb import dsdb_Dn
+from samba.credentials import Credentials
+
+import random
+import time
+
+
+def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=None):
+ '''get a list of attributes for RODC replication'''
+ if exceptions is None:
+ exceptions = []
+
+ partial_attribute_set = drsuapi.DsPartialAttributeSet()
+ partial_attribute_set.version = 1
+
+ attids = []
+
+ # the exact list of attids we send is quite critical. Note that
+ # we do ask for the secret attributes, but set SPECIAL_SECRET_PROCESSING
+ # to zero them out
+ schema_dn = samdb.get_schema_basedn()
+ res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
+ expression="objectClass=attributeSchema",
+ attrs=["lDAPDisplayName", "systemFlags",
+ "searchFlags"])
+
+ for r in res:
+ ldap_display_name = str(r["lDAPDisplayName"][0])
+ if "systemFlags" in r:
+ system_flags = str(r["systemFlags"][0])
+ if (int(system_flags) & (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
+ samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)):
+ continue
+ if "searchFlags" in r:
+ search_flags = str(r["searchFlags"][0])
+ if (int(search_flags) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE):
+ continue
+ try:
+ attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
+ if attid not in exceptions:
+ attids.append(int(attid))
+ except:
+ pass
+
+ # the attids do need to be sorted, or windows doesn't return
+ # all the attributes we need
+ attids.sort()
+ partial_attribute_set.attids = attids
+ partial_attribute_set.num_attids = len(attids)
+ return partial_attribute_set
+
+
+class DrsRodcTestCase(drs_base.DrsBaseTestCase):
+ """Intended as a semi-black box test case for replication involving
+ an RODC."""
+
+ def setUp(self):
+ super(DrsRodcTestCase, self).setUp()
+ self.base_dn = self.ldb_dc1.get_default_basedn()
+
+ self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_drs_rodc")
+ self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn
+
+ self.site = self.ldb_dc1.server_site_name()
+ self.rodc_name = "TESTRODCDRS%s" % random.randint(1, 10000000)
+ self.rodc_pass = "password12#"
+ self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)
+
+ self.rodc_ctx = DCJoinContext(server=self.ldb_dc1.host_dns_name(),
+ creds=self.get_credentials(),
+ lp=self.get_loadparm(), site=self.site,
+ netbios_name=self.rodc_name,
+ targetdir=None, domain=None,
+ machinepass=self.rodc_pass)
+ self._create_rodc(self.rodc_ctx)
+ self.rodc_ctx.create_tmp_samdb()
+ self.tmp_samdb = self.rodc_ctx.tmp_samdb
+
+ rodc_creds = Credentials()
+ rodc_creds.guess(self.rodc_ctx.lp)
+ rodc_creds.set_username(self.rodc_name + '$')
+ rodc_creds.set_password(self.rodc_pass)
+ self.rodc_creds = rodc_creds
+
+ (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
+ (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
+
+ def tearDown(self):
+ self.rodc_ctx.cleanup_old_join()
+ super(DrsRodcTestCase, self).tearDown()
+
+ def test_admin_repl_secrets(self):
+ """
+ When a secret attribute is set to be replicated to an RODC with the
+ admin credentials, it should always replicate regardless of whether
+ or not it's in the Allowed RODC Password Replication Group.
+ """
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ user_name = "test_rodcA_%s" % rand
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
+
+ # Check that the user has been added to msDSRevealedUsers
+ self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ def test_admin_repl_secrets_DummyDN_GUID(self):
+ """
+ When a secret attribute is set to be replicated to an RODC with the
+ admin credentials, it should always replicate regardless of whether
+ or not it's in the Allowed RODC Password Replication Group.
+ """
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ user_name = "test_rodcA_%s" % rand
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ res = self.ldb_dc1.search(base=user_dn, scope=ldb.SCOPE_BASE,
+ attrs=["objectGUID"])
+
+ user_guid = misc.GUID(res[0]["objectGUID"][0])
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str="DummyDN",
+ nc_guid=user_guid,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+ try:
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.fail(f"DsGetNCChanges failed with {estr}")
+
+ # Check that the user has been added to msDSRevealedUsers
+ self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ def test_rodc_repl_secrets(self):
+ """
+ When a secret attribute is set to be replicated to an RODC with
+ the RODC account credentials, it should not replicate if it's in
+ the Allowed RODC Password Replication Group. Once it is added to
+ the group, it should replicate.
+ """
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ user_name = "test_rodcB_%s" % rand
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as e:
+ (enum, estr) = e.args
+ self.assertEqual(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ # send the same request again and we should get the same response
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.assertEqual(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ # Retry with Administrator credentials, ignores password replication groups
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
+
+ # Check that the user has been added to msDSRevealedUsers
+ self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ def test_rodc_repl_secrets_follow_on_req(self):
+ """
+ Checks that an RODC can't subvert an existing (valid) GetNCChanges
+ request to reveal secrets it shouldn't have access to.
+ """
+
+ # send an acceptable request that will match as many GUIDs as possible.
+ # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
+ # (On the server, this builds up the getnc_state->guids array)
+ req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.ldb_dc1.domain_dn(),
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ max_objects=1,
+ replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
+
+ # Get the next replication chunk, but set REPL_SECRET this time. This
+ # is following on the the previous accepted request, but we've changed
+ # exop to now request secrets. This request should fail
+ try:
+ req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.ldb_dc1.domain_dn(),
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
+ req8.highwatermark = ctr.new_highwatermark
+
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
+
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except RuntimeError as e2:
+ (enum, estr) = e2.args
+ pass
+
+ def test_msDSRevealedUsers_admin(self):
+ """
+ When a secret attribute is to be replicated to an RODC, the contents
+ of the attribute should be added to the msDSRevealedUsers attribute
+ of the computer object corresponding to the RODC.
+ """
+
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ # Add a user on DC1, add it to allowed password replication
+ # group, and replicate to RODC with EXOP_REPL_SECRETS
+ user_name = "test_rodcC_%s" % rand
+ password = "password12#"
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
+
+ self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+ [user_name],
+ add_members_operation=True)
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
+
+ # Check that the user has been added to msDSRevealedUsers
+ (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ # Change the user's password on DC1
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password + "1", False, user_name)
+
+ (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+ self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
+
+ # Replicate to RODC again with EXOP_REPL_SECRETS
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
+
+ # This is important for Windows, because the entry won't have been
+ # updated in time if we don't have it. Even with this sleep, it only
+ # passes some of the time...
+ time.sleep(5)
+
+ # Check that the entry in msDSRevealedUsers has been updated
+ (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+ self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
+
+ # We should be able to delete the user
+ self.ldb_dc1.deleteuser(user_name)
+
+ res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
+ attrs=["msDS-RevealedUsers"])
+ self.assertFalse("msDS-RevealedUsers" in res[0])
+
+ def test_msDSRevealedUsers(self):
+ """
+ When a secret attribute is to be replicated to an RODC, the contents
+ of the attribute should be added to the msDSRevealedUsers attribute
+ of the computer object corresponding to the RODC.
+ """
+
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ # Add a user on DC1, add it to allowed password replication
+ # group, and replicate to RODC with EXOP_REPL_SECRETS
+ user_name = "test_rodcD_%s" % rand
+ password = "password12#"
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
+
+ self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+ [user_name],
+ add_members_operation=True)
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
+
+ # Check that the user has been added to msDSRevealedUsers
+ (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ # Change the user's password on DC1
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password + "1", False, user_name)
+
+ (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+ self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
+
+ # Replicate to RODC again with EXOP_REPL_SECRETS
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+
+ # This is important for Windows, because the entry won't have been
+ # updated in time if we don't have it. Even with this sleep, it only
+ # passes some of the time...
+ time.sleep(5)
+
+ # Check that the entry in msDSRevealedUsers has been updated
+ (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+ self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
+
+ # We should be able to delete the user
+ self.ldb_dc1.deleteuser(user_name)
+
+ res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
+ attrs=["msDS-RevealedUsers"])
+ self.assertFalse("msDS-RevealedUsers" in res[0])
+
+ def test_msDSRevealedUsers_pas(self):
+ """
+ If we provide a Partial Attribute Set when replicating to an RODC,
+ we should ignore it and replicate all of the secret attributes anyway
+ msDSRevealedUsers attribute.
+ """
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+ pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ # Add a user on DC1, add it to allowed password replication
+ # group, and replicate to RODC with EXOP_REPL_SECRETS
+ user_name = "test_rodcE_%s" % rand
+ password = "password12#"
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
+
+ self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+ [user_name],
+ add_members_operation=True)
+
+ pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=pas,
+ max_objects=133,
+ replica_flags=0)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
+
+ # Make sure that we still replicate the secrets
+ for attribute in ctr.first_object.object.attribute_ctr.attributes:
+ if attribute.attid in pas_exceptions:
+ pas_exceptions.remove(attribute.attid)
+ for attribute in pas_exceptions:
+ self.fail("%d was not replicated even though the partial attribute set should be ignored."
+ % attribute)
+
+ # Check that the user has been added to msDSRevealedUsers
+ (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ def test_msDSRevealedUsers_using_other_RODC(self):
+ """
+ Ensure that the machine account is tied to the destination DSA.
+ """
+ # Create a new identical RODC with just the first letter missing
+ other_rodc_name = self.rodc_name[1:]
+ other_rodc_ctx = DCJoinContext(server=self.ldb_dc1.host_dns_name(),
+ creds=self.get_credentials(),
+ lp=self.get_loadparm(), site=self.site,
+ netbios_name=other_rodc_name,
+ targetdir=None, domain=None,
+ machinepass=self.rodc_pass)
+ self._create_rodc(other_rodc_ctx)
+
+ other_rodc_creds = Credentials()
+ other_rodc_creds.guess(other_rodc_ctx.lp)
+ other_rodc_creds.set_username(other_rodc_name + '$')
+ other_rodc_creds.set_password(self.rodc_pass)
+
+ (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
+
+ rand = random.randint(1, 10000000)
+
+ user_name = "test_rodcF_%s" % rand
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+ self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+ [user_name],
+ add_members_operation=True)
+
+ req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as e3:
+ (enum, estr) = e3.args
+ self.assertEqual(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ try:
+ (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as e4:
+ (enum, estr) = e4.args
+ self.assertEqual(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ def test_msDSRevealedUsers_local_deny_allow(self):
+ """
+ Ensure that the deny trumps allow, and we can modify these
+ attributes directly instead of the global groups.
+
+ This may fail on Windows due to tokenGroup calculation caching.
+ """
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ # Add a user on DC1, add it to allowed password replication
+ # group, and replicate to RODC with EXOP_REPL_SECRETS
+ user_name = "test_rodcF_%s" % rand
+ password = "password12#"
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+ m["msDS-RevealOnDemandGroup"] = \
+ ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
+ "msDS-RevealOnDemandGroup")
+ self.ldb_dc1.modify(m)
+
+ # In local allow, should be success
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ except:
+ self.fail("Should have succeeded when in local allow group")
+
+ self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+ m["msDS-NeverRevealGroup"] = \
+ ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
+ "msDS-NeverRevealGroup")
+ self.ldb_dc1.modify(m)
+
+ # In local allow and deny, should be failure
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as e5:
+ (enum, estr) = e5.args
+ self.assertEqual(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+ m["msDS-RevealOnDemandGroup"] = \
+ ldb.MessageElement(user_dn, ldb.FLAG_MOD_DELETE,
+ "msDS-RevealOnDemandGroup")
+ self.ldb_dc1.modify(m)
+
+ # In local deny, should be failure
+ (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as e6:
+ (enum, estr) = e6.args
+ self.assertEqual(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ def _assert_in_revealed_users(self, user_dn, attrlist):
+ res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
+ attrs=["msDS-RevealedUsers"])
+ revealed_users = res[0]["msDS-RevealedUsers"]
+ actual_attrids = []
+ packed_attrs = []
+ unpacked_attrs = []
+ for attribute in revealed_users:
+ attribute = attribute.decode('utf8')
+ dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute)
+ metadata = ndr_unpack(drsblobs.replPropertyMetaData1, dsdb_dn.get_bytes())
+ if user_dn in attribute:
+ unpacked_attrs.append(metadata)
+ packed_attrs.append(dsdb_dn.get_bytes())
+ actual_attrids.append(metadata.attid)
+
+ self.assertEqual(sorted(actual_attrids), sorted(attrlist))
+
+ return (packed_attrs, unpacked_attrs)
+
+ def _assert_attrlist_equals(self, list_1, list_2):
+ return self._assert_attrlist_changed(list_1, list_2, [], num_changes=0, expected_new_usn=False)
+
+ def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
+ for i in range(len(list_2)):
+ self.assertEqual(list_1[i].attid, list_2[i].attid)
+ self.assertEqual(list_1[i].originating_invocation_id, list_2[i].originating_invocation_id)
+ self.assertEqual(list_1[i].version + num_changes, list_2[i].version)
+
+ if expected_new_usn:
+ self.assertTrue(list_1[i].originating_usn < list_2[i].originating_usn)
+ self.assertTrue(list_1[i].local_usn < list_2[i].local_usn)
+ else:
+ self.assertEqual(list_1[i].originating_usn, list_2[i].originating_usn)
+ self.assertEqual(list_1[i].local_usn, list_2[i].local_usn)
+
+ if list_1[i].attid in changed_attributes:
+ # We do the changes too quickly, so unless we put sleeps
+ # in between calls, these remain the same. Checking the USNs
+ # is enough.
+ pass
+ #self.assertTrue(list_1[i].originating_change_time < list_2[i].originating_change_time)
+ else:
+ self.assertEqual(list_1[i].originating_change_time, list_2[i].originating_change_time)
+
+ def _create_rodc(self, ctx):
+ ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+ ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+ ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
+
+ ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
+ ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
+
+ mysid = ctx.get_mysid()
+ admin_dn = "<SID=%s>" % mysid
+ ctx.managedby = admin_dn
+
+ ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
+ samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+ samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
+
+ ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
+ ctx.secure_channel_type = misc.SEC_CHAN_RODC
+ ctx.RODC = True
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+
+ ctx.join_add_objects()