summaryrefslogtreecommitdiffstats
path: root/source4/torture/drs/python/getnc_schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'source4/torture/drs/python/getnc_schema.py')
-rw-r--r--source4/torture/drs/python/getnc_schema.py304
1 files changed, 304 insertions, 0 deletions
diff --git a/source4/torture/drs/python/getnc_schema.py b/source4/torture/drs/python/getnc_schema.py
new file mode 100644
index 0000000..60062f9
--- /dev/null
+++ b/source4/torture/drs/python/getnc_schema.py
@@ -0,0 +1,304 @@
+import drs_base
+import ldb
+import time
+import random
+import os
+
+break_me = os.getenv("PLEASE_BREAK_MY_WINDOWS") == "1"
+assert break_me, ("This test breaks Windows active directory after "
+ "a few runs. Set PLEASE_BREAK_MY_WINDOWS=1 to run.")
+
+# This test runs against Windows. To run, set up two Windows AD DCs, join one
+# to the other, and make sure the passwords are the same. SMB_CONF_PATH must
+# also be set to any smb.conf file. Set DC1 to the PDC's hostname, and DC2 to
+# the join'd DC's hostname. Example:
+# PLEASE_BREAK_MY_WINDOWS=1
+# DC1=pdc DC2=joindc
+# SMB_CONF_PATH=st/ad_dc/etc/smb.conf
+# PYTHONPATH=$PYTHONPATH:./source4/torture/drs/python
+# python3 ./source4/scripting/bin/subunitrun getnc_schema
+# -UAdministrator%Password
+
+class SchemaReplicationTests(drs_base.DrsBaseTestCase):
+
+ def setUp(self):
+ super(SchemaReplicationTests, self).setUp()
+ self.creds = self.get_credentials()
+ self.cmdline_auth = "-U{}%{}".format(self.creds.get_username(),
+ self.creds.get_password())
+
+ self.from_ldb = self.ldb_dc1
+ self.dest_ldb = self.ldb_dc2
+ self._disable_inbound_repl(self.url_dc1)
+ self._disable_all_repl(self.url_dc1)
+ self.free_offset = 0
+
+ def tearDown(self):
+ self._enable_inbound_repl(self.url_dc1)
+ self._enable_all_repl(self.url_dc1)
+
+ def do_repl(self, partition_dn):
+ self._enable_inbound_repl(self.url_dc1)
+ self._enable_all_repl(self.url_dc1)
+
+ samba_tool_cmd = ["drs", "replicate", self.url_dc2, self.url_dc1]
+ samba_tool_cmd += [partition_dn]
+ username = self.creds.get_username()
+ password = self.creds.get_password()
+ samba_tool_cmd += ["-U{0}%{1}".format(username, password)]
+
+ (result, out, err) = self.runsubcmd(*samba_tool_cmd)
+
+ try:
+ self.assertCmdSuccess(result, out, err)
+ except AssertionError:
+ print("Failed repl, retrying in 10s")
+ time.sleep(10)
+ (result, out, err) = self.runsubcmd(*samba_tool_cmd)
+
+ self._disable_inbound_repl(self.url_dc1)
+ self._disable_all_repl(self.url_dc1)
+
+ self.assertCmdSuccess(result, out, err)
+
+ # Get a unique prefix for some search expression like "([att]=[pref]{i}*)"
+ def get_unique(self, expr_templ):
+ found = True
+ while found:
+ i = random.randint(0, 65535)
+ res = self.from_ldb.search(base=self.schema_dn,
+ scope=ldb.SCOPE_SUBTREE,
+ expression=expr_templ.format(i=i))
+ found = len(res) > 0
+
+ return str(i)
+
+ def unique_gov_id_prefix(self):
+ prefix = "1.3.6.1.4.1.7165.4.6.2.8."
+ return prefix + self.get_unique("(governsId=" + prefix + "{i}.*)")
+
+ def unique_cn_prefix(self, prefix="testobj"):
+ return prefix + self.get_unique("(cn=" + prefix + "{i}x*)") + "x"
+
+ # Make test schema classes linked to each other in a line, then modify
+ # them in reverse order so when we repl, a link crosses the chunk
+ # boundary. Chunk size is 133 by default so we do 150.
+ def test_poss_superiors_across_chunk(self):
+ num_schema_objects_to_add = 150
+ class_name = self.unique_cn_prefix()
+
+ ldif_template = """
+dn: CN={class_name}{i},{schema_dn}
+objectClass: top
+objectClass: classSchema
+adminDescription: {class_name}{i}
+adminDisplayName: {class_name}{i}
+cn: {class_name}{i}
+governsId: {gov_id}.{i}
+instanceType: 4
+objectClassCategory: 1
+systemFlags: 16
+systemOnly: FALSE
+"""
+
+ ldif_kwargs = {'class_name': class_name,
+ 'schema_dn': self.schema_dn}
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(i=0, gov_id=gov_id, **ldif_kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ ldif_template += "systemPossSuperiors: {possSup}\n"
+
+ ids = list(range(num_schema_objects_to_add))
+ got_no_such_attrib = False
+ for i in ids[1:]:
+ last_class_name = class_name + str(i-1)
+ ldif = ldif_template.format(i=i, gov_id=gov_id,
+ possSup=last_class_name,
+ **ldif_kwargs)
+
+ try:
+ self.from_ldb.add_ldif(ldif)
+ if got_no_such_attrib:
+ self.from_ldb.set_schema_update_now()
+ except ldb.LdbError as e:
+ if e.args[0] != ldb.ERR_NO_SUCH_ATTRIBUTE:
+ self.fail(e)
+ if got_no_such_attrib:
+ self.fail(("got NO_SUCH_ATTRIB even after "
+ "setting schemaUpdateNow", str(e)))
+ print("got NO_SUCH_ATTRIB, trying schemaUpdateNow")
+ got_no_such_attrib = True
+ self.from_ldb.set_schema_update_now()
+ self.from_ldb.add_ldif(ldif)
+ self.from_ldb.set_schema_update_now()
+
+ ldif_template = """
+dn: CN={class_name}{i},{schema_dn}
+changetype: modify
+replace: adminDescription
+adminDescription: new_description
+"""
+
+ for i in reversed(ids):
+ ldif = ldif_template.format(i=i, **ldif_kwargs)
+ self.from_ldb.modify_ldif(ldif)
+
+ self.do_repl(self.schema_dn)
+
+ dn_templ = "CN={class_name}{i},{schema_dn}"
+ for i in ids:
+ dn = dn_templ.format(i=i, **ldif_kwargs)
+ res = self.dest_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+
+ # Test for method of adding linked attributes in schema partition
+ # required by other tests.
+ def test_create_linked_attribute_in_schema(self):
+ # Make an object outside of the schema partition that we can link to
+ user_name = self.unique_cn_prefix("user")
+ user_dn = "CN={},CN=Users,{}".format(user_name, self.domain_dn)
+
+ ldif_template = """
+dn: {user_dn}
+objectClass: person
+objectClass: user"""
+ ldif = ldif_template.format(user_dn=user_dn)
+ self.from_ldb.add_ldif(ldif)
+
+ # Make test class name unique so test can run multiple times
+ class_name = self.unique_cn_prefix("class")
+
+ kwargs = {'class_name': class_name,
+ 'schema_dn': self.schema_dn,
+ 'user_dn': user_dn}
+
+ # Add an auxiliary schemaClass (cat 3) class and give it managedBy
+ # so we can create schema objects with linked attributes.
+ ldif_template = """
+dn: CN={class_name},{schema_dn}
+objectClass: classSchema
+governsId: {gov_id}.0
+instanceType: 4
+systemFlags: 16
+systemOnly: FALSE
+objectClassCategory: 3
+mayContain: managedBy
+"""
+
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ # Now make an instance that points back to the user with managedBy,
+ # thus creating an object in the schema with a linked attribute
+ ldif_template = """
+dn: CN=link{class_name},{schema_dn}
+objectClass: classSchema
+objectClass: {class_name}
+instanceType: 4
+governsId: {gov_id}.0
+systemFlags: 16
+managedBy: {user_dn}
+"""
+
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ # Check link exists on test schema object
+ dn_templ = "CN=link{class_name},{schema_dn}"
+ dn = dn_templ.format(**kwargs)
+ res = self.from_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ self.assertIsNotNone(res[0].get("managedBy"))
+ self.assertEqual(str(res[0].get("managedBy")[0]), user_dn)
+
+ # Check backlink on user object
+ res = self.from_ldb.search(base=user_dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ managed_objs = res[0].get("managedObjects")
+ self.assertEqual(len(managed_objs), 1)
+ managed_objs = [str(o) for o in managed_objs]
+ self.assertEqual(managed_objs, [dn_templ.format(**kwargs)])
+
+ def test_schema_linked_attributes(self):
+ num_test_objects = 9
+
+ # Make an object outside of the schema partition that we can link to
+ user_name = self.unique_cn_prefix("user")
+ user_dn = "CN={},CN=Users,{}".format(user_name, self.domain_dn)
+
+ ldif_template = """
+dn: {user_dn}
+objectClass: person
+objectClass: user"""
+ ldif = ldif_template.format(user_dn=user_dn)
+ self.from_ldb.add_ldif(ldif)
+
+ self.do_repl(self.domain_dn)
+
+ # Make test object name prefixes unique so test can run multiple times
+ # in a single testenv (can't delete schema objects)
+ class_name = self.unique_cn_prefix("class")
+ link_class_name = self.unique_cn_prefix("linkClass")
+
+ kwargs = {'class_name': class_name,
+ 'schema_dn': self.schema_dn,
+ 'link_class_name': link_class_name,
+ 'user_dn': user_dn}
+
+ # Add an auxiliary schemaClass (cat 3) class and give it managedBy
+ # so we can create schema objects with linked attributes.
+ ldif_template = """
+dn: CN={class_name},{schema_dn}
+objectClass: classSchema
+governsId: {gov_id}.0
+instanceType: 4
+systemFlags: 16
+systemOnly: FALSE
+objectClassCategory: 3
+mayContain: managedBy
+"""
+
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ # Now make instances that point back to the user with managedBy,
+ # thus creating objects in the schema with linked attributes
+ ldif_template = """
+dn: CN={link_class_name}{i},{schema_dn}
+objectClass: classSchema
+objectClass: {class_name}
+instanceType: 4
+governsId: {gov_id}.0
+systemFlags: 16
+managedBy: {user_dn}
+"""
+
+ id_range = list(range(num_test_objects))
+ for i in id_range:
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(i=i, gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ self.do_repl(self.schema_dn)
+
+ # Check link exists in each test schema objects at destination DC
+ dn_templ = "CN={link_class_name}{i},{schema_dn}"
+ for i in id_range:
+ dn = dn_templ.format(i=i, **kwargs)
+ res = self.dest_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ self.assertIsNotNone(res[0].get("managedBy"))
+ self.assertEqual(str(res[0].get("managedBy")[0]), user_dn)
+
+ # Check backlinks list on user object contains DNs of test objects.
+ res = self.dest_ldb.search(base=user_dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ managed_objs = res[0].get("managedObjects")
+ self.assertIsNotNone(managed_objs)
+ managed_objs_set = {str(el) for el in managed_objs}
+ expected = {dn_templ.format(i=i, **kwargs) for i in id_range}
+ self.assertEqual(managed_objs_set, expected)