#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Unix SMB/CIFS implementation. # Copyright (C) Catalyst.Net Ltd. 2017 # Copyright (C) Andrew Bartlett 2019 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import drs_base import ldb import samba from samba import sd_utils from ldb import LdbError class ReplAclTestCase(drs_base.DrsBaseTestCase): def setUp(self): super(ReplAclTestCase, self).setUp() self.mod = "(A;CIOI;GA;;;SY)" self.mod_becomes = "(A;OICIIO;GA;;;SY)" self.mod_inherits_as = "(A;OICIIOID;GA;;;SY)" self.sd_utils_dc1 = sd_utils.SDUtils(self.ldb_dc1) self.sd_utils_dc2 = sd_utils.SDUtils(self.ldb_dc2) self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_acl_inherit") # disable replication for the tests so we can control at what point # the DCs try to replicate self._disable_all_repl(self.dnsname_dc1) self._disable_all_repl(self.dnsname_dc2) # make sure DCs are synchronized before the test self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) def tearDown(self): self.ldb_dc1.delete(self.ou, ["tree_delete:1"]) # re-enable replication self._enable_all_repl(self.dnsname_dc1) self._enable_all_repl(self.dnsname_dc2) super(ReplAclTestCase, self).tearDown() def test_acl_inheirt_new_object_1_pass(self): # Set the inherited ACL on the parent OU self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod) # Assert ACL set stuck as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(self.ou)) # Make a new object dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou) self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"}) self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Assert ACL replicated as expected self.assertIn(self.mod_becomes, self.sd_utils_dc2.get_sd_as_sddl(self.ou)) # Confirm inherited ACLs are identical and were inherited self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(dn)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn), self.sd_utils_dc2.get_sd_as_sddl(dn)) def test_acl_inheirt_new_object(self): # Set the inherited ACL on the parent OU self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod) # Assert ACL set stuck as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(self.ou)) # Replicate to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Make a new object dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou) self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"}) self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Assert ACL replicated as expected self.assertIn(self.mod_becomes, self.sd_utils_dc2.get_sd_as_sddl(self.ou)) # Confirm inherited ACLs are identical and were inherited self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(dn)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn), self.sd_utils_dc2.get_sd_as_sddl(dn)) def test_acl_inherit_existing_object(self): # Make a new object dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou) self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"}) try: self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=dn, attrs=[]) self.fail() except LdbError as err: enum = err.args[0] self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT) self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Confirm it is now replicated self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=dn, attrs=[]) # Set the inherited ACL on the parent OU self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod) # Assert ACL set stuck as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(self.ou)) # Replicate to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Confirm inherited ACLs are identical and were inherited # Assert ACL replicated as expected self.assertIn(self.mod_becomes, self.sd_utils_dc2.get_sd_as_sddl(self.ou)) self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(dn)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn), self.sd_utils_dc2.get_sd_as_sddl(dn)) def test_acl_inheirt_existing_object_1_pass(self): # Make a new object dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou) self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalUnit"}) try: self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=dn, attrs=[]) self.fail() except LdbError as err: enum = err.args[0] self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT) # Set the inherited ACL on the parent OU self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod) # Assert ACL set as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(self.ou)) # Replicate to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Assert ACL replicated as expected self.assertIn(self.mod_becomes, self.sd_utils_dc2.get_sd_as_sddl(self.ou)) # Confirm inherited ACLs are identical and were inherited self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(dn)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(dn), self.sd_utils_dc2.get_sd_as_sddl(dn)) def test_acl_inheirt_renamed_object(self): # Make a new object new_ou = samba.tests.create_test_ou(self.ldb_dc1, "acl_test_l2") sub_ou_dn = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou) try: self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=new_ou, attrs=[]) self.fail() except LdbError as err: enum = err.args[0] self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT) self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Confirm it is now replicated self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=new_ou, attrs=[]) # Set the inherited ACL on the parent OU on DC1 self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod) # Assert ACL set as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(self.ou)) # Replicate to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Assert ACL replicated as expected self.assertIn(self.mod_becomes, self.sd_utils_dc2.get_sd_as_sddl(self.ou)) # Rename to under self.ou self.ldb_dc1.rename(new_ou, sub_ou_dn) # Replicate to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Confirm inherited ACLs are identical and were inherited self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn), self.sd_utils_dc2.get_sd_as_sddl(sub_ou_dn)) def test_acl_inheirt_renamed_child_object(self): # Make a new OU new_ou = samba.tests.create_test_ou(self.ldb_dc1, "acl_test_l2") # Here is where the new OU will end up at the end. sub2_ou_dn_final = ldb.Dn(self.ldb_dc1, "OU=l2,%s" % self.ou) sub3_ou_dn = ldb.Dn(self.ldb_dc1, "OU=l3,%s" % new_ou) sub3_ou_dn_final = ldb.Dn(self.ldb_dc1, "OU=l3,%s" % sub2_ou_dn_final) self.ldb_dc1.add({"dn": sub3_ou_dn, "objectclass": "organizationalUnit"}) sub4_ou_dn = ldb.Dn(self.ldb_dc1, "OU=l4,%s" % sub3_ou_dn) sub4_ou_dn_final = ldb.Dn(self.ldb_dc1, "OU=l4,%s" % sub3_ou_dn_final) self.ldb_dc1.add({"dn": sub4_ou_dn, "objectclass": "organizationalUnit"}) try: self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=new_ou, attrs=[]) self.fail() except LdbError as err: enum = err.args[0] self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT) self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Confirm it is now replicated self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=new_ou, attrs=[]) # # Given a tree new_ou -> l3 -> l4 # # Set the inherited ACL on the grandchild OU (l3) on DC1 self.sd_utils_dc1.dacl_add_ace(sub3_ou_dn, self.mod) # Assert ACL set stuck as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(sub3_ou_dn)) # Rename new_ou (l2) to under self.ou (this must happen second). If the # inheritance between l3 and l4 is name-based, this could # break. # The tree is now self.ou -> l2 -> l3 -> l4 self.ldb_dc1.rename(new_ou, sub2_ou_dn_final) # Assert ACL set remained as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(sub3_ou_dn_final)) # Replicate to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Confirm set ACLs (on l3 ) are identical and were inherited self.assertIn(self.mod_becomes, self.sd_utils_dc2.get_sd_as_sddl(sub3_ou_dn_final)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub3_ou_dn_final), self.sd_utils_dc2.get_sd_as_sddl(sub3_ou_dn_final)) # Confirm inherited ACLs (from l3 to l4) are identical # and were inherited self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(sub4_ou_dn_final)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub4_ou_dn_final), self.sd_utils_dc2.get_sd_as_sddl(sub4_ou_dn_final)) def test_acl_inheirt_renamed_object_in_conflict(self): # Make a new object to be renamed under self.ou new_ou = samba.tests.create_test_ou(self.ldb_dc1, "acl_test_l2") # Make a new OU under self.ou (on DC2) sub_ou_dn = ldb.Dn(self.ldb_dc2, "OU=l2,%s" % self.ou) self.ldb_dc2.add({"dn": sub_ou_dn, "objectclass": "organizationalUnit"}) # Set the inherited ACL on the parent OU self.sd_utils_dc1.dacl_add_ace(self.ou, self.mod) # Assert ACL set stuck as expected self.assertIn(self.mod_becomes, self.sd_utils_dc1.get_sd_as_sddl(self.ou)) # Replicate to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # Rename to under self.ou self.ldb_dc1.rename(new_ou, sub_ou_dn) self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn)) # Replicate to DC2 (will cause a conflict, DC1 to win, version # is higher since named twice) self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) children = self.ldb_dc2.search(scope=ldb.SCOPE_ONELEVEL, base=self.ou, attrs=[]) for child in children: self.assertIn(self.mod_inherits_as, self.sd_utils_dc2.get_sd_as_sddl(child.dn)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn), self.sd_utils_dc2.get_sd_as_sddl(child.dn)) # Replicate back self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(sub_ou_dn)) for child in children: self.assertIn(self.mod_inherits_as, self.sd_utils_dc1.get_sd_as_sddl(child.dn)) self.assertEqual(self.sd_utils_dc1.get_sd_as_sddl(child.dn), self.sd_utils_dc2.get_sd_as_sddl(child.dn))