diff options
Diffstat (limited to 'lib/ldb-samba/tests')
-rw-r--r-- | lib/ldb-samba/tests/index.py | 192 | ||||
-rwxr-xr-x | lib/ldb-samba/tests/match_rules.py | 1797 | ||||
-rwxr-xr-x | lib/ldb-samba/tests/match_rules_remote.py | 104 |
3 files changed, 2093 insertions, 0 deletions
diff --git a/lib/ldb-samba/tests/index.py b/lib/ldb-samba/tests/index.py new file mode 100644 index 0000000..2256e3e --- /dev/null +++ b/lib/ldb-samba/tests/index.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +# +# Tests for comparison expressions on indexed keys +# +# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +"""Tests for expressions containing comparisons on indexed attributes. + Copied from ldb's index.py""" + +import os +from unittest import TestCase +import sys +from samba import _ldb +import shutil +from ldb import SCOPE_SUBTREE +from samba.tests.subunitrun import TestProgram + +PY3 = sys.version_info > (3, 0) + +TDB_PREFIX = "tdb://" +MDB_PREFIX = "mdb://" + +def tempdir(): + import tempfile + try: + dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp") + except KeyError: + dir_prefix = None + return tempfile.mkdtemp(dir=dir_prefix) + +class LdbBaseTest(TestCase): + def setUp(self): + super(LdbBaseTest, self).setUp() + try: + if self.prefix is None: + self.prefix = TDB_PREFIX + except AttributeError: + self.prefix = TDB_PREFIX + + def tearDown(self): + super(LdbBaseTest, self).tearDown() + + def url(self): + return self.prefix + self.filename + + def flags(self): + if self.prefix == MDB_PREFIX: + return ldb.FLG_NOSYNC + else: + return 0 + + def options(self): + if self.prefix == MDB_PREFIX: + return ['disable_full_db_scan_for_self_test:1'] + else: + return None + +class LdbTDBIndexedComparisonExpressions(LdbBaseTest): + def tearDown(self): + shutil.rmtree(self.testdir) + super(LdbTDBIndexedComparisonExpressions, self).tearDown() + + # Ensure the LDB is closed now, so we close the FD + del(self.l) + + def setUp(self): + super(LdbTDBIndexedComparisonExpressions, self).setUp() + self.testdir = tempdir() + self.filename = os.path.join(self.testdir, "indexedcomptest.ldb") + # Note that the maximum key length is set to 54 + # This accounts for the 4 bytes added by the dn formatting + # a leading dn=, and a trailing zero terminator + # + self.l = _ldb.Ldb(self.url(), options=self.options()) + self.l.add({"dn": "@ATTRIBUTES"}) + self.l.add({"dn": "@INDEXLIST", + "@IDXATTR": [b"int32attr"], + "@IDXONE": [b"1"], + "@IDXGUID": [b"objectUUID"], + "@IDX_DN_GUID": [b"GUID"]}) + + def test_comparison_expression(self): + self.l.samba_schema_attribute_add("int32attr", 0, + _ldb.SYNTAX_SAMBA_INT32) + + int32_max = 2**31-1 + int32_min = -2**31 + test_nums = list(range(-5, 5)) + test_nums += list(range(int32_max-5, int32_max+1)) + test_nums += list(range(int32_min, int32_min+5)) + test_nums = sorted(test_nums) + + for i in test_nums: + ouuid = 0x0123456789abcdef + i + ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode()) + self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i), + "objectUUID": ouuid_s, + "int32attr": str(i)}) + + def assert_int32_expr(expr, py_expr=None): + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=SCOPE_SUBTREE, + expression="(int32attr%s)" % (expr)) + + if not py_expr: + py_expr = expr + expect = [n for n in test_nums if eval(str(n) + py_expr)] + vals = sorted([int(r.get("int32attr")[0]) for r in res]) + self.assertEqual(len(res), len(expect)) + self.assertEqual(set(vals), set(expect)) + self.assertEqual(expect, vals) + + assert_int32_expr(">=-2") + assert_int32_expr("<=2") + assert_int32_expr(">=" + str(int32_min)) + assert_int32_expr("<=" + str(int32_min)) + assert_int32_expr("<=" + str(int32_min+1)) + assert_int32_expr("<=" + str(int32_max)) + assert_int32_expr(">=" + str(int32_max)) + assert_int32_expr(">=" + str(int32_max-1)) + assert_int32_expr("=10", "==10") + + def test_comparison_expression_duplicates(self): + self.l.samba_schema_attribute_add("int32attr", 0, + _ldb.SYNTAX_SAMBA_INT32) + + int32_max = 2**31-1 + int32_min = -2**31 + + test_nums = list(range(-5, 5)) * 3 + test_nums += list(range(-20, 20, 5)) * 2 + test_nums += list(range(-50, 50, 15)) + test_nums = sorted(test_nums) + + for i, n in enumerate(test_nums): + ouuid = 0x0123456789abcdef + i + ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode()) + self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i), + "objectUUID": ouuid_s, + "int32attr": str(n)}) + + def assert_int32_expr(expr, py_expr=None): + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=SCOPE_SUBTREE, + expression="(int32attr%s)" % (expr)) + + if not py_expr: + py_expr = expr + expect = [n for n in test_nums if eval(str(n) + py_expr)] + vals = sorted([int(r.get("int32attr")[0]) for r in res]) + self.assertEqual(len(res), len(expect)) + self.assertEqual(set(vals), set(expect)) + self.assertEqual(expect, vals) + + assert_int32_expr(">=-2") + assert_int32_expr("<=2") + assert_int32_expr(">=" + str(int32_min)) + assert_int32_expr("<=" + str(int32_min)) + assert_int32_expr("<=" + str(int32_min+1)) + assert_int32_expr("<=" + str(int32_max)) + assert_int32_expr(">=" + str(int32_max)) + assert_int32_expr(">=" + str(int32_max-1)) + assert_int32_expr("=-5", "==-5") + assert_int32_expr("=5", "==5") + +# Run the same tests against an lmdb backend +class LdbLMDBIndexedComparisonExpressions(LdbTDBIndexedComparisonExpressions): + + def setUp(self): + if os.environ.get('HAVE_LMDB', '1') == '0': + self.skipTest("No lmdb backend") + self.prefix = MDB_PREFIX + super(LdbLMDBIndexedComparisonExpressions, self).setUp() + + def tearDown(self): + super(LdbLMDBIndexedComparisonExpressions, self).tearDown() + + +TestProgram(module=__name__, opts=[]) diff --git a/lib/ldb-samba/tests/match_rules.py b/lib/ldb-samba/tests/match_rules.py new file mode 100755 index 0000000..2fe6c3e --- /dev/null +++ b/lib/ldb-samba/tests/match_rules.py @@ -0,0 +1,1797 @@ +#!/usr/bin/env python3 + +import optparse +import sys +import os +import samba +import samba.getopt as options + +from samba.tests.subunitrun import SubunitOptions, TestProgram + +from samba.samdb import SamDB +from samba.auth import system_session +from ldb import Message, MessageElement, Dn, LdbError +from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE +from ldb import SCOPE_BASE, SCOPE_SUBTREE, SCOPE_ONELEVEL + +# TODO I'm ignoring case in these tests for now. +# This should be fixed to work inline with Windows. +# The literal strings are in the case Windows uses. +# Windows appear to preserve casing of the RDN and uppercase the other keys. + + +class MatchRulesTestsBase(samba.tests.TestCase): + def setUp(self): + super().setUp() + self.lp = self.sambaopts.get_loadparm() + self.creds = self.credopts.get_credentials(self.lp) + + self.ldb = SamDB(self.host, credentials=self.creds, + session_info=system_session(self.lp), + lp=self.lp) + self.base_dn = self.ldb.domain_dn() + self.ou_rdn = "OU=matchrulestest" + self.ou = self.ou_rdn + "," + self.base_dn + self.ou_users = "OU=users,%s" % self.ou + self.ou_groups = "OU=groups,%s" % self.ou + self.ou_computers = "OU=computers,%s" % self.ou + + try: + self.ldb.delete(self.ou, ["tree_delete:1"]) + except LdbError as e: + pass + + # Add a organizational unit to create objects + self.ldb.add({ + "dn": self.ou, + "objectclass": "organizationalUnit"}) + + self.addCleanup(self.ldb.delete, self.ou, controls=['tree_delete:0']) + + + # Add the following OU hierarchy and set otherWellKnownObjects, + # which has BinaryDN syntax: + # + # o1 + # |--> o2 + # | |--> o3 + # | | |-->o4 + + self.ldb.add({ + "dn": "OU=o1,%s" % self.ou, + "objectclass": "organizationalUnit"}) + self.ldb.add({ + "dn": "OU=o2,OU=o1,%s" % self.ou, + "objectclass": "organizationalUnit"}) + self.ldb.add({ + "dn": "OU=o3,OU=o2,OU=o1,%s" % self.ou, + "objectclass": "organizationalUnit"}) + self.ldb.add({ + "dn": "OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou, + "objectclass": "organizationalUnit"}) + + m = Message() + m.dn = Dn(self.ldb, self.ou) + m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000001:OU=o1,%s" % self.ou, + FLAG_MOD_ADD, "otherWellKnownObjects") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "OU=o1,%s" % self.ou) + m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000002:OU=o2,OU=o1,%s" % self.ou, + FLAG_MOD_ADD, "otherWellKnownObjects") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "OU=o2,OU=o1,%s" % self.ou) + m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000003:OU=o3,OU=o2,OU=o1,%s" % self.ou, + FLAG_MOD_ADD, "otherWellKnownObjects") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "OU=o3,OU=o2,OU=o1,%s" % self.ou) + m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou, + FLAG_MOD_ADD, "otherWellKnownObjects") + self.ldb.modify(m) + + # Create OU for users and groups + self.ldb.add({ + "dn": self.ou_users, + "objectclass": "organizationalUnit"}) + self.ldb.add({ + "dn": self.ou_groups, + "objectclass": "organizationalUnit"}) + self.ldb.add({ + "dn": self.ou_computers, + "objectclass": "organizationalUnit"}) + + # Add four groups + self.ldb.add({ + "dn": "cn=g1,%s" % self.ou_groups, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=g2,%s" % self.ou_groups, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=g4,%s" % self.ou_groups, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=g3,%s" % self.ou_groups, + "objectclass": "group"}) + + # Add four users + self.ldb.add({ + "dn": "cn=u1,%s" % self.ou_users, + "objectclass": "user"}) + self.ldb.add({ + "dn": "cn=u2,%s" % self.ou_users, + "objectclass": "user"}) + self.ldb.add({ + "dn": "cn=u3,%s" % self.ou_users, + "objectclass": "user"}) + self.ldb.add({ + "dn": "cn=u4,%s" % self.ou_users, + "objectclass": "user"}) + + # Add computers to test Object(DN-Binary) syntax + self.ldb.add({ + "dn": "cn=c1,%s" % self.ou_computers, + "objectclass": "computer", + "dNSHostName": "c1.%s" % self.lp.get("realm").lower(), + "servicePrincipalName": ["HOST/c1"], + "sAMAccountName": "c1$", + "userAccountControl": "83890178"}) + + self.ldb.add({ + "dn": "cn=c2,%s" % self.ou_computers, + "objectclass": "computer", + "dNSHostName": "c2.%s" % self.lp.get("realm").lower(), + "servicePrincipalName": ["HOST/c2"], + "sAMAccountName": "c2$", + "userAccountControl": "83890178"}) + + self.ldb.add({ + "dn": "cn=c3,%s" % self.ou_computers, + "objectclass": "computer", + "dNSHostName": "c3.%s" % self.lp.get("realm").lower(), + "servicePrincipalName": ["HOST/c3"], + "sAMAccountName": "c3$", + "userAccountControl": "83890178"}) + + # Create the following hierarchy: + # g4 + # |--> u4 + # |--> g3 + # | |--> u3 + # | |--> g2 + # | | |--> u2 + # | | |--> g1 + # | | | |--> u1 + + # u1 member of g1 + m = Message() + m.dn = Dn(self.ldb, "CN=g1,%s" % self.ou_groups) + m["member"] = MessageElement("CN=u1,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # u2 member of g2 + m = Message() + m.dn = Dn(self.ldb, "CN=g2,%s" % self.ou_groups) + m["member"] = MessageElement("cn=u2,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # u3 member of g3 + m = Message() + m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups) + m["member"] = MessageElement("CN=u3,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # u4 member of g4 + m = Message() + m.dn = Dn(self.ldb, "cn=g4,%s" % self.ou_groups) + m["member"] = MessageElement("cn=u4,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # g3 member of g4 + m = Message() + m.dn = Dn(self.ldb, "CN=g4,%s" % self.ou_groups) + m["member"] = MessageElement("cn=g3,%s" % self.ou_groups, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # g2 member of g3 + m = Message() + m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups) + m["member"] = MessageElement("CN=g2,%s" % self.ou_groups, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # g1 member of g2 + m = Message() + m.dn = Dn(self.ldb, "cn=g2,%s" % self.ou_groups) + m["member"] = MessageElement("cn=g1,%s" % self.ou_groups, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # Add a couple of ms-Exch-Configuration-Container to test forward-link + # attributes without backward link (addressBookRoots2) + # e1 + # |--> e2 + # | |--> c1 + self.ldb.add({ + "dn": "cn=e1,%s" % self.ou, + "objectclass": "msExchConfigurationContainer"}) + self.ldb.add({ + "dn": "cn=e2,%s" % self.ou, + "objectclass": "msExchConfigurationContainer"}) + + m = Message() + m.dn = Dn(self.ldb, "cn=e2,%s" % self.ou) + m["e1"] = MessageElement("cn=c1,%s" % self.ou_computers, + FLAG_MOD_ADD, "addressBookRoots2") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "cn=e1,%s" % self.ou) + m["e1"] = MessageElement("cn=e2,%s" % self.ou, + FLAG_MOD_ADD, "addressBookRoots2") + self.ldb.modify(m) + + + +class MatchRulesTests(MatchRulesTestsBase): + def setUp(self): + self.sambaopts = sambaopts + self.credopts = credopts + self.host = host + super().setUp() + + # The msDS-RevealedUsers is owned by system and cannot be modified + # directly. Set the schemaUpgradeInProgress flag as workaround + # and create this hierarchy: + # ou=computers + # |-> c1 + # | |->c2 + # | | |->u1 + + # + # While appropriate for this test, this is NOT a good practice + # in general. This is only done here because the alternative + # is to make a schema modification. + # + # IF/WHEN Samba protects this attribute better, this + # particular part of the test can be removed, as the same code + # is covered by the addressBookRoots2 case well enough. + # + m = Message() + m.dn = Dn(self.ldb, "") + m["e1"] = MessageElement("1", FLAG_MOD_REPLACE, "schemaUpgradeInProgress") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "cn=c2,%s" % self.ou_computers) + m["e1"] = MessageElement("B:8:01010101:cn=c3,%s" % self.ou_computers, + FLAG_MOD_ADD, "msDS-RevealedUsers") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "cn=c1,%s" % self.ou_computers) + m["e1"] = MessageElement("B:8:01010101:cn=c2,%s" % self.ou_computers, + FLAG_MOD_ADD, "msDS-RevealedUsers") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "") + m["e1"] = MessageElement("0", FLAG_MOD_REPLACE, "schemaUpgradeInProgress") + self.ldb.modify(m) + + + def test_u1_member_of_g4(self): + # Search without transitive match must return 0 results + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=u1,%s" % self.ou_users, + scope=SCOPE_BASE, + expression="memberOf=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + # Search with transitive match must return 1 results + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search("cn=u1,%s" % self.ou_users, + scope=SCOPE_BASE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=u1,%s" % self.ou_users).lower()) + + def test_g1_member_of_g4(self): + # Search without transitive match must return 0 results + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=g1,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="memberOf=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + # Search with transitive match must return 1 results + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search("cn=g1,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g1,%s" % self.ou_groups).lower()) + + def test_u1_groups(self): + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g1,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g1,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + def test_u2_groups(self): + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g2,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + def test_u3_groups(self): + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g3,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + def test_u4_groups(self): + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_users, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + def test_extended_dn_u1(self): + res1 = self.ldb.search("cn=u1,%s" % self.ou_users, + scope=SCOPE_BASE, + expression="objectClass=*", + attrs=['objectSid', 'objectGUID']) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("cn=u1,%s" % self.ou_users).lower()) + + sid = self.ldb.schema_format_value("objectSid", res1[0]["objectSid"][0]).decode('utf8') + guid = self.ldb.schema_format_value("objectGUID", res1[0]['objectGUID'][0]).decode('utf8') + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<SID=%s>" % sid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g1,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<GUID=%s>" % guid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g1,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g1,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g1,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g1,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g1,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + def test_extended_dn_u2(self): + res1 = self.ldb.search("cn=u2,%s" % self.ou_users, + scope=SCOPE_BASE, + expression="objectClass=*", + attrs=['objectSid', 'objectGUID']) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("cn=u2,%s" % self.ou_users).lower()) + + sid = self.ldb.schema_format_value("objectSid", res1[0]["objectSid"][0]).decode('utf8') + guid = self.ldb.schema_format_value("objectGUID", res1[0]['objectGUID'][0]).decode('utf8') + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<SID=%s>" % sid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g2,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<GUID=%s>" % guid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g2,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g2,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + def test_extended_dn_u3(self): + res1 = self.ldb.search("cn=u3,%s" % self.ou_users, + scope=SCOPE_BASE, + expression="objectClass=*", + attrs=['objectSid', 'objectGUID']) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("cn=u3,%s" % self.ou_users).lower()) + + sid = self.ldb.schema_format_value("objectSid", res1[0]["objectSid"][0]).decode('utf8') + guid = self.ldb.schema_format_value("objectGUID", res1[0]['objectGUID'][0]).decode('utf8') + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<SID=%s>" % sid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g3,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<GUID=%s>" % guid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g3,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=g3,%s" % self.ou_groups).lower() in dn_list) + self.assertTrue(("CN=g4,%s" % self.ou_groups).lower() in dn_list) + + def test_extended_dn_u4(self): + res1 = self.ldb.search("cn=u4,%s" % self.ou_users, + scope=SCOPE_BASE, + expression="objectClass=*", + attrs=['objectSid', 'objectGUID']) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("cn=u4,%s" % self.ou_users).lower()) + + sid = self.ldb.schema_format_value("objectSid", res1[0]["objectSid"][0]).decode('utf8') + guid = self.ldb.schema_format_value("objectGUID", res1[0]['objectGUID'][0]).decode('utf8') + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<SID=%s>" % sid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=<GUID=%s>" % guid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member=<GUID=%s>" % guid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + def test_object_dn_binary(self): + res1 = self.ldb.search(self.ou_computers, + scope=SCOPE_SUBTREE, + expression="msDS-RevealedUsers=B:8:01010101:cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=c2,%s" % self.ou_computers).lower()) + + res1 = self.ldb.search(self.ou_computers, + scope=SCOPE_ONELEVEL, + expression="msDS-RevealedUsers=B:8:01010101:cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=c2,%s" % self.ou_computers).lower()) + + res1 = self.ldb.search(self.ou_computers, + scope=SCOPE_SUBTREE, + expression="msDS-RevealedUsers:1.2.840.113556.1.4.1941:=B:8:01010101:cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=c1,%s" % self.ou_computers).lower() in dn_list) + self.assertTrue(("CN=c2,%s" % self.ou_computers).lower() in dn_list) + + res1 = self.ldb.search(self.ou_computers, + scope=SCOPE_ONELEVEL, + expression="msDS-RevealedUsers:1.2.840.113556.1.4.1941:=B:8:01010101:cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=c1,%s" % self.ou_computers).lower() in dn_list) + self.assertTrue(("CN=c2,%s" % self.ou_computers).lower() in dn_list) + + def test_one_way_links(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="addressBookRoots2=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=e2,%s" % self.ou).lower()) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_ONELEVEL, + expression="addressBookRoots2=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=e2,%s" % self.ou).lower()) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="addressBookRoots2:1.2.840.113556.1.4.1941:=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=e1,%s" % self.ou).lower() in dn_list) + self.assertTrue(("CN=e2,%s" % self.ou).lower() in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_ONELEVEL, + expression="addressBookRoots2:1.2.840.113556.1.4.1941:=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn).lower() for res in res1] + self.assertTrue(("CN=e1,%s" % self.ou).lower() in dn_list) + self.assertTrue(("CN=e2,%s" % self.ou).lower() in dn_list) + + def test_not_linked_attrs(self): + res1 = self.ldb.search(self.base_dn, + scope=SCOPE_BASE, + expression="wellKnownObjects=B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s" % self.base_dn) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), self.base_dn.lower()) + + def test_invalid_basedn(self): + res1 = self.ldb.search(self.base_dn, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=c1,ou=computers,ou=matchrulestest,%sXX" % self.base_dn) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.base_dn, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=XX,ou=computers,ou=matchrulestest,%s" % self.base_dn) + self.assertEqual(len(res1), 0) + + def test_subtree(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="otherWellKnownObjects=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("OU=o3,OU=o2,OU=o1,%s" % self.ou).lower()) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_ONELEVEL, + expression="otherWellKnownObjects=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="otherWellKnownObjects:1.2.840.113556.1.4.1941:=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_ONELEVEL, + expression="otherWellKnownObjects:1.2.840.113556.1.4.1941:=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou) + self.assertEqual(len(res1), 0) + + def test_unknown_oid(self): + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:2.4.681.226012.2.8.3882:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:8.16.8720.1008448.8.32.15528:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.3.4:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + def test_nul_text(self): + self.assertRaises((ValueError,TypeError), + lambda: self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="\00member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)) + self.assertRaises((ValueError,TypeError), + lambda: self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840\00.113556.1.4.1941:=cn=u1,%s" % self.ou_users)) + self.assertRaises((ValueError,TypeError), + lambda: self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1\00,%s" % self.ou_users)) + self.assertRaises(LdbError, + lambda: self.ldb.search("cn=\00g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)) + self.assertRaises(LdbError, + lambda: self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:")) + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=") + self.assertEqual(len(res1), 0) + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member=") + self.assertEqual(len(res1), 0) + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=nonexistent") + self.assertEqual(len(res1), 0) + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member=nonexistent") + self.assertEqual(len(res1), 0) + self.assertRaises(LdbError, + lambda: self.ldb.search("cn=\00g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:cn=u1,%s" % self.ou_users)) + self.assertRaises(LdbError, + lambda: self.ldb.search("cn=\00g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1")) + self.assertRaises(LdbError, + lambda: self.ldb.search("cn=\00g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=")) + self.assertRaises(LdbError, + lambda: self.ldb.search("cn=\00g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member::=cn=u1,%s" % self.ou_users)) + + def test_misc_matches(self): + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_BASE, + expression="member=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=g1,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=g2,%s" % self.ou_groups) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=g2,%s" % self.ou_groups) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=g1,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="member:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_BASE, + expression="memberOf=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="memberOf=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), ("CN=g3,%s" % self.ou_groups)) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="memberOf=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), ("CN=g3,%s" % self.ou_groups)) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_BASE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_ONELEVEL, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou_groups, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + +class MatchRuleConditionTests(samba.tests.TestCase): + def setUp(self): + super(MatchRuleConditionTests, self).setUp() + self.lp = sambaopts.get_loadparm() + self.creds = credopts.get_credentials(self.lp) + + self.ldb = SamDB(host, credentials=self.creds, + session_info=system_session(self.lp), + lp=self.lp) + self.base_dn = self.ldb.domain_dn() + self.ou = "OU=matchruleconditiontests,%s" % self.base_dn + self.ou_users = "OU=users,%s" % self.ou + self.ou_groups = "OU=groups,%s" % self.ou + self.ou_computers = "OU=computers,%s" % self.ou + + # Add a organizational unit to create objects + self.ldb.add({ + "dn": self.ou, + "objectclass": "organizationalUnit"}) + + # Create users, groups, and computers + self.ldb.add({ + "dn": self.ou_users, + "objectclass": "organizationalUnit"}) + self.ldb.add({ + "dn": self.ou_groups, + "objectclass": "organizationalUnit"}) + self.ldb.add({ + "dn": self.ou_computers, + "objectclass": "organizationalUnit"}) + + self.ldb.add({ + "dn": "cn=g1,%s" % self.ou_groups, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=g2,%s" % self.ou_groups, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=g3,%s" % self.ou_groups, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=g4,%s" % self.ou_groups, + "objectclass": "group"}) + + self.ldb.add({ + "dn": "cn=u1,%s" % self.ou_users, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=u2,%s" % self.ou_users, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=u3,%s" % self.ou_users, + "objectclass": "group"}) + self.ldb.add({ + "dn": "cn=u4,%s" % self.ou_users, + "objectclass": "group"}) + + self.ldb.add({ + "dn": "cn=c1,%s" % self.ou_computers, + "objectclass": "user"}) + + self.ldb.add({ + "dn": "cn=c2,%s" % self.ou_computers, + "objectclass": "user"}) + + self.ldb.add({ + "dn": "cn=c3,%s" % self.ou_computers, + "objectclass": "user"}) + + self.ldb.add({ + "dn": "cn=c4,%s" % self.ou_computers, + "objectclass": "user"}) + + # Assign groups according to the following structure: + # g1-->g2---->g3 --g4 + # \ | / | / / | + # u1- >u2-- | u3<- | u4 + # \ \ \ \ | + # c1* >c2 ->c3 c4 + # *c1 is a member of u1, u2, u3, and u4 + + # u2 is a member of g1 and g2 + m = Message() + m.dn = Dn(self.ldb, "CN=g1,%s" % self.ou_groups) + m["member"] = MessageElement("CN=u2,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "CN=g2,%s" % self.ou_groups) + m["member"] = MessageElement("CN=u2,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # g2 is a member of g1 + m = Message() + m.dn = Dn(self.ldb, "CN=g1,%s" % self.ou_groups) + m["member"] = MessageElement("CN=g2,%s" % self.ou_groups, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # g3 is a member of g2 + m = Message() + m.dn = Dn(self.ldb, "CN=g2,%s" % self.ou_groups) + m["member"] = MessageElement("CN=g3,%s" % self.ou_groups, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # u3 is a member of g3 and g4 + m = Message() + m.dn = Dn(self.ldb, "CN=g3,%s" % self.ou_groups) + m["member"] = MessageElement("CN=u3,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "CN=g4,%s" % self.ou_groups) + m["member"] = MessageElement("CN=u3,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # u4 is a member of g4 + m = Message() + m.dn = Dn(self.ldb, "CN=g4,%s" % self.ou_groups) + m["member"] = MessageElement("CN=u4,%s" % self.ou_users, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # c1 is a member of u1, u2, u3, and u4 + m = Message() + m.dn = Dn(self.ldb, "CN=u1,%s" % self.ou_users) + m["member"] = MessageElement("CN=c1,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "CN=u2,%s" % self.ou_users) + m["member"] = MessageElement("CN=c1,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "CN=u3,%s" % self.ou_users) + m["member"] = MessageElement("CN=c1,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "CN=u4,%s" % self.ou_users) + m["member"] = MessageElement("CN=c1,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # c2 is a member of u1 + m = Message() + m.dn = Dn(self.ldb, "CN=u1,%s" % self.ou_users) + m["member"] = MessageElement("CN=c2,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # c3 is a member of u2 and g3 + m = Message() + m.dn = Dn(self.ldb, "CN=u2,%s" % self.ou_users) + m["member"] = MessageElement("CN=c3,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "CN=g3,%s" % self.ou_groups) + m["member"] = MessageElement("CN=c3,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + # c4 is a member of u4 and g4 + m = Message() + m.dn = Dn(self.ldb, "CN=u4,%s" % self.ou_users) + m["member"] = MessageElement("CN=c4,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + m = Message() + m.dn = Dn(self.ldb, "CN=g4,%s" % self.ou_groups) + m["member"] = MessageElement("CN=c4,%s" % self.ou_computers, + FLAG_MOD_ADD, "member") + self.ldb.modify(m) + + def tearDown(self): + super(MatchRuleConditionTests, self).tearDown() + self.ldb.delete(self.ou, controls=['tree_delete:0']) + + def test_g1_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 6) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + def test_g2_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=g2,%s" % self.ou_groups) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g2,%s" % self.ou_groups) + self.assertEqual(len(res1), 5) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=g2,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=g1,%s" % self.ou_groups) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=g2,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=g1,%s" % self.ou_groups) + + def test_g3_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=g3,%s" % self.ou_groups) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g3,%s" % self.ou_groups) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=g3,%s" % self.ou_groups) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=g2,%s" % self.ou_groups) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=g3,%s" % self.ou_groups) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + + def test_g4_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c4,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c4,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups) + self.assertEqual(len(res1), 0) + + def test_u1_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c2,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c2,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + def test_u2_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u2,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + + def test_u3_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=c1,%s" % self.ou_computers) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=u3,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=c1,%s" % self.ou_computers) + + def test_u4_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=g4,%s" % self.ou_groups) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=g4,%s" % self.ou_groups) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c4,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=u4,%s" % self.ou_users) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c4,%s" % self.ou_computers in dn_list) + + def test_c1_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u1,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 8) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u1,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=c1,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + def test_c2_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=c2,%s" % self.ou_computers) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=u1,%s" % self.ou_users) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=c2,%s" % self.ou_computers) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=u1,%s" % self.ou_users) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=c2,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=c2,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + def test_c3_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 4) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=c3,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + def test_c4_members(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member=cn=c4,%s" % self.ou_computers) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="member:1.2.840.113556.1.4.1941:=cn=c4,%s" % self.ou_computers) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf=cn=c4,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression="memberOf:1.2.840.113556.1.4.1941:=cn=c4,%s" % self.ou_computers) + self.assertEqual(len(res1), 0) + + def test_or_member_queries(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(member:1.2.840.113556.1.4.1941:=cn=c1,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=c2,%s))") % ( + self.ou_computers, self.ou_computers)) + self.assertEqual(len(res1), 8) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u1,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(member:1.2.840.113556.1.4.1941:=cn=c2,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=c3,%s))") % ( + self.ou_computers, self.ou_computers)) + self.assertEqual(len(res1), 5) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u1,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(member:1.2.840.113556.1.4.1941:=cn=c2,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=c4,%s))") % ( + self.ou_computers, self.ou_computers)) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u1,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(member:1.2.840.113556.1.4.1941:=cn=c3,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=c4,%s))") % ( + self.ou_computers, self.ou_computers)) + self.assertEqual(len(res1), 6) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(member:1.2.840.113556.1.4.1941:=cn=u1,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=c4,%s))") % ( + self.ou_users, self.ou_computers)) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g4,%s" % self.ou_groups in dn_list) + + def test_and_member_queries(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(member:1.2.840.113556.1.4.1941:=cn=c1,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=c2,%s))") % ( + self.ou_computers, self.ou_computers)) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn), "CN=u1,%s" % self.ou_users) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(member:1.2.840.113556.1.4.1941:=cn=c2,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=c3,%s))") % ( + self.ou_computers, self.ou_computers)) + self.assertEqual(len(res1), 0) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(member:1.2.840.113556.1.4.1941:=cn=c3,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=u3,%s))") % ( + self.ou_computers, self.ou_users)) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=g1,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(member:1.2.840.113556.1.4.1941:=cn=c1,%s)" + "(member:1.2.840.113556.1.4.1941:=cn=u4,%s))") % ( + self.ou_computers, self.ou_computers)) + self.assertEqual(len(res1), 0) + + def test_or_memberOf_queries(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g2,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 6) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g3,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 6) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 8) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g2,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c4,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(memberOf:1.2.840.113556.1.4.1941:=cn=g2,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g3,%s))") % + (self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 5) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(memberOf:1.2.840.113556.1.4.1941:=cn=g2,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 7) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c4,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(|(memberOf:1.2.840.113556.1.4.1941:=cn=g3,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 5) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u4,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c4,%s" % self.ou_computers in dn_list) + + def test_and_memberOf_queries(self): + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g2,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 5) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u2,%s" % self.ou_users in dn_list) + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=g3,%s" % self.ou_groups in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g3,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(memberOf:1.2.840.113556.1.4.1941:=cn=g2,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g3,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 3) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + self.assertTrue("CN=c3,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(memberOf:1.2.840.113556.1.4.1941:=cn=g2,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(memberOf:1.2.840.113556.1.4.1941:=cn=g3,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s))") % ( + self.ou_groups, self.ou_groups)) + self.assertEqual(len(res1), 2) + dn_list = [str(res.dn) for res in res1] + self.assertTrue("CN=u3,%s" % self.ou_users in dn_list) + self.assertTrue("CN=c1,%s" % self.ou_computers in dn_list) + + res1 = self.ldb.search(self.ou, + scope=SCOPE_SUBTREE, + expression=("(&(memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s)" + "(memberOf:1.2.840.113556.1.4.1941:=cn=c1,%s))") % ( + self.ou_groups, self.ou_computers)) + self.assertEqual(len(res1), 0) + +if __name__ == "__main__": + + parser = optparse.OptionParser("match_rules.py [options] <host>") + sambaopts = options.SambaOptions(parser) + parser.add_option_group(sambaopts) + parser.add_option_group(options.VersionOptions(parser)) + + # use command line creds if available + credopts = options.CredentialsOptions(parser) + parser.add_option_group(credopts) + opts, args = parser.parse_args() + subunitopts = SubunitOptions(parser) + parser.add_option_group(subunitopts) + + if len(args) < 1: + parser.print_usage() + sys.exit(1) + + host = args[0] + + if "://" not in host: + if os.path.isfile(host): + host = "tdb://%s" % host + else: + host = "ldap://%s" % host + + TestProgram(module=__name__, opts=subunitopts) diff --git a/lib/ldb-samba/tests/match_rules_remote.py b/lib/ldb-samba/tests/match_rules_remote.py new file mode 100755 index 0000000..122231f --- /dev/null +++ b/lib/ldb-samba/tests/match_rules_remote.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 + +import optparse +import sys +import os +import samba +import samba.getopt as options + +from samba.tests.subunitrun import SubunitOptions, TestProgram + +from samba.samdb import SamDB +from samba.auth import system_session +from samba import sd_utils +from samba.ndr import ndr_unpack +from ldb import Message, MessageElement, Dn, LdbError +from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE +from ldb import SCOPE_BASE, SCOPE_SUBTREE, SCOPE_ONELEVEL + +from match_rules import MatchRulesTestsBase + + +class MatchRulesTestsUser(MatchRulesTestsBase): + def setUp(self): + self.sambaopts = sambaopts + self.credopts = credopts + self.host = host + super().setUp() + self.sd_utils = sd_utils.SDUtils(self.ldb) + + self.user_pass = "samba123@" + self.match_test_user = "matchtestuser" + self.ldb.newuser(self.match_test_user, + self.user_pass, + userou=self.ou_rdn) + user_creds = self.insta_creds(template=self.creds, + username=self.match_test_user, + userpass=self.user_pass) + self.user_ldb = SamDB(host, credentials=user_creds, lp=self.lp) + token_res = self.user_ldb.search(scope=SCOPE_BASE, + base="", + attrs=["tokenGroups"]) + self.user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, + token_res[0]["tokenGroups"][0]) + + self.member_attr_guid = "bf9679c0-0de6-11d0-a285-00aa003049e2" + + def test_with_denied_link(self): + + # add an ACE that denies the user Read Property (RP) access to + # the member attr (which is similar to making the attribute + # confidential) + ace = "(OD;;RP;{0};;{1})".format(self.member_attr_guid, + self.user_sid) + g2_dn = Dn(self.ldb, "CN=g2,%s" % self.ou_groups) + + # add the ACE that denies access to the attr under test + self.sd_utils.dacl_add_ace(g2_dn, ace) + + # Search without transitive match must return 0 results + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + # Search with transitive match must return 1 results + res1 = self.ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 1) + self.assertEqual(str(res1[0].dn).lower(), ("CN=g4,%s" % self.ou_groups).lower()) + + # Search as a user match must return 0 results as the intermediate link can't be seen + res1 = self.user_ldb.search("cn=g4,%s" % self.ou_groups, + scope=SCOPE_BASE, + expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users) + self.assertEqual(len(res1), 0) + + + +parser = optparse.OptionParser("match_rules_remote.py [options] <host>") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) +parser.add_option_group(options.VersionOptions(parser)) + +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +opts, args = parser.parse_args() +subunitopts = SubunitOptions(parser) +parser.add_option_group(subunitopts) + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +host = args[0] + +if "://" not in host: + if os.path.isfile(host): + host = "tdb://%s" % host + else: + host = "ldap://%s" % host + +TestProgram(module=__name__, opts=subunitopts) |