#!/usr/bin/env python3
#
# Tests for truncated index keys
#
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Tests for truncated index keys
Databases such as lmdb have a maximum key length; these tests ensure that
ldb behaves correctly in those circumstances.
"""
import os
from unittest import TestCase
import sys
import ldb
import shutil
# URL scheme prefixes: LdbBaseTest.url() prepends one of these to the
# database filename to build the URL that is passed to ldb.Ldb().
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"
def tempdir():
    """Create a fresh temporary directory and return its path.

    When the SELFTEST_PREFIX environment variable is set, the directory is
    created under ``$SELFTEST_PREFIX/tmp``; otherwise ``tempfile`` picks
    its default location.
    """
    import tempfile

    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
def contains(result, dn):
    """Report whether any record in *result* carries the DN *dn*.

    Each record's "dn" entry is converted with str() before it is compared
    to *dn*.  A *result* of None is treated as containing nothing.
    """
    if result is None:
        return False
    return any(str(record["dn"]) == dn for record in result)
class LdbBaseTest(TestCase):
    """Shared base for the ldb tests: selects the database URL prefix.

    A subclass may set ``prefix`` to pick a backend; if it is missing or
    None, setUp() falls back to TDB_PREFIX.  url() also reads
    ``self.filename``, which this class does not set itself.
    """

    def setUp(self):
        """Run the base setUp, then default ``self.prefix`` to TDB_PREFIX."""
        super().setUp()
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super().tearDown()

    def url(self):
        """Return the database URL: the prefix followed by the filename."""
        return self.prefix + self.filename

    def flags(self):
        """Return ldb.FLG_NOSYNC for the MDB prefix, otherwise 0."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
class MaxIndexKeyLengthTests(LdbBaseTest):
def checkGuids(self, key, guids):
    """Assert that the index record *key* holds exactly *guids*.

    This check relies on the current implementation, where the indexes
    are stored in the same database as the data.

    If *guids* is None the index record must not exist.  Otherwise
    exactly one record must exist and its "@IDX" value must equal
    *guids*; the caller has to supply the GUIDs in the expected order.
    """
    matches = self.l.search(base=key, scope=ldb.SCOPE_BASE)

    record_expected = guids is not None
    self.assertEqual(len(matches), 1 if record_expected else 0)
    if not record_expected:
        return

    # The GUID index format has only one value
    stored = matches[0]["@IDX"][0]
    self.assertEqual(len(guids), len(stored))
    self.assertEqual(guids, stored)
def tearDown(self):
    """Remove the test directory, run the base tearDown, drop the ldb."""
    shutil.rmtree(self.testdir)
    super().tearDown()

    # Drop the handle so the LDB is closed now, which closes the FD
    del self.l
def setUp(self):
    """Open a scratch ldb with a reduced key length and index settings.

    Creates a temporary directory, opens "key_len_test.ldb" inside it and
    writes the @ATTRIBUTES and @INDEXLIST records used by the tests.
    """
    super().setUp()
    self.testdir = tempdir()
    self.filename = os.path.join(self.testdir, "key_len_test.ldb")

    # Note that the maximum key length is set to 54.
    # This accounts for the 4 bytes added by the dn formatting:
    # a leading dn=, and a trailing zero terminator
    open_options = [
        "modules:rdn_name",
        "max_key_len_for_self_test:54",
    ]
    self.l = ldb.Ldb(self.url(), options=open_options)

    attributes_record = {
        "dn": "@ATTRIBUTES",
        "uniqueThing": "UNIQUE_INDEX",
    }
    self.l.add(attributes_record)

    indexed_attributes = [
        b"uniqueThing",
        b"notUnique",
        b"base64____lt",
        b"base64_____eq",
        b"base64______gt",
    ]
    index_record = {
        "dn": "@INDEXLIST",
        "@IDXATTR": indexed_attributes,
        "@IDXONE": [b"1"],
        "@IDXGUID": [b"objectUUID"],
        "@IDX_DN_GUID": [b"GUID"],
    }
    self.l.add(index_record)
# Add a value to a unique index that exceeds the maximum key length
# This should be rejected.
def test_add_long_unique_add(self):
    """An over-long value for a unique index must be rejected on add.

    The add is expected to raise ldb.LdbError carrying
    ldb.ERR_CONSTRAINT_VIOLATION; the test fails if it succeeds.
    """
    # index key will be
    # "@INDEX:UNIQUETHING:01234567890123456789012345678901"
    record = {
        "dn": "OU=UNIQUE_MAX_LEN,DC=SAMBA,DC=ORG",
        "objectUUID": b"0123456789abcdef",
        "uniqueThing": "01234567890123456789012345678901",
    }
    try:
        self.l.add(record)
    except ldb.LdbError as err:
        self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
    else:
        self.fail("Should have failed on long index key")
# Test that DNs longer than the maximum key length can be added
# and that duplicate DNs are rejected correctly
def test_add_long_dn_add(self):
    """Long DNs can be added and duplicate DNs are rejected.

    Five entries are added and the DN index record is checked after each
    add.  Every DN is then added a second time with a different
    objectUUID; each of those adds must raise ldb.LdbError with
    ldb.ERR_ENTRY_ALREADY_EXISTS.

    The duplicate checks fail the test when the add unexpectedly
    succeeds.  Previously a successful duplicate add passed silently,
    and the last check used a DN that had never been added, so it did
    not exercise duplicate detection at all.
    """
    dn_org = "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG"
    dn_com = "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM"
    dn_gov = "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV"
    # Key is exactly the maximum length
    dn_max = "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA"
    # Key is one character shorter than the maximum length
    dn_short = "OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA"

    # For the three longest DNs the index key is truncated, and all of
    # them share this one index record.
    truncated_key = "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA"

    # (dn, objectUUID, index key to inspect, expected GUIDs in the index)
    additions = [
        (dn_org, b"0123456789abcdef", truncated_key,
         b"0123456789abcdef"),
        (dn_com, b"0123456789abcde0", truncated_key,
         b"0123456789abcde0" + b"0123456789abcdef"),
        (dn_gov, b"0123456789abcde1", truncated_key,
         b"0123456789abcde0" + b"0123456789abcde1" + b"0123456789abcdef"),
        # A key equal to the maximum length does not get inserted into
        # the truncated key namespace
        (dn_max, b"0123456789abcde5",
         "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
         b"0123456789abcde5"),
        # This key is not truncated either, as it is one character less
        # than the maximum, so it is not in the truncated name space
        (dn_short, b"0123456789abcde7",
         "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA",
         b"0123456789abcde7"),
    ]
    for dn, guid, index_key, expected_guids in additions:
        self.l.add({"dn": dn, "objectUUID": guid})
        self.checkGuids(index_key, expected_guids)

    # Re-adding any existing DN under a new objectUUID must be rejected.
    duplicates = [
        (dn_org, b"0123456789abcde2"),
        (dn_com, b"0123456789abcde3"),
        (dn_gov, b"0123456789abcde4"),
        (dn_max, b"0123456789abcde6"),
        (dn_short, b"0123456789abcde8"),
    ]
    for dn, guid in duplicates:
        try:
            self.l.add({"dn": dn, "objectUUID": guid})
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Duplicate DN was not rejected: " + dn)
def test_rename_truncated_dn_keys(self):
# For all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
"objectUUID": b"0123456789abcde0"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde0" + b"0123456789abcdef")
# Non conflicting rename, should succeed
self.l.rename("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
|