summaryrefslogtreecommitdiffstats
path: root/source4/torture/drs/python/repl_secdesc.py
diff options
context:
space:
mode:
Diffstat (limited to 'source4/torture/drs/python/repl_secdesc.py')
-rw-r--r--source4/torture/drs/python/repl_secdesc.py400
1 files changed, 400 insertions, 0 deletions
diff --git a/source4/torture/drs/python/repl_secdesc.py b/source4/torture/drs/python/repl_secdesc.py
new file mode 100644
index 0000000..38ae25a
--- /dev/null
+++ b/source4/torture/drs/python/repl_secdesc.py
@@ -0,0 +1,400 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Unix SMB/CIFS implementation.
+# Copyright (C) Catalyst.Net Ltd. 2017
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+import drs_base
+import ldb
+import samba
+from samba import sd_utils
+from ldb import LdbError
+
+class ReplAclTestCase(drs_base.DrsBaseTestCase):
+
+ def setUp(self):
+ super(ReplAclTestCase, self).setUp()
+ self.mod = "(A;CIOI;GA;;;SY)"
+ self.mod_becomes = "(A;OICIIO;GA;;;SY)"
+ self.mod_inherits_as = "(A;OICIIOID;GA;;;SY)"
+
+ self.sd_utils_dc1 = sd_utils.SDUtils(self.ldb_dc1)
+ self.sd_utils_dc2 = sd_utils.SDUtils(self.ldb_dc2)
+
+ self.ou = samba.tests.create_test_ou(self.ldb_dc1,
+ "test_acl_inherit")
+
+ # disable replication for the tests so we can control at what point
+ # the DCs try to replicate
+ self._disable_all_repl(self.dnsname_dc1)
+ self._disable_all_repl(self.dnsname_dc2)
+
+ # make sure DCs are synchronized before the test
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+
+ def tearDown(self):
+ self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
+
+ # re-enable replication
+ self._enable_all_repl(self.dnsname_dc1)
+ self._enable_all_repl(self.dnsname_dc2)
+
+ super(ReplAclTestCase, self).tearDown()
+
+ def test_acl_inheirt_new_object_1_pass(self):
+ # Set the inherited ACL on the parent OU
+ self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod)
+
+ # Assert ACL set stuck as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(self.ou))
+
+ # Make a new object
+ dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou)
+ self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"})
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Assert ACL replicated as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc2.get_sd_as_sddl(self.ou))
+
+ # Confirm inherited ACLs are identical and were inherited
+
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(dn))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn),
+ self.sd_utils_dc2.get_sd_as_sddl(dn))
+
+ def test_acl_inheirt_new_object(self):
+ # Set the inherited ACL on the parent OU
+ self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod)
+
+ # Assert ACL set stuck as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(self.ou))
+
+ # Replicate to DC2
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Make a new object
+ dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou)
+ self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"})
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Assert ACL replicated as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc2.get_sd_as_sddl(self.ou))
+
+ # Confirm inherited ACLs are identical and were inherited
+
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(dn))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn),
+ self.sd_utils_dc2.get_sd_as_sddl(dn))
+
+ def test_acl_inherit_existing_object(self):
+ # Make a new object
+ dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou)
+ self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"})
+
+ try:
+ self.ldb_dc2.search(scope=ldb.SCOPE_BASE,
+ base=dn,
+ attrs=[])
+ self.fail()
+ except LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Confirm it is now replicated
+ self.ldb_dc2.search(scope=ldb.SCOPE_BASE,
+ base=dn,
+ attrs=[])
+
+ # Set the inherited ACL on the parent OU
+ self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod)
+
+ # Assert ACL set stuck as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(self.ou))
+
+ # Replicate to DC2
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Confirm inherited ACLs are identical and were inherited
+
+ # Assert ACL replicated as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc2.get_sd_as_sddl(self.ou))
+
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(dn))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn),
+ self.sd_utils_dc2.get_sd_as_sddl(dn))
+
+ def test_acl_inheirt_existing_object_1_pass(self):
+ # Make a new object
+ dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou)
+ self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"})
+
+ try:
+ self.ldb_dc2.search(scope=ldb.SCOPE_BASE,
+ base=dn,
+ attrs=[])
+ self.fail()
+ except LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
+
+ # Set the inherited ACL on the parent OU
+ self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod)
+
+ # Assert ACL set as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(self.ou))
+
+ # Replicate to DC2
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Assert ACL replicated as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc2.get_sd_as_sddl(self.ou))
+
+ # Confirm inherited ACLs are identical and were inherited
+
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(dn))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn),
+ self.sd_utils_dc2.get_sd_as_sddl(dn))
+
+ def test_acl_inheirt_renamed_object(self):
+ # Make a new object
+ new_ou = samba.tests.create_test_ou(self.ldb_dc1,
+ "acl_test_l2")
+
+ sub_ou_dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou)
+
+ try:
+ self.ldb_dc2.search(scope=ldb.SCOPE_BASE,
+ base=new_ou,
+ attrs=[])
+ self.fail()
+ except LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Confirm it is now replicated
+ self.ldb_dc2.search(scope=ldb.SCOPE_BASE,
+ base=new_ou,
+ attrs=[])
+
+ # Set the inherited ACL on the parent OU on DC1
+ self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod)
+
+ # Assert ACL set as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(self.ou))
+
+ # Replicate to DC2
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Assert ACL replicated as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc2.get_sd_as_sddl(self.ou))
+
+ # Rename to under self.ou
+
+ self.ldb_dc1.rename(new_ou, sub_ou_dn)
+
+ # Replicate to DC2
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Confirm inherited ACLs are identical and were inherited
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn),
+ self.sd_utils_dc2.get_sd_as_sddl(sub_ou_dn))
+
+
+ def test_acl_inheirt_renamed_child_object(self):
+ # Make a new OU
+ new_ou = samba.tests.create_test_ou(self.ldb_dc1,
+ "acl_test_l2")
+
+ # Here is where the new OU will end up at the end.
+ sub2_ou_dn_final = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou)
+
+ sub3_ou_dn = ldb.Dn(self.ldb_dc1, "OU=l3,%s" % new_ou)
+ sub3_ou_dn_final = ldb.Dn(self.ldb_dc1, "OU=l3,%s" % sub2_ou_dn_final)
+
+ self.ldb_dc1.add({"dn": sub3_ou_dn,
+ "objectclass": "organizationalUnit"})
+
+ sub4_ou_dn = ldb.Dn(self.ldb_dc1, "OU=l4,%s" % sub3_ou_dn)
+ sub4_ou_dn_final = ldb.Dn(self.ldb_dc1, "OU=l4,%s" % sub3_ou_dn_final)
+
+ self.ldb_dc1.add({"dn": sub4_ou_dn,
+ "objectclass": "organizationalUnit"})
+
+ try:
+ self.ldb_dc2.search(scope=ldb.SCOPE_BASE,
+ base=new_ou,
+ attrs=[])
+ self.fail()
+ except LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Confirm it is now replicated
+ self.ldb_dc2.search(scope=ldb.SCOPE_BASE,
+ base=new_ou,
+ attrs=[])
+
+ #
+ # Given a tree new_ou -> l3 -> l4
+ #
+
+ # Set the inherited ACL on the grandchild OU (l3) on DC1
+ self.sd_utils_dc1.dacl_add_ace(sub3_ou_dn, self.mod)
+
+ # Assert ACL set stuck as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(sub3_ou_dn))
+
+ # Rename new_ou (l2) to under self.ou (this must happen second). If the
+ # inheritance between l3 and l4 is name-based, this could
+ # break.
+
+ # The tree is now self.ou -> l2 -> l3 -> l4
+
+ self.ldb_dc1.rename(new_ou, sub2_ou_dn_final)
+
+ # Assert ACL set remained as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(sub3_ou_dn_final))
+
+ # Replicate to DC2
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Confirm set ACLs (on l3 ) are identical and were inherited
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc2.get_sd_as_sddl(sub3_ou_dn_final))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub3_ou_dn_final),
+ self.sd_utils_dc2.get_sd_as_sddl(sub3_ou_dn_final))
+
+ # Confirm inherited ACLs (from l3 to l4) are identical
+ # and were inherited
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(sub4_ou_dn_final))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub4_ou_dn_final),
+ self.sd_utils_dc2.get_sd_as_sddl(sub4_ou_dn_final))
+
+
+ def test_acl_inheirt_renamed_object_in_conflict(self):
+ # Make a new object to be renamed under self.ou
+ new_ou = samba.tests.create_test_ou(self.ldb_dc1,
+ "acl_test_l2")
+
+ # Make a new OU under self.ou (on DC2)
+ sub_ou_dn = ldb.Dn(self.ldb_dc2, "OU=l2,%s" % self.ou)
+ self.ldb_dc2.add({"dn": sub_ou_dn,
+ "objectclass": "organizationalUnit"})
+
+ # Set the inherited ACL on the parent OU
+ self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod)
+
+ # Assert ACL set stuck as expected
+ self.assertIn(self.mod_becomes,
+ self.sd_utils_dc1.get_sd_as_sddl(self.ou))
+
+ # Replicate to DC2
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ # Rename to under self.ou
+ self.ldb_dc1.rename(new_ou, sub_ou_dn)
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn))
+
+ # Replicate to DC2 (will cause a conflict, DC1 to win, version
+ # is higher since named twice)
+
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ forced=True)
+
+ children = self.ldb_dc2.search(scope=ldb.SCOPE_ONELEVEL,
+ base=self.ou,
+ attrs=[])
+ for child in children:
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc2.get_sd_as_sddl(child.dn))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn),
+ self.sd_utils_dc2.get_sd_as_sddl(child.dn))
+
+ # Replicate back
+ self._net_drs_replicate(DC=self.dnsname_dc1,
+ fromDC=self.dnsname_dc2,
+ forced=True)
+
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn))
+
+ for child in children:
+ self.assertIn(self.mod_inherits_as,
+ self.sd_utils_dc1.get_sd_as_sddl(child.dn))
+ self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(child.dn),
+ self.sd_utils_dc2.get_sd_as_sddl(child.dn))