From 4f5791ebd03eaec1c7da0865a383175b05102712 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 19:47:29 +0200 Subject: Adding upstream version 2:4.17.12+dfsg. Signed-off-by: Daniel Baumann --- source4/dsdb/tests/python/rodc.py | 258 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100755 source4/dsdb/tests/python/rodc.py (limited to 'source4/dsdb/tests/python/rodc.py') diff --git a/source4/dsdb/tests/python/rodc.py b/source4/dsdb/tests/python/rodc.py new file mode 100755 index 0000000..1a4b0f9 --- /dev/null +++ b/source4/dsdb/tests/python/rodc.py @@ -0,0 +1,258 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +import optparse +import sys +import os +import base64 +import random +import re +import uuid + +sys.path.insert(0, "bin/python") +import samba +from samba.tests.subunitrun import SubunitOptions, TestProgram + +import samba.getopt as options + +from samba.auth import system_session +import ldb +from samba.samdb import SamDB +from samba.ndr import ndr_pack, ndr_unpack +from samba.dcerpc import drsblobs + +import time + + +class RodcTestException(Exception): + pass + + +class RodcTests(samba.tests.TestCase): + + def setUp(self): + super(RodcTests, self).setUp() + self.samdb = SamDB(HOST, credentials=CREDS, + session_info=system_session(LP), lp=LP) + + self.base_dn = self.samdb.domain_dn() + + root = self.samdb.search(base='', scope=ldb.SCOPE_BASE, + attrs=['dsServiceName']) + self.service = root[0]['dsServiceName'][0] + self.tag = uuid.uuid4().hex + + def test_add_replicated_objects(self): + for o in ( + { + 'dn': "ou=%s1,%s" % (self.tag, self.base_dn), + "objectclass": "organizationalUnit" + }, + { + 'dn': "cn=%s2,%s" % (self.tag, self.base_dn), + "objectclass": "user" + }, + { + 'dn': "cn=%s3,%s" % (self.tag, self.base_dn), + "objectclass": "group" + }, + { + 'dn': "cn=%s4,%s" % (self.tag, self.service), + "objectclass": "NTDSConnection", + 'enabledConnection': 'TRUE', + 'fromServer': self.base_dn, + 'options': '0' + }, + ): + try: + self.samdb.add(o) + self.fail("Failed to fail to add %s" % o['dn']) + except ldb.LdbError as e: + (ecode, emsg) = e.args + if ecode != ldb.ERR_REFERRAL: + print(emsg) + self.fail("Adding %s: ldb error: %s %s, wanted referral" % + (o['dn'], ecode, emsg)) + else: + m = re.search(r'(ldap://[^>]+)>', emsg) + if m is None: + self.fail("referral seems not to refer to anything") + address = m.group(1) + + try: + tmpdb = SamDB(address, credentials=CREDS, + session_info=system_session(LP), lp=LP) + tmpdb.add(o) + tmpdb.delete(o['dn']) + except ldb.LdbError as e: + self.fail("couldn't modify referred location %s" % + address) + + if address.lower().startswith(self.samdb.domain_dns_name()): + self.fail("referral address did not give a specific DC") + + def test_modify_replicated_attributes(self): + # some timestamp ones + dn = 'CN=Guest,CN=Users,' + self.base_dn + value = 'hallooo' + for attr in ['carLicense', 'middleName']: + msg = ldb.Message() + msg.dn = ldb.Dn(self.samdb, dn) + msg[attr] = ldb.MessageElement(value, + ldb.FLAG_MOD_REPLACE, + attr) + try: + self.samdb.modify(msg) + self.fail("Failed to fail to modify %s %s" % (dn, attr)) + except ldb.LdbError as e1: + (ecode, emsg) = e1.args + if ecode != ldb.ERR_REFERRAL: + self.fail("Failed to REFER when trying to modify %s %s" % + (dn, attr)) + else: + m = re.search(r'(ldap://[^>]+)>', emsg) + if m is None: + self.fail("referral seems not to refer to anything") + address = m.group(1) + + try: + tmpdb = SamDB(address, credentials=CREDS, + session_info=system_session(LP), lp=LP) + tmpdb.modify(msg) + except ldb.LdbError as e: + self.fail("couldn't modify referred location %s" % + address) + + if address.lower().startswith(self.samdb.domain_dns_name()): + self.fail("referral address did not give a specific DC") + + def test_modify_nonreplicated_attributes(self): + # some timestamp ones + dn = 'CN=Guest,CN=Users,' + self.base_dn + value = '123456789' + for attr in ['badPwdCount', 'lastLogon', 'lastLogoff']: + m = ldb.Message() + m.dn = ldb.Dn(self.samdb, dn) + m[attr] = ldb.MessageElement(value, + ldb.FLAG_MOD_REPLACE, + attr) + # Windows refers these ones even though they are non-replicated + try: + self.samdb.modify(m) + self.fail("Failed to fail to modify %s %s" % (dn, attr)) + except ldb.LdbError as e2: + (ecode, emsg) = e2.args + if ecode != ldb.ERR_REFERRAL: + self.fail("Failed to REFER when trying to modify %s %s" % + (dn, attr)) + else: + m = re.search(r'(ldap://[^>]+)>', emsg) + if m is None: + self.fail("referral seems not to refer to anything") + address = m.group(1) + + if address.lower().startswith(self.samdb.domain_dns_name()): + self.fail("referral address did not give a specific DC") + + def test_modify_nonreplicated_reps_attributes(self): + # some timestamp ones + dn = self.base_dn + + m = ldb.Message() + m.dn = ldb.Dn(self.samdb, dn) + attr = 'repsFrom' + + res = self.samdb.search(dn, scope=ldb.SCOPE_BASE, + attrs=['repsFrom']) + rep = ndr_unpack(drsblobs.repsFromToBlob, res[0]['repsFrom'][0], + allow_remaining=True) + rep.ctr.result_last_attempt = -1 + value = ndr_pack(rep) + + m[attr] = ldb.MessageElement(value, + ldb.FLAG_MOD_REPLACE, + attr) + try: + self.samdb.modify(m) + self.fail("Failed to fail to modify %s %s" % (dn, attr)) + except ldb.LdbError as e3: + (ecode, emsg) = e3.args + if ecode != ldb.ERR_REFERRAL: + self.fail("Failed to REFER when trying to modify %s %s" % + (dn, attr)) + else: + m = re.search(r'(ldap://[^>]+)>', emsg) + if m is None: + self.fail("referral seems not to refer to anything") + address = m.group(1) + + if address.lower().startswith(self.samdb.domain_dns_name()): + self.fail("referral address did not give a specific DC") + + def test_delete_special_objects(self): + dn = 'CN=Guest,CN=Users,' + self.base_dn + try: + self.samdb.delete(dn) + self.fail("Failed to fail to delete %s" % (dn)) + except ldb.LdbError as e4: + (ecode, emsg) = e4.args + if ecode != ldb.ERR_REFERRAL: + print(ecode, emsg) + self.fail("Failed to REFER when trying to delete %s" % dn) + else: + m = re.search(r'(ldap://[^>]+)>', emsg) + if m is None: + self.fail("referral seems not to refer to anything") + address = m.group(1) + + if address.lower().startswith(self.samdb.domain_dns_name()): + self.fail("referral address did not give a specific DC") + + def test_no_delete_nonexistent_objects(self): + dn = 'CN=does-not-exist-%s,CN=Users,%s' % (self.tag, self.base_dn) + try: + self.samdb.delete(dn) + self.fail("Failed to fail to delete %s" % (dn)) + except ldb.LdbError as e5: + (ecode, emsg) = e5.args + if ecode != ldb.ERR_NO_SUCH_OBJECT: + print(ecode, emsg) + self.fail("Failed to NO_SUCH_OBJECT when trying to delete " + "%s (which does not exist)" % dn) + + +def main(): + global HOST, CREDS, LP + parser = optparse.OptionParser("rodc.py [options] ") + + sambaopts = options.SambaOptions(parser) + versionopts = options.VersionOptions(parser) + credopts = options.CredentialsOptions(parser) + subunitopts = SubunitOptions(parser) + + parser.add_option_group(sambaopts) + parser.add_option_group(versionopts) + parser.add_option_group(credopts) + parser.add_option_group(subunitopts) + + opts, args = parser.parse_args() + + LP = sambaopts.get_loadparm() + CREDS = credopts.get_credentials(LP) + + try: + HOST = args[0] + except IndexError: + parser.print_usage() + sys.exit(1) + + if "://" not in HOST: + if os.path.isfile(HOST): + HOST = "tdb://%s" % HOST + else: + HOST = "ldap://%s" % HOST + + TestProgram(module=__name__, opts=subunitopts) + + +main() -- cgit v1.2.3