diff options
Diffstat (limited to 'lib/ldb/tests/python/index.py')
-rwxr-xr-x | lib/ldb/tests/python/index.py | 1454 |
1 files changed, 1454 insertions, 0 deletions
diff --git a/lib/ldb/tests/python/index.py b/lib/ldb/tests/python/index.py new file mode 100755 index 0000000..c1da76d --- /dev/null +++ b/lib/ldb/tests/python/index.py @@ -0,0 +1,1454 @@ +#!/usr/bin/env python3 +# +# Tests for truncated index keys +# +# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +"""Tests for truncated index keys + +Databases such as lmdb have a maximum key length, these tests ensure that +ldb behaves correctly in those circumstances. + +""" + +import os +from unittest import TestCase +import sys +import ldb +import shutil + + +TDB_PREFIX = "tdb://" +MDB_PREFIX = "mdb://" + + +def tempdir(): + import tempfile + try: + dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp") + except KeyError: + dir_prefix = None + return tempfile.mkdtemp(dir=dir_prefix) + + +def contains(result, dn): + if result is None: + return False + + for r in result: + if str(r["dn"]) == dn: + return True + return False + + +class LdbBaseTest(TestCase): + def setUp(self): + super(LdbBaseTest, self).setUp() + try: + if self.prefix is None: + self.prefix = TDB_PREFIX + except AttributeError: + self.prefix = TDB_PREFIX + + def tearDown(self): + super(LdbBaseTest, self).tearDown() + + def url(self): + return self.prefix + self.filename + + def flags(self): + if self.prefix == MDB_PREFIX: + return ldb.FLG_NOSYNC + else: + return 0 + + +class MaxIndexKeyLengthTests(LdbBaseTest): + def checkGuids(self, key, guids): + # + # This check relies on the current implementation where the indexes + # are in the same database as the data. + # + # It checks that the index record exists, unless guids is None then + # the record must not exist. And the it contains the expected guid + # entries. + # + # The caller needs to provide the GUID's in the expected order + # + res = self.l.search( + base=key, + scope=ldb.SCOPE_BASE) + if guids is None: + self.assertEqual(len(res), 0) + return + self.assertEqual(len(res), 1) + + # The GUID index format has only one value + index = res[0]["@IDX"][0] + self.assertEqual(len(guids), len(index)) + self.assertEqual(guids, index) + + def tearDown(self): + shutil.rmtree(self.testdir) + super(MaxIndexKeyLengthTests, self).tearDown() + + # Ensure the LDB is closed now, so we close the FD + del(self.l) + + def setUp(self): + super(MaxIndexKeyLengthTests, self).setUp() + self.testdir = tempdir() + self.filename = os.path.join(self.testdir, "key_len_test.ldb") + # Note that the maximum key length is set to 54 + # This accounts for the 4 bytes added by the dn formatting + # a leading dn=, and a trailing zero terminator + # + self.l = ldb.Ldb(self.url(), + options=[ + "modules:rdn_name", + "max_key_len_for_self_test:54"]) + self.l.add({"dn": "@ATTRIBUTES", + "uniqueThing": "UNIQUE_INDEX"}) + self.l.add({"dn": "@INDEXLIST", + "@IDXATTR": [ + b"uniqueThing", + b"notUnique", + b"base64____lt", + b"base64_____eq", + b"base64______gt"], + "@IDXONE": [b"1"], + "@IDXGUID": [b"objectUUID"], + "@IDX_DN_GUID": [b"GUID"]}) + + # Add a value to a unique index that exceeds the maximum key length + # This should be rejected. + def test_add_long_unique_add(self): + try: + self.l.add({"dn": "OU=UNIQUE_MAX_LEN,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcdef", + "uniqueThing": "01234567890123456789012345678901"}) + # index key will be + # "@INDEX:UNIQUETHING:01234567890123456789012345678901" + self.fail("Should have failed on long index key") + + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION) + + # Test that DN's longer the maximum key length can be added + # and that duplicate DN's are rejected correctly + def test_add_long_dn_add(self): + # + # For all entries the DN index key gets truncated to + # @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA + # + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcdef"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM", + "objectUUID": b"0123456789abcde0"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde0" + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde0" + b"0123456789abcde1" + b"0123456789abcdef") + + # Key is equal to max length does not get inserted into the truncated + # key namespace + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde5"}) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + # This key should not get truncated, as it's one character less than + # max, and will not be in the truncate name space + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde7"}) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde7") + + try: + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcde2"}) + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS) + + try: + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM", + "objectUUID": b"0123456789abcde3"}) + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS) + + try: + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + "objectUUID": b"0123456789abcde4"}) + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS) + + try: + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde6"}) + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS) + + try: + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde8"}) + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS) + + def test_rename_truncated_dn_keys(self): + # For all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcdef"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM", + "objectUUID": b"0123456789abcde0"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde0" + b"0123456789abcdef") + + # Non conflicting rename, should succeed + self.l.rename("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + # Index should be unchanged. + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde0" + b"0123456789abcdef") + + # Conflicting rename should fail + try: + self.l.rename("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM", + "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS) + + def test_delete_truncated_dn_keys(self): + # + # For all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA + # + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcdef"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde1" + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde5"}) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + # Try to delete a non existent DN with a truncated key + try: + self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM") + except ldb.LdbError as err: + enum = err.args[0] + self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT) + # Ensure that non of the other truncated DN's got deleted + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG") + self.assertEqual(len(res), 1) + + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + self.assertEqual(len(res), 1) + + # Ensure that the non truncated DN did not get deleted + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA") + self.assertEqual(len(res), 1) + + # Check the indexes are correct + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde1" + b"0123456789abcdef") + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + # delete an existing entry + self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG") + + # Ensure it got deleted + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG") + self.assertEqual(len(res), 0) + + # Ensure that non of the other truncated DN's got deleted + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + self.assertEqual(len(res), 1) + + # Ensure the non truncated entry did not get deleted. + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA") + self.assertEqual(len(res), 1) + + # Check the indexes are correct + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde1") + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + # delete an existing entry + self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + + # Ensure it got deleted + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXX,DC=SAMBA,DC=GOV") + self.assertEqual(len(res), 0) + + # Ensure that non of the non truncated DN's got deleted + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA") + self.assertEqual(len(res), 1) + # Check the indexes are correct + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + None) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + # delete an existing entry + self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA") + + # Ensure it got deleted + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBAxxx") + self.assertEqual(len(res), 0) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + None) + + def test_search_truncated_dn_keys(self): + # + # For all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA + # + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcdef"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde1" + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde5"}) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG") + self.assertEqual(len(res), 1) + + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + self.assertEqual(len(res), 1) + + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA") + self.assertEqual(len(res), 1) + + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM") + self.assertEqual(len(res), 0) + + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + self.assertEqual(len(res), 0) + + # Non existent, key one less than truncation limit + res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA") + self.assertEqual(len(res), 0) + + def test_search_dn_filter_truncated_dn_keys(self): + # + # For all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA + # + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcdef"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde1" + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde5"}) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + res = self.l.search( + expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG") + self.assertEqual(len(res), 1) + + res = self.l.search( + expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + self.assertEqual(len(res), 1) + + res = self.l.search( + expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA") + self.assertEqual(len(res), 1) + + res = self.l.search( + expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM") + self.assertEqual(len(res), 0) + + res = self.l.search( + expression="dn=OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV") + self.assertEqual(len(res), 0) + + # Non existent, key one less than truncation limit + res = self.l.search( + expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA") + self.assertEqual(len(res), 0) + + def test_search_one_level_truncated_dn_keys(self): + # + # Except for the base DN's + # all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=??,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA + # The base DN-s truncate to + # @INDEX:@IDXDN:OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR + # + self.l.add({"dn": "OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcdef"}) + self.l.add({"dn": "OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcd1f"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR", + b"0123456789abcd1f" + b"0123456789abcdef") + + self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcde1"}) + self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcd11"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA", + b"0123456789abcd11" + b"0123456789abcde1") + + self.l.add({"dn": "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcde2"}) + self.l.add({"dn": "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcdf2"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA", + b"0123456789abcde2" + b"0123456789abcdf2") + + self.l.add({"dn": "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcde3"}) + self.l.add({"dn": "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcd13"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA", + b"0123456789abcd13" + b"0123456789abcde3") + + # This key is not truncated as it's the max_key_len + self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA", + "objectUUID": b"0123456789abcde7"}) + self.checkGuids( + "@INDEX:@IDXDN:OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA", + b"0123456789abcde7") + + res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1", + scope=ldb.SCOPE_ONELEVEL) + self.assertEqual(len(res), 3) + self.assertTrue( + contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1")) + self.assertTrue( + contains(res, "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1")) + self.assertTrue( + contains(res, "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1")) + + res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2", + scope=ldb.SCOPE_ONELEVEL) + self.assertEqual(len(res), 3) + self.assertTrue( + contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2")) + self.assertTrue( + contains(res, "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2")) + self.assertTrue( + contains(res, "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2")) + + res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA", + scope=ldb.SCOPE_ONELEVEL) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA")) + + def test_search_sub_tree_truncated_dn_keys(self): + # + # Except for the base DN's + # all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=??,OU=A_LONG_DN_SUB_TREE,DC=SAMBA + # The base DN-s truncate to + # @INDEX:@IDXDN:OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR + # + self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcdef"}) + self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcde4"}) + self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3", + "objectUUID": b"0123456789abcde8"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR", + b"0123456789abcde4" + b"0123456789abcde8" + b"0123456789abcdef") + + self.l.add({"dn": "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcde1"}) + self.l.add({"dn": "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcde5"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA", + b"0123456789abcde1" + b"0123456789abcde5") + + self.l.add({"dn": "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcde2"}) + self.l.add({"dn": "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcde6"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA", + b"0123456789abcde2" + b"0123456789abcde6") + + self.l.add({"dn": "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcde3"}) + + self.l.add({"dn": "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcde7"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA", + b"0123456789abcde3" + b"0123456789abcde7") + + self.l.add({"dn": "OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4", + "objectUUID": b"0123456789abcde9"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA", + b"0123456789abcde9") + + res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1", + scope=ldb.SCOPE_SUBTREE) + self.assertEqual(len(res), 4) + self.assertTrue( + contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1")) + self.assertTrue( + contains(res, "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1")) + self.assertTrue( + contains(res, "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1")) + self.assertTrue( + contains(res, "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1")) + + res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2", + scope=ldb.SCOPE_SUBTREE) + self.assertEqual(len(res), 4) + self.assertTrue( + contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2")) + self.assertTrue( + contains(res, "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2")) + self.assertTrue( + contains(res, "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2")) + self.assertTrue( + contains(res, "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2")) + + res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3", + scope=ldb.SCOPE_SUBTREE) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3")) + + res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4", + scope=ldb.SCOPE_SUBTREE) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4")) + + def test_search_base_truncated_dn_keys(self): + # + # For all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA + # + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + "objectUUID": b"0123456789abcdef"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde1" + b"0123456789abcdef") + + self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + "objectUUID": b"0123456789abcde5"}) + self.checkGuids( + "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + b"0123456789abcde5") + + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_BASE) + self.assertEqual(len(res), 1) + + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + scope=ldb.SCOPE_BASE) + self.assertEqual(len(res), 1) + + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA", + scope=ldb.SCOPE_BASE) + self.assertEqual(len(res), 1) + + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM", + scope=ldb.SCOPE_BASE) + self.assertEqual(len(res), 0) + + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV", + scope=ldb.SCOPE_BASE) + self.assertEqual(len(res), 0) + + # Non existent, key one less than truncation limit + res = self.l.search( + base="OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA", + scope=ldb.SCOPE_BASE) + self.assertEqual(len(res), 0) + + # + # Test non unique index searched with truncated keys + # + def test_index_truncated_keys(self): + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + + eq_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + lt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + # > than max length and differs in values that will be truncated + gt_max_b = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab" + + # Add two entries with the same value, key length = max so no + # truncation. + self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": eq_max, + "objectUUID": b"0123456789abcde0"}) + self.checkGuids( + "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0") + + self.l.add({"dn": "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": eq_max, + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + b"0123456789abcde1") + + # + # An entry outside the tree + # + self.l.add({"dn": "OU=10,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG", + "notUnique": eq_max, + "objectUUID": b"0123456789abcd11"}) + self.checkGuids( + "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcd11" + b"0123456789abcde0" + b"0123456789abcde1") + + # Key longer than max so should get truncated to same key as + # the previous two entries + self.l.add({"dn": "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": gt_max, + "objectUUID": b"0123456789abcde2"}) + # But in the truncated key space + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde2") + + # Key longer than max so should get truncated to same key as + # the previous entries but differs in the chars after max length + self.l.add({"dn": "OU=23,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": gt_max_b, + "objectUUID": b"0123456789abcd22"}) + # But in the truncated key space + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcd22" + b"0123456789abcde2") + # + # An entry outside the tree + # + self.l.add({"dn": "OU=11,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG", + "notUnique": gt_max, + "objectUUID": b"0123456789abcd12"}) + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcd12" + b"0123456789abcd22" + b"0123456789abcde2") + + # Key shorter than max + # + self.l.add({"dn": "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": lt_max, + "objectUUID": b"0123456789abcde3"}) + self.checkGuids( + "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde3") + # + # An entry outside the tree + # + self.l.add({"dn": "OU=12,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG", + "notUnique": lt_max, + "objectUUID": b"0123456789abcd13"}) + self.checkGuids( + "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcd13" + b"0123456789abcde3") + + # + # search for target is max value not truncated + # should return ou's 01, 02 + # + expression = "(notUnique=" + eq_max.decode('ascii') + ")" + res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 2) + self.assertTrue( + contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + self.assertTrue( + contains(res, "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + # + # search for target is max value not truncated + # search one level up the tree, scope is ONE_LEVEL + # So should get no matches + # + expression = "(notUnique=" + eq_max.decode('ascii') + ")" + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 0) + # + # search for target is max value not truncated + # search one level up the tree, scope is SUBTREE + # So should get 3 matches + # + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_SUBTREE, + expression=expression) + self.assertEqual(len(res), 3) + self.assertTrue( + contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + self.assertTrue( + contains(res, "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + self.assertTrue( + contains(res, "OU=10,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG")) + # + # search for target is max value + 1 so truncated + # should return ou 23 as it's gt_max_b being searched for + # + expression = "(notUnique=" + gt_max_b.decode('ascii') + ")" + res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=23,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + # + # search for target is max value + 1 so truncated + # should return ou 03 as it's gt_max being searched for + # + expression = "(notUnique=" + gt_max.decode('ascii') + ")" + res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + # + # scope one level and one level up one level up should get no matches + # + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 0) + # + # scope sub tree and one level up one level up should get 2 matches + # + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_SUBTREE, + expression=expression) + self.assertEqual(len(res), 2) + self.assertTrue( + contains(res, "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + self.assertTrue( + contains(res, "OU=11,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG")) + + # + # search for target is max value - 1 so not truncated + # should return ou 04 + # + expression = "(notUnique=" + lt_max.decode('ascii') + ")" + res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + # + # scope one level and one level up one level up should get no matches + # + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 0) + + # + # scope sub tree and one level up one level up should get 2 matches + # + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_SUBTREE, + expression=expression) + self.assertEqual(len(res), 2) + self.assertTrue( + contains(res, "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + self.assertTrue( + contains(res, "OU=12,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG")) + + # + # Test index key truncation for base64 encoded values + # + def test_index_truncated_base64_encoded_keys(self): + value = b"aaaaaaaaaaaaaaaaaaaa\x02" + # base64 encodes to "YWFhYWFhYWFhYWFhYWFhYWFhYWEC" + + # One less than max key length + self.l.add({"dn": "OU=01,OU=BASE64,DC=SAMBA,DC=ORG", + "base64____lt": value, + "objectUUID": b"0123456789abcde0"}) + self.checkGuids( + "@INDEX:BASE64____LT::YWFhYWFhYWFhYWFhYWFhYWFhYWEC", + b"0123456789abcde0") + + # Equal max key length + self.l.add({"dn": "OU=02,OU=BASE64,DC=SAMBA,DC=ORG", + "base64_____eq": value, + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX:BASE64_____EQ::YWFhYWFhYWFhYWFhYWFhYWFhYWEC", + b"0123456789abcde1") + + # One greater than max key length + self.l.add({"dn": "OU=03,OU=BASE64,DC=SAMBA,DC=ORG", + "base64______gt": value, + "objectUUID": b"0123456789abcde2"}) + self.checkGuids( + "@INDEX#BASE64______GT##YWFhYWFhYWFhYWFhYWFhYWFhYWE", + b"0123456789abcde2") + # + # Test adding to non unique index with identical multivalued index + # attributes + # + + def test_index_multi_valued_identical_keys(self): + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + as_eq_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + bs_eq_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + + try: + self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": [bs_eq_max, as_eq_max, as_eq_max], + "objectUUID": b"0123456789abcde0"}) + self.fail("Exception not thrown") + except ldb.LdbError as e: + code = e.args[0] + self.assertEqual(ldb.ERR_ATTRIBUTE_OR_VALUE_EXISTS, code) + + # + # Test non unique index with multivalued index attributes + # searched with non truncated keys + # + def test_search_index_multi_valued_truncated_keys(self): + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + + aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab" + bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + + self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": [aa_gt_max, ab_gt_max, bb_gt_max], + "objectUUID": b"0123456789abcde0"}) + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + + expression = "(notUnique=" + aa_gt_max.decode('ascii') + ")" + res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + expression = "(notUnique=" + ab_gt_max.decode('ascii') + ")" + res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + expression = "(notUnique=" + bb_gt_max.decode('ascii') + ")" + res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_ONELEVEL, + expression=expression) + self.assertEqual(len(res), 1) + self.assertTrue( + contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + # + # Test deletion of records with non unique index with multivalued index + # attributes + # replicate this to test modify with modify flags i.e. DELETE, REPLACE + # + def test_delete_index_multi_valued_truncated_keys(self): + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + + aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab" + bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + cc_gt_max = b"cccccccccccccccccccccccccccccccccc" + + self.l.add({"dn": "OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": [aa_gt_max, ab_gt_max, bb_gt_max], + "objectUUID": b"0123456789abcde0"}) + self.l.add({"dn": "OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": [aa_gt_max, ab_gt_max, cc_gt_max], + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + b"0123456789abcde0" + + b"0123456789abcde1" + b"0123456789abcde1") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + b"0123456789abcde1") + + res = self.l.search( + base="DC=SAMBA,DC=ORG", + expression="(notUnique=" + aa_gt_max.decode("ascii") + ")") + self.assertEqual(2, len(res)) + self.assertTrue( + contains(res, "OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG")) + self.assertTrue( + contains(res, "OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + self.l.delete("OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG") + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + None) + + self.l.delete("OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG") + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + None) + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + None) + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + None) + + # + # Test modification of records with non unique index with multivalued index + # attributes + # + def test_modify_index_multi_valued_truncated_keys(self): + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + + aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab" + bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + cc_gt_max = b"cccccccccccccccccccccccccccccccccc" + + self.l.add({"dn": "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": [aa_gt_max, ab_gt_max, bb_gt_max], + "objectUUID": b"0123456789abcde0"}) + self.l.add({"dn": "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG", + "notUnique": [aa_gt_max, ab_gt_max, cc_gt_max], + "objectUUID": b"0123456789abcde1"}) + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + b"0123456789abcde0" + + b"0123456789abcde1" + b"0123456789abcde1") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + b"0123456789abcde1") + + res = self.l.search( + base="DC=SAMBA,DC=ORG", + expression="(notUnique=" + aa_gt_max.decode("ascii") + ")") + self.assertEqual(2, len(res)) + self.assertTrue( + contains(res, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")) + self.assertTrue( + contains(res, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")) + + # + # Modify that does not change the indexed attribute + # + msg = ldb.Message() + msg.dn = ldb.Dn(self.l, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG") + msg["notUnique"] = ldb.MessageElement( + [aa_gt_max, ab_gt_max, bb_gt_max], + ldb.FLAG_MOD_REPLACE, + "notUnique") + self.l.modify(msg) + # + # As the modify is replacing the attribute with the same contents + # there should be no changes to the indexes. + # + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + b"0123456789abcde0" + + b"0123456789abcde1" + b"0123456789abcde1") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + b"0123456789abcde1") + + # + # Modify that removes a value from the indexed attribute + # + msg = ldb.Message() + msg.dn = ldb.Dn(self.l, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG") + msg["notUnique"] = ldb.MessageElement( + [aa_gt_max, bb_gt_max], + ldb.FLAG_MOD_REPLACE, + "notUnique") + self.l.modify(msg) + + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + + b"0123456789abcde1" + b"0123456789abcde1") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + b"0123456789abcde1") + + # + # Modify that does a constrained delete the indexed attribute + # + msg = ldb.Message() + msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG") + msg["notUnique"] = ldb.MessageElement( + [ab_gt_max], + ldb.FLAG_MOD_DELETE, + "notUnique") + self.l.modify(msg) + + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + b"0123456789abcde1") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + b"0123456789abcde1") + + # + # Modify that does an unconstrained delete the indexed attribute + # + msg = ldb.Message() + msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG") + msg["notUnique"] = ldb.MessageElement( + [], + ldb.FLAG_MOD_DELETE, + "notUnique") + self.l.modify(msg) + + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + None) + + # + # Modify that adds a value to the indexed attribute + # + msg = ldb.Message() + msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG") + msg["notUnique"] = ldb.MessageElement( + [cc_gt_max], + ldb.FLAG_MOD_ADD, + "notUnique") + self.l.modify(msg) + + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + b"0123456789abcde1") + + # + # Modify that adds a values to the indexed attribute + # + msg = ldb.Message() + msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG") + msg["notUnique"] = ldb.MessageElement( + [aa_gt_max, ab_gt_max], + ldb.FLAG_MOD_ADD, + "notUnique") + self.l.modify(msg) + + self.checkGuids( + "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + b"0123456789abcde0" + + b"0123456789abcde1" + b"0123456789abcde1") + self.checkGuids( + "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + b"0123456789abcde0") + self.checkGuids( + "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc", + b"0123456789abcde1") + + # + # Test Sub tree searches when checkBaseOnSearch is enabled and the + # DN indexes are truncated and collide. + # + def test_check_base_on_search_truncated_dn_keys(self): + # + # Except for the base DN's + # all entries the DN index key gets truncated to + # 0 1 2 3 4 5 + # 12345678901234567890123456789012345678901234567890 + # @INDEX:@IDXDN:OU=??,OU=CHECK_BASE_DN_XXXX,DC=SAMBA + # The base DN-s truncate to + # @INDEX:@IDXDN:OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR + # + checkbaseonsearch = {"dn": "@OPTIONS", + "checkBaseOnSearch": b"TRUE"} + self.l.add(checkbaseonsearch) + + self.l.add({"dn": "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcdef"}) + self.l.add({"dn": "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcdee"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR", + b"0123456789abcdee" + b"0123456789abcdef") + + self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcdec"}) + self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcdeb"}) + self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3", + "objectUUID": b"0123456789abcded"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA", + b"0123456789abcdeb" + b"0123456789abcdec" + b"0123456789abcded") + + self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1", + "objectUUID": b"0123456789abcde0"}) + self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2", + "objectUUID": b"0123456789abcde1"}) + self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3", + "objectUUID": b"0123456789abcde2"}) + self.checkGuids( + "@INDEX#@IDXDN#OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA", + b"0123456789abcde0" + b"0123456789abcde1" + b"0123456789abcde2") + + res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1", + scope=ldb.SCOPE_SUBTREE) + self.assertEqual(len(res), 3) + self.assertTrue( + contains(res, "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1")) + self.assertTrue( + contains(res, "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1")) + self.assertTrue( + contains(res, "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1")) + + res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2", + scope=ldb.SCOPE_SUBTREE) + self.assertEqual(len(res), 3) + self.assertTrue( + contains(res, "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2")) + self.assertTrue( + contains(res, "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2")) + self.assertTrue( + contains(res, "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2")) + + try: + res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3", + scope=ldb.SCOPE_SUBTREE) + self.fail("Expected exception no thrown") + except ldb.LdbError as e: + code = e.args[0] + self.assertEqual(ldb.ERR_NO_SUCH_OBJECT, code) + + +# Run the index truncation tests against an lmdb backend +class MaxIndexKeyLengthTestsLmdb(MaxIndexKeyLengthTests): + + def setUp(self): + if os.environ.get('HAVE_LMDB', '1') == '0': + self.skipTest("No lmdb backend") + self.prefix = MDB_PREFIX + super(MaxIndexKeyLengthTestsLmdb, self).setUp() + + def tearDown(self): + super(MaxIndexKeyLengthTestsLmdb, self).tearDown() + + +class OrderedIntegerRangeTests(LdbBaseTest): + + def tearDown(self): + shutil.rmtree(self.testdir) + super(OrderedIntegerRangeTests, self).tearDown() + + # Ensure the LDB is closed now, so we close the FD + del(self.l) + + def setUp(self): + super(OrderedIntegerRangeTests, self).setUp() + self.testdir = tempdir() + self.filename = os.path.join(self.testdir, "ordered_integer_test.ldb") + + self.l = ldb.Ldb(self.url(), + options=self.options()) + self.l.add({"dn": "@ATTRIBUTES", + "int64attr": "ORDERED_INTEGER"}) + self.l.add({"dn": "@INDEXLIST", + "@IDXATTR": [b"int64attr"], + "@IDXONE": [b"1"], + "@IDXGUID": [b"objectUUID"], + "@IDX_DN_GUID": [b"GUID"]}) + + def options(self): + if self.prefix == MDB_PREFIX: + return ['modules:rdn_name', + 'disable_full_db_scan_for_self_test:1'] + else: + return ['modules:rdn_name'] + + def test_comparison_expression(self): + int64_max = 2**63-1 + int64_min = -2**63 + test_nums = list(range(-5, 5)) + test_nums += list(range(int64_max-5, int64_max+1)) + test_nums += list(range(int64_min, int64_min+5)) + test_nums = sorted(test_nums) + + for (i, num) in enumerate(test_nums): + ouuid = 0x0123456789abcdef + i + ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode()) + self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i), + "objectUUID": ouuid_s, + "int64attr": str(num)}) + + def assert_int64_expr(expr, py_expr=None): + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_SUBTREE, + expression="(int64attr%s)" % (expr)) + + if not py_expr: + py_expr = expr + expect = [n for n in test_nums if eval(str(n) + py_expr)] + vals = sorted([int(r.get("int64attr")[0]) for r in res]) + self.assertEqual(len(res), len(expect)) + self.assertEqual(set(vals), set(expect)) + self.assertEqual(expect, vals) + + assert_int64_expr(">=-2") + assert_int64_expr("<=2") + assert_int64_expr(">=" + str(int64_min)) + assert_int64_expr("<=" + str(int64_min)) + assert_int64_expr("<=" + str(int64_min+1)) + assert_int64_expr("<=" + str(int64_max)) + assert_int64_expr(">=" + str(int64_max)) + assert_int64_expr(">=" + str(int64_max-1)) + assert_int64_expr("=10", "==10") + + def test_comparison_expression_duplicates(self): + int64_max = 2**63-1 + int64_min = -2**63 + test_nums = list(range(-5, 5)) * 3 + test_nums += list(range(-20, 20, 5)) * 2 + test_nums += list(range(-50, 50, 15)) + test_nums = sorted(test_nums) + + for (i, num) in enumerate(test_nums): + ouuid = 0x0123456789abcdef + i + ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode()) + self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i), + "objectUUID": ouuid_s, + "int64attr": str(num)}) + + def assert_int64_expr(expr, py_expr=None): + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_SUBTREE, + expression="(int64attr%s)" % (expr)) + + if not py_expr: + py_expr = expr + expect = [n for n in test_nums if eval(str(n) + py_expr)] + vals = sorted([int(r.get("int64attr")[0]) for r in res]) + self.assertEqual(len(res), len(expect)) + self.assertEqual(set(vals), set(expect)) + self.assertEqual(expect, vals) + + assert_int64_expr(">=-2") + assert_int64_expr("<=2") + assert_int64_expr(">=" + str(int64_min)) + assert_int64_expr("<=" + str(int64_min)) + assert_int64_expr("<=" + str(int64_min+1)) + assert_int64_expr("<=" + str(int64_max)) + assert_int64_expr(">=" + str(int64_max)) + assert_int64_expr(">=" + str(int64_max-1)) + assert_int64_expr("=-5", "==-5") + assert_int64_expr("=5", "==5") + + +# Run the ordered integer range tests against an lmdb backend +class OrderedIntegerRangeTestsLmdb(OrderedIntegerRangeTests): + + def setUp(self): + if os.environ.get('HAVE_LMDB', '1') == '0': + self.skipTest("No lmdb backend") + self.prefix = MDB_PREFIX + super(OrderedIntegerRangeTestsLmdb, self).setUp() + + def tearDown(self): + super(OrderedIntegerRangeTestsLmdb, self).tearDown() + + +# Run the index truncation tests against an lmdb backend +class RejectSubDBIndex(LdbBaseTest): + + def setUp(self): + if os.environ.get('HAVE_LMDB', '1') == '0': + self.skipTest("No lmdb backend") + self.prefix = MDB_PREFIX + super(RejectSubDBIndex, self).setUp() + self.testdir = tempdir() + self.filename = os.path.join(self.testdir, + "reject_subidx_test.ldb") + self.l = ldb.Ldb(self.url(), + options=[ + "modules:rdn_name"]) + + def tearDown(self): + super(RejectSubDBIndex, self).tearDown() + + def test_try_subdb_index(self): + try: + self.l.add({"dn": "@INDEXLIST", + "@IDX_LMDB_SUBDB": [b"1"], + "@IDXONE": [b"1"], + "@IDXGUID": [b"objectUUID"], + "@IDX_DN_GUID": [b"GUID"], + }) + except ldb.LdbError as e: + code = e.args[0] + string = e.args[1] + self.assertEqual(ldb.ERR_OPERATIONS_ERROR, code) + self.assertIn("sub-database index", string) + + +if __name__ == '__main__': + import unittest + unittest.TestProgram() |