#!/usr/bin/env python3 # # Unit tests for dirsync control # Copyright (C) Matthieu Patou 2011 # Copyright (C) Jelmer Vernooij 2014 # Copyright (C) Catalyst.Net Ltd # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import optparse import sys sys.path.insert(0, "bin/python") import samba from samba.tests.subunitrun import TestProgram, SubunitOptions import samba.getopt as options import base64 import ldb from ldb import LdbError, SCOPE_BASE from ldb import Message, MessageElement, Dn from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_RODC_ATTRIBUTE from samba.dcerpc import security, misc, drsblobs from samba.ndr import ndr_unpack, ndr_pack from samba.auth import system_session from samba import gensec, sd_utils from samba.samdb import SamDB from samba.credentials import Credentials, DONT_USE_KERBEROS import samba.tests from samba.tests import delete_force parser = optparse.OptionParser("dirsync.py [options] ") sambaopts = options.SambaOptions(parser) parser.add_option_group(sambaopts) parser.add_option_group(options.VersionOptions(parser)) # use command line creds if available credopts = options.CredentialsOptions(parser) parser.add_option_group(credopts) subunitopts = SubunitOptions(parser) parser.add_option_group(subunitopts) opts, args = parser.parse_args() if len(args) < 1: parser.print_usage() sys.exit(1) host = args.pop() if "://" not in host: ldaphost = "ldap://%s" % host else: ldaphost = host start = host.rindex("://") host = host.lstrip(start + 3) lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) # # Tests start here # class DirsyncBaseTests(samba.tests.TestCase): def setUp(self): super().setUp() self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp) self.base_dn = self.ldb_admin.domain_dn() self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid()) self.user_pass = samba.generate_random_password(12, 16) self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized() self.sd_utils = sd_utils.SDUtils(self.ldb_admin) # used for anonymous login print("baseDN: %s" % self.base_dn) userou = "OU=dirsync-test" self.ou = f"{userou},{self.base_dn}" samba.tests.delete_force(self.ldb_admin, self.ou, controls=['tree_delete:1']) self.ldb_admin.create_ou(self.ou) self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou, controls=['tree_delete:1']) # Regular user self.dirsync_user = "test_dirsync_user" self.simple_user = "test_simple_user" self.admin_user = "test_admin_user" self.dirsync_pass = self.user_pass self.simple_pass = self.user_pass self.admin_pass = self.user_pass self.ldb_admin.newuser(self.dirsync_user, self.dirsync_pass, userou=userou) self.ldb_admin.newuser(self.simple_user, self.simple_pass, userou=userou) self.ldb_admin.newuser(self.admin_user, self.admin_pass, userou=userou) self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn) user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user)) mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES, str(user_sid)) self.sd_utils.dacl_add_ace(self.base_dn, mod) self.addCleanup(self.sd_utils.dacl_delete_aces, self.base_dn, mod) # add admins to the Domain Admins group self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user], add_members_operation=True) def get_user_dn(self, name): return ldb.Dn(self.ldb_admin, "CN={0},{1}".format(name, self.ou)) def get_ldb_connection(self, target_username, target_password): creds_tmp = Credentials() creds_tmp.set_username(target_username) creds_tmp.set_password(target_password) creds_tmp.set_domain(creds.get_domain()) creds_tmp.set_realm(creds.get_realm()) creds_tmp.set_workstation(creds.get_workstation()) creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL) creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp) return ldb_target # tests on ldap add operations class SimpleDirsyncTests(DirsyncBaseTests): # def test_dirsync_errors(self): def test_dirsync_supported(self): """Test the basic of the dirsync is supported""" self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass) self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass) res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) try: self.ldb_simple.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) except LdbError as l: self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) def test_parentGUID_referrals(self): res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"]) res = self.ldb_admin.search(self.base_dn, expression="name=Configuration", controls=["dirsync:1:0:1"]) self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID")) def test_ok_not_rootdc(self): """Test if it's ok to do dirsync on another NC that is not the root DC""" self.ldb_admin.search(self.ldb_admin.get_config_basedn(), expression="samaccountname=*", controls=["dirsync:1:0:1"]) def test_dirsync_errors(self): """Test if dirsync returns the correct LDAP errors in case of pb""" self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass) self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass) try: self.ldb_simple.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) except LdbError as l: print(l) self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) try: self.ldb_simple.search("CN=Users,%s" % self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) except LdbError as l: print(l) self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) try: self.ldb_simple.search("CN=Users,%s" % self.base_dn, expression="samaccountname=*", controls=["dirsync:1:1:1"]) except LdbError as l: print(l) self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1) try: self.ldb_dirsync.search("CN=Users,%s" % self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) except LdbError as l: print(l) self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) try: self.ldb_admin.search("CN=Users,%s" % self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) except LdbError as l: print(l) self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) try: self.ldb_admin.search("CN=Users,%s" % self.base_dn, expression="samaccountname=*", controls=["dirsync:1:1:1"]) except LdbError as l: print(l) self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1) def test_dirsync_attributes(self): """Check behavior with some attributes """ res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") is not None) # Check that non replicated attributes are not returned self.assertTrue(res.msgs[0].get("badPwdCount") is None) # Check that non forward link are not returned self.assertTrue(res.msgs[0].get("memberof") is None) # Asking for instanceType will return also objectGUID res = self.ldb_admin.search(self.base_dn, expression="samaccountname=Administrator", attrs=["instanceType"], controls=["dirsync:1:0:1"]) self.assertTrue(res.msgs[0].get("objectGUID") is not None) self.assertTrue(res.msgs[0].get("instanceType") is not None) # We don't return an entry if asked for objectGUID res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str(self.base_dn), attrs=["objectGUID"], controls=["dirsync:1:0:1"]) self.assertEqual(len(res.msgs), 0) # a request on the root of a NC didn't return parentGUID res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str(self.base_dn), attrs=["name"], controls=["dirsync:1:0:1"]) self.assertTrue(res.msgs[0].get("objectGUID") is not None) self.assertTrue(res.msgs[0].get("name") is not None) self.assertTrue(res.msgs[0].get("parentGUID") is None) self.assertTrue(res.msgs[0].get("instanceType") is not None) # Asking for name will return also objectGUID and parentGUID # and instanceType and of course name res = self.ldb_admin.search(self.base_dn, expression="samaccountname=Administrator", attrs=["name"], controls=["dirsync:1:0:1"]) self.assertTrue(res.msgs[0].get("objectGUID") is not None) self.assertTrue(res.msgs[0].get("name") is not None) self.assertTrue(res.msgs[0].get("parentGUID") is not None) self.assertTrue(res.msgs[0].get("instanceType") is not None) # Asking for dn will not return not only DN but more like if attrs=* # parentGUID should be returned res = self.ldb_admin.search(self.base_dn, expression="samaccountname=Administrator", attrs=["dn"], controls=["dirsync:1:0:1"]) count = len(res.msgs[0]) res2 = self.ldb_admin.search(self.base_dn, expression="samaccountname=Administrator", controls=["dirsync:1:0:1"]) count2 = len(res2.msgs[0]) self.assertEqual(count, count2) # Asking for cn will return nothing on objects that have CN as RDN res = self.ldb_admin.search(self.base_dn, expression="samaccountname=Administrator", attrs=["cn"], controls=["dirsync:1:0:1"]) self.assertEqual(len(res.msgs), 0) # Asking for parentGUID will return nothing too res = self.ldb_admin.search(self.base_dn, expression="samaccountname=Administrator", attrs=["parentGUID"], controls=["dirsync:1:0:1"]) self.assertEqual(len(res.msgs), 0) ouname = "OU=testou,%s" % self.ou self.ouname = ouname self.ldb_admin.create_ou(ouname) delta = Message() delta.dn = Dn(self.ldb_admin, ouname) delta["cn"] = MessageElement("test ou", FLAG_MOD_ADD, "cn") self.ldb_admin.modify(delta) res = self.ldb_admin.search(self.base_dn, expression="name=testou", attrs=["cn"], controls=["dirsync:1:0:1"]) self.assertEqual(len(res.msgs), 1) self.assertEqual(len(res.msgs[0]), 3) delete_force(self.ldb_admin, ouname) def test_dirsync_with_controls(self): """Check that dirsync return correct information when dealing with the NC""" res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str(self.base_dn), attrs=["name"], controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"]) def test_dirsync_basenc(self): """Check that dirsync return correct information when dealing with the NC""" res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str(self.base_dn), attrs=["name"], controls=["dirsync:1:0:10000"]) self.assertEqual(len(res.msgs), 1) self.assertEqual(len(res.msgs[0]), 3) res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str(self.base_dn), attrs=["ntSecurityDescriptor"], controls=["dirsync:1:0:10000"]) self.assertEqual(len(res.msgs), 1) self.assertEqual(len(res.msgs[0]), 3) def test_dirsync_othernc(self): """Check that dirsync return information for entries that are normally referrals (ie. other NCs)""" res = self.ldb_admin.search(self.base_dn, expression="(objectclass=configuration)", attrs=["name"], controls=["dirsync:1:0:10000"]) self.assertEqual(len(res.msgs), 1) self.assertEqual(len(res.msgs[0]), 4) res = self.ldb_admin.search(self.base_dn, expression="(objectclass=configuration)", attrs=["ntSecurityDescriptor"], controls=["dirsync:1:0:10000"]) self.assertEqual(len(res.msgs), 1) self.assertEqual(len(res.msgs[0]), 3) res = self.ldb_admin.search(self.base_dn, expression="(objectclass=domaindns)", attrs=["ntSecurityDescriptor"], controls=["dirsync:1:0:10000"]) nb = len(res.msgs) # only sub nc returns a result when asked for objectGUID res = self.ldb_admin.search(self.base_dn, expression="(objectclass=domaindns)", attrs=["objectGUID"], controls=["dirsync:1:0:0"]) self.assertEqual(len(res.msgs), nb - 1) if nb > 1: self.assertTrue(res.msgs[0].get("objectGUID") is not None) else: res = self.ldb_admin.search(self.base_dn, expression="(objectclass=configuration)", attrs=["objectGUID"], controls=["dirsync:1:0:0"]) def test_dirsync_send_delta(self): """Check that dirsync return correct delta when sending the last cookie""" res = self.ldb_admin.search(self.base_dn, expression="(&(samaccountname=test*)(!(isDeleted=*)))", controls=["dirsync:1:0:10000"]) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "0" ctl[3] = "10000" control = str(":".join(ctl)) res = self.ldb_admin.search(self.base_dn, expression="(&(samaccountname=test*)(!(isDeleted=*)))", controls=[control]) self.assertEqual(len(res), 0) res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=["dirsync:1:0:100000"]) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "0" ctl[3] = "10000" control2 = str(":".join(ctl)) # Let's create an OU ouname = "OU=testou2,%s" % self.base_dn self.ouname = ouname self.ldb_admin.create_ou(ouname) res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=[control2]) self.assertEqual(len(res), 1) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "0" ctl[3] = "10000" control3 = str(":".join(ctl)) delta = Message() delta.dn = Dn(self.ldb_admin, str(ouname)) delta["cn"] = MessageElement("test ou", FLAG_MOD_ADD, "cn") self.ldb_admin.modify(delta) res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=[control3]) self.assertEqual(len(res.msgs), 1) # 3 attributes: instanceType, cn and objectGUID self.assertEqual(len(res.msgs[0]), 3) delta = Message() delta.dn = Dn(self.ldb_admin, str(ouname)) delta["cn"] = MessageElement([], FLAG_MOD_DELETE, "cn") self.ldb_admin.modify(delta) res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=[control3]) self.assertEqual(len(res.msgs), 1) # So we won't have much attribute returned but instanceType and GUID # are. # 3 attributes: instanceType and objectGUID and cn but empty self.assertEqual(len(res.msgs[0]), 3) ouname = "OU=newouname,%s" % self.base_dn self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname))) self.ouname = ouname ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "0" ctl[3] = "10000" control4 = str(":".join(ctl)) res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=[control3]) self.assertTrue(res[0].get("parentGUID") is not None) self.assertTrue(res[0].get("name") is not None) delete_force(self.ldb_admin, ouname) def test_dirsync_linkedattributes_OBJECT_SECURITY(self): """Check that dirsync returned deleted objects too""" # Let's search for members self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass) res = self.ldb_simple.search(self.base_dn, expression="(name=Administrators)", controls=["dirsync:1:1:1"]) self.assertTrue(len(res[0].get("member")) > 0) size = len(res[0].get("member")) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "1" ctl[3] = "10000" control1 = str(":".join(ctl)) self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user], add_members_operation=True) res = self.ldb_simple.search(self.base_dn, expression="(name=Administrators)", controls=[control1]) self.assertEqual(len(res[0].get("member")), size + 1) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "1" ctl[3] = "10000" control1 = str(":".join(ctl)) # remove the user from the group self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user], add_members_operation=False) res = self.ldb_simple.search(self.base_dn, expression="(name=Administrators)", controls=[control1]) self.assertEqual(len(res[0].get("member")), size) self.ldb_admin.newgroup("testgroup") self.addCleanup(self.ldb_admin.deletegroup, "testgroup") self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user], add_members_operation=True) res = self.ldb_admin.search(self.base_dn, expression="(name=testgroup)", controls=["dirsync:1:0:1"]) self.assertEqual(len(res[0].get("member")), 1) self.assertTrue(res[0].get("member") != "") ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "0" ctl[3] = "1" control1 = str(":".join(ctl)) # Check that reasking the same question but with an updated cookie # didn't return any results. print(control1) res = self.ldb_admin.search(self.base_dn, expression="(name=testgroup)", controls=[control1]) self.assertEqual(len(res), 0) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "1" ctl[3] = "10000" control1 = str(":".join(ctl)) self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user], add_members_operation=False) res = self.ldb_admin.search(self.base_dn, expression="(name=testgroup)", attrs=["member"], controls=[control1]) self.assertEqual(len(res[0].get("member")), 0) def test_dirsync_deleted_items(self): """Check that dirsync returned deleted objects too""" # Let's create an OU ouname = "OU=testou3,%s" % self.base_dn self.ouname = ouname self.ldb_admin.create_ou(ouname) res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=["dirsync:1:0:1"]) guid = None for e in res: if str(e["name"]) == "testou3": guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0])) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "0" ctl[3] = "10000" control1 = str(":".join(ctl)) # So now delete the object and check that # we can see the object but deleted when admin delete_force(self.ldb_admin, ouname) res = self.ldb_admin.search(self.base_dn, expression="(objectClass=organizationalUnit)", controls=[control1]) self.assertEqual(len(res), 1) guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0])) self.assertEqual(guid2, guid) self.assertTrue(res[0].get("isDeleted")) self.assertTrue(res[0].get("name") is not None) def test_cookie_from_others(self): res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=["dirsync:1:0:1"]) ctl = str(res.controls[0]).split(":") cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4]))) cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db") controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')] res = self.ldb_admin.search(self.base_dn, expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", controls=controls) def test_dirsync_linkedattributes_range(self): self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass) res = self.ldb_admin.search(self.base_dn, attrs=["member;range=1-1"], expression="(name=Administrators)", controls=["dirsync:1:0:0"]) self.assertTrue(len(res) > 0) self.assertTrue(res[0].get("member;range=1-1") is None) self.assertTrue(res[0].get("member") is not None) self.assertTrue(len(res[0].get("member")) > 0) def test_dirsync_linkedattributes_range_user(self): self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass) try: res = self.ldb_simple.search(self.base_dn, attrs=["member;range=1-1"], expression="(name=Administrators)", controls=["dirsync:1:0:0"]) except LdbError as e: (num, _) = e.args self.assertEqual(num, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS) else: self.fail() def test_dirsync_linkedattributes(self): flag_incr_linked = 2147483648 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass) res = self.ldb_admin.search(self.base_dn, attrs=["member"], expression="(name=Administrators)", controls=["dirsync:1:%d:1" % flag_incr_linked]) self.assertTrue(res[0].get("member;range=1-1") is not None) self.assertTrue(len(res[0].get("member;range=1-1")) > 0) size = len(res[0].get("member;range=1-1")) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "%d" % flag_incr_linked ctl[3] = "10000" control1 = str(":".join(ctl)) self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user], add_members_operation=True) self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user], add_members_operation=True) res = self.ldb_admin.search(self.base_dn, expression="(name=Administrators)", controls=[control1]) self.assertEqual(len(res[0].get("member;range=1-1")), 2) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "%d" % flag_incr_linked ctl[3] = "10000" control1 = str(":".join(ctl)) # remove the user from the group self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user], add_members_operation=False) res = self.ldb_admin.search(self.base_dn, expression="(name=Administrators)", controls=[control1]) self.assertEqual(res[0].get("member;range=1-1"), None) self.assertEqual(len(res[0].get("member;range=0-0")), 1) ctl = str(res.controls[0]).split(":") ctl[1] = "1" ctl[2] = "%d" % flag_incr_linked ctl[3] = "10000" control2 = str(":".join(ctl)) self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user], add_members_operation=False) res = self.ldb_admin.search(self.base_dn, expression="(name=Administrators)", controls=[control2]) self.assertEqual(res[0].get("member;range=1-1"), None) self.assertEqual(len(res[0].get("member;range=0-0")), 1) res = self.ldb_admin.search(self.base_dn, expression="(name=Administrators)", controls=[control1]) self.assertEqual(res[0].get("member;range=1-1"), None) self.assertEqual(len(res[0].get("member;range=0-0")), 2) def test_dirsync_extended_dn(self): """Check that dirsync works together with the extended_dn control""" # Let's search for members self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass) res = self.ldb_simple.search(self.base_dn, expression="(name=Administrators)", controls=["dirsync:1:1:1"]) self.assertTrue(len(res[0].get("member")) > 0) size = len(res[0].get("member")) resEX1 = self.ldb_simple.search(self.base_dn, expression="(name=Administrators)", controls=["dirsync:1:1:1","extended_dn:1:1"]) self.assertTrue(len(resEX1[0].get("member")) > 0) sizeEX1 = len(resEX1[0].get("member")) self.assertEqual(sizeEX1, size) self.assertIn(res[0]["member"][0], resEX1[0]["member"][0]) self.assertIn(b"; 0) sizeEX0 = len(resEX0[0].get("member")) self.assertEqual(sizeEX0, size) self.assertIn(res[0]["member"][0], resEX0[0]["member"][0]) self.assertIn(b";