summaryrefslogtreecommitdiffstats
path: root/source4/dsdb/tests/python/rodc.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rwxr-xr-xsource4/dsdb/tests/python/rodc.py258
1 files changed, 258 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/rodc.py b/source4/dsdb/tests/python/rodc.py
new file mode 100755
index 0000000..1a4b0f9
--- /dev/null
+++ b/source4/dsdb/tests/python/rodc.py
@@ -0,0 +1,258 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+import optparse
+import sys
+import os
+import base64
+import random
+import re
+import uuid
+
+sys.path.insert(0, "bin/python")
+import samba
+from samba.tests.subunitrun import SubunitOptions, TestProgram
+
+import samba.getopt as options
+
+from samba.auth import system_session
+import ldb
+from samba.samdb import SamDB
+from samba.ndr import ndr_pack, ndr_unpack
+from samba.dcerpc import drsblobs
+
+import time
+
+
+class RodcTestException(Exception):
+ pass
+
+
+class RodcTests(samba.tests.TestCase):
+
+ def setUp(self):
+ super(RodcTests, self).setUp()
+ self.samdb = SamDB(HOST, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+
+ self.base_dn = self.samdb.domain_dn()
+
+ root = self.samdb.search(base='', scope=ldb.SCOPE_BASE,
+ attrs=['dsServiceName'])
+ self.service = root[0]['dsServiceName'][0]
+ self.tag = uuid.uuid4().hex
+
+ def test_add_replicated_objects(self):
+ for o in (
+ {
+ 'dn': "ou=%s1,%s" % (self.tag, self.base_dn),
+ "objectclass": "organizationalUnit"
+ },
+ {
+ 'dn': "cn=%s2,%s" % (self.tag, self.base_dn),
+ "objectclass": "user"
+ },
+ {
+ 'dn': "cn=%s3,%s" % (self.tag, self.base_dn),
+ "objectclass": "group"
+ },
+ {
+ 'dn': "cn=%s4,%s" % (self.tag, self.service),
+ "objectclass": "NTDSConnection",
+ 'enabledConnection': 'TRUE',
+ 'fromServer': self.base_dn,
+ 'options': '0'
+ },
+ ):
+ try:
+ self.samdb.add(o)
+ self.fail("Failed to fail to add %s" % o['dn'])
+ except ldb.LdbError as e:
+ (ecode, emsg) = e.args
+ if ecode != ldb.ERR_REFERRAL:
+ print(emsg)
+ self.fail("Adding %s: ldb error: %s %s, wanted referral" %
+ (o['dn'], ecode, emsg))
+ else:
+ m = re.search(r'(ldap://[^>]+)>', emsg)
+ if m is None:
+ self.fail("referral seems not to refer to anything")
+ address = m.group(1)
+
+ try:
+ tmpdb = SamDB(address, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+ tmpdb.add(o)
+ tmpdb.delete(o['dn'])
+ except ldb.LdbError as e:
+ self.fail("couldn't modify referred location %s" %
+ address)
+
+ if address.lower().startswith(self.samdb.domain_dns_name()):
+ self.fail("referral address did not give a specific DC")
+
+ def test_modify_replicated_attributes(self):
+ # some timestamp ones
+ dn = 'CN=Guest,CN=Users,' + self.base_dn
+ value = 'hallooo'
+ for attr in ['carLicense', 'middleName']:
+ msg = ldb.Message()
+ msg.dn = ldb.Dn(self.samdb, dn)
+ msg[attr] = ldb.MessageElement(value,
+ ldb.FLAG_MOD_REPLACE,
+ attr)
+ try:
+ self.samdb.modify(msg)
+ self.fail("Failed to fail to modify %s %s" % (dn, attr))
+ except ldb.LdbError as e1:
+ (ecode, emsg) = e1.args
+ if ecode != ldb.ERR_REFERRAL:
+ self.fail("Failed to REFER when trying to modify %s %s" %
+ (dn, attr))
+ else:
+ m = re.search(r'(ldap://[^>]+)>', emsg)
+ if m is None:
+ self.fail("referral seems not to refer to anything")
+ address = m.group(1)
+
+ try:
+ tmpdb = SamDB(address, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+ tmpdb.modify(msg)
+ except ldb.LdbError as e:
+ self.fail("couldn't modify referred location %s" %
+ address)
+
+ if address.lower().startswith(self.samdb.domain_dns_name()):
+ self.fail("referral address did not give a specific DC")
+
+ def test_modify_nonreplicated_attributes(self):
+ # some timestamp ones
+ dn = 'CN=Guest,CN=Users,' + self.base_dn
+ value = '123456789'
+ for attr in ['badPwdCount', 'lastLogon', 'lastLogoff']:
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.samdb, dn)
+ m[attr] = ldb.MessageElement(value,
+ ldb.FLAG_MOD_REPLACE,
+ attr)
+ # Windows refers these ones even though they are non-replicated
+ try:
+ self.samdb.modify(m)
+ self.fail("Failed to fail to modify %s %s" % (dn, attr))
+ except ldb.LdbError as e2:
+ (ecode, emsg) = e2.args
+ if ecode != ldb.ERR_REFERRAL:
+ self.fail("Failed to REFER when trying to modify %s %s" %
+ (dn, attr))
+ else:
+ m = re.search(r'(ldap://[^>]+)>', emsg)
+ if m is None:
+ self.fail("referral seems not to refer to anything")
+ address = m.group(1)
+
+ if address.lower().startswith(self.samdb.domain_dns_name()):
+ self.fail("referral address did not give a specific DC")
+
+ def test_modify_nonreplicated_reps_attributes(self):
+ # some timestamp ones
+ dn = self.base_dn
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.samdb, dn)
+ attr = 'repsFrom'
+
+ res = self.samdb.search(dn, scope=ldb.SCOPE_BASE,
+ attrs=['repsFrom'])
+ rep = ndr_unpack(drsblobs.repsFromToBlob, res[0]['repsFrom'][0],
+ allow_remaining=True)
+ rep.ctr.result_last_attempt = -1
+ value = ndr_pack(rep)
+
+ m[attr] = ldb.MessageElement(value,
+ ldb.FLAG_MOD_REPLACE,
+ attr)
+ try:
+ self.samdb.modify(m)
+ self.fail("Failed to fail to modify %s %s" % (dn, attr))
+ except ldb.LdbError as e3:
+ (ecode, emsg) = e3.args
+ if ecode != ldb.ERR_REFERRAL:
+ self.fail("Failed to REFER when trying to modify %s %s" %
+ (dn, attr))
+ else:
+ m = re.search(r'(ldap://[^>]+)>', emsg)
+ if m is None:
+ self.fail("referral seems not to refer to anything")
+ address = m.group(1)
+
+ if address.lower().startswith(self.samdb.domain_dns_name()):
+ self.fail("referral address did not give a specific DC")
+
+ def test_delete_special_objects(self):
+ dn = 'CN=Guest,CN=Users,' + self.base_dn
+ try:
+ self.samdb.delete(dn)
+ self.fail("Failed to fail to delete %s" % (dn))
+ except ldb.LdbError as e4:
+ (ecode, emsg) = e4.args
+ if ecode != ldb.ERR_REFERRAL:
+ print(ecode, emsg)
+ self.fail("Failed to REFER when trying to delete %s" % dn)
+ else:
+ m = re.search(r'(ldap://[^>]+)>', emsg)
+ if m is None:
+ self.fail("referral seems not to refer to anything")
+ address = m.group(1)
+
+ if address.lower().startswith(self.samdb.domain_dns_name()):
+ self.fail("referral address did not give a specific DC")
+
+ def test_no_delete_nonexistent_objects(self):
+ dn = 'CN=does-not-exist-%s,CN=Users,%s' % (self.tag, self.base_dn)
+ try:
+ self.samdb.delete(dn)
+ self.fail("Failed to fail to delete %s" % (dn))
+ except ldb.LdbError as e5:
+ (ecode, emsg) = e5.args
+ if ecode != ldb.ERR_NO_SUCH_OBJECT:
+ print(ecode, emsg)
+ self.fail("Failed to NO_SUCH_OBJECT when trying to delete "
+ "%s (which does not exist)" % dn)
+
+
+def main():
+ global HOST, CREDS, LP
+ parser = optparse.OptionParser("rodc.py [options] <host>")
+
+ sambaopts = options.SambaOptions(parser)
+ versionopts = options.VersionOptions(parser)
+ credopts = options.CredentialsOptions(parser)
+ subunitopts = SubunitOptions(parser)
+
+ parser.add_option_group(sambaopts)
+ parser.add_option_group(versionopts)
+ parser.add_option_group(credopts)
+ parser.add_option_group(subunitopts)
+
+ opts, args = parser.parse_args()
+
+ LP = sambaopts.get_loadparm()
+ CREDS = credopts.get_credentials(LP)
+
+ try:
+ HOST = args[0]
+ except IndexError:
+ parser.print_usage()
+ sys.exit(1)
+
+ if "://" not in HOST:
+ if os.path.isfile(HOST):
+ HOST = "tdb://%s" % HOST
+ else:
+ HOST = "ldap://%s" % HOST
+
+ TestProgram(module=__name__, opts=subunitopts)
+
+
+main()