summaryrefslogtreecommitdiffstats
path: root/source4/dsdb/tests/python/urgent_replication.py
diff options
context:
space:
mode:
Diffstat (limited to 'source4/dsdb/tests/python/urgent_replication.py')
-rwxr-xr-xsource4/dsdb/tests/python/urgent_replication.py339
1 files changed, 339 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/urgent_replication.py b/source4/dsdb/tests/python/urgent_replication.py
new file mode 100755
index 0000000..0d39c38
--- /dev/null
+++ b/source4/dsdb/tests/python/urgent_replication.py
@@ -0,0 +1,339 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import optparse
+import sys
+sys.path.insert(0, "bin/python")
+import samba
+from samba.tests.subunitrun import TestProgram, SubunitOptions
+
+from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
+ MessageElement, Dn, FLAG_MOD_REPLACE)
+import samba.tests
+import samba.dsdb as dsdb
+import samba.getopt as options
+import random
+
+parser = optparse.OptionParser("urgent_replication.py [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+ parser.print_usage()
+ sys.exit(1)
+
+host = args[0]
+
+
+class UrgentReplicationTests(samba.tests.TestCase):
+
+ def delete_force(self, ldb, dn):
+ try:
+ ldb.delete(dn, ["relax:0"])
+ except LdbError as e:
+ (num, _) = e.args
+ self.assertEqual(num, ERR_NO_SUCH_OBJECT)
+
+ def setUp(self):
+ super(UrgentReplicationTests, self).setUp()
+ self.ldb = samba.tests.connect_samdb(host, global_schema=False)
+ self.base_dn = self.ldb.domain_dn()
+
+ print("baseDN: %s\n" % self.base_dn)
+
+ def test_nonurgent_object(self):
+ """Test if the urgent replication is not activated when handling a non urgent object."""
+ self.ldb.add({
+ "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "samaccountname": "nonurgenttest",
+ "description": "nonurgenttest description"})
+
+ # urgent replication should not be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should not be enabled when modifying
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+ m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
+ "description")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should not be enabled when deleting
+ self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ def test_nTDSDSA_object(self):
+ """Test if the urgent replication is activated when handling a nTDSDSA object."""
+ self.ldb.add({
+ "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
+ self.ldb.get_config_basedn(),
+ "objectclass": "server",
+ "cn": "test server",
+ "name": "test server",
+ "systemFlags": "50000000"}, ["relax:0"])
+
+ self.ldb.add_ldif(
+ """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
+objectclass: nTDSDSA
+cn: NTDS Settings test
+options: 1
+instanceType: 4
+systemFlags: 33554432""", ["relax:0"])
+
+ # urgent replication should be enabled when creation
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should NOT be enabled when modifying
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+ m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "options")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when deleting
+ self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+
+ def test_crossRef_object(self):
+ """Test if the urgent replication is activated when handling a crossRef object."""
+ self.ldb.add({
+ "dn": "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn,
+ "objectClass": "crossRef",
+ "cn": "test crossRef",
+ "dnsRoot": self.get_loadparm().get("realm").lower(),
+ "instanceType": "4",
+ "nCName": self.base_dn,
+ "showInAdvancedViewOnly": "TRUE",
+ "name": "test crossRef",
+ "systemFlags": "1"}, ["relax:0"])
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should NOT be enabled when modifying
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+ m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "systemFlags")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when deleting
+ self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ def test_attributeSchema_object(self):
+ """Test if the urgent replication is activated when handling an attributeSchema object"""
+
+ self.ldb.add_ldif(
+ """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: attributeSchema
+cn: test attributeSchema
+instanceType: 4
+isSingleValued: FALSE
+showInAdvancedViewOnly: FALSE
+attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1, 100000)) + """
+attributeSyntax: 2.5.5.12
+adminDisplayName: test attributeSchema
+adminDescription: test attributeSchema
+oMSyntax: 64
+systemOnly: FALSE
+searchFlags: 8
+lDAPDisplayName: testAttributeSchema
+name: test attributeSchema""")
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m["lDAPDisplayName"] = MessageElement("updatedTestAttributeSchema", FLAG_MOD_REPLACE,
+ "lDAPDisplayName")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ def test_classSchema_object(self):
+ """Test if the urgent replication is activated when handling a classSchema object."""
+ try:
+ self.ldb.add_ldif(
+ """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: classSchema
+cn: test classSchema
+instanceType: 4
+subClassOf: top
+governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1, 100000)) + """
+rDNAttID: cn
+showInAdvancedViewOnly: TRUE
+adminDisplayName: test classSchema
+adminDescription: test classSchema
+objectClassCategory: 1
+lDAPDisplayName: testClassSchema
+name: test classSchema
+systemOnly: FALSE
+systemPossSuperiors: dfsConfiguration
+systemMustContain: msDFS-SchemaMajorVersion
+defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
+ CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
+systemFlags: 16
+defaultHidingValue: TRUE""")
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ except LdbError:
+ print("Not testing urgent replication when creating classSchema object ...\n")
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m["lDAPDisplayName"] = MessageElement("updatedTestClassSchema", FLAG_MOD_REPLACE,
+ "lDAPDisplayName")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ def test_secret_object(self):
+ """Test if the urgent replication is activated when handling a secret object."""
+
+ self.ldb.add({
+ "dn": "cn=test secret,cn=System," + self.base_dn,
+ "objectClass": "secret",
+ "cn": "test secret",
+ "name": "test secret",
+ "currentValue": "xxxxxxx"})
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
+ m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
+ "currentValue")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should NOT be enabled when deleting
+ self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ def test_rIDManager_object(self):
+ """Test if the urgent replication is activated when handling a rIDManager object."""
+ self.ldb.add_ldif(
+ """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
+objectClass: rIDManager
+cn: RID Manager test
+instanceType: 4
+showInAdvancedViewOnly: TRUE
+name: RID Manager test
+systemFlags: -1946157056
+isCriticalSystemObject: TRUE
+rIDAvailablePool: 133001-1073741823""", ["relax:0"])
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+ m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "systemFlags")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should NOT be enabled when deleting
+ self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ def test_urgent_attributes(self):
+ """Test if the urgent replication is activated when handling urgent attributes of an object."""
+
+ self.ldb.add({
+ "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "samaccountname": "user UrgAttr test",
+ "userAccountControl": str(dsdb.UF_NORMAL_ACCOUNT),
+ "lockoutTime": "0",
+ "pwdLastSet": "0",
+ "description": "urgent attributes test description"})
+
+ # urgent replication should NOT be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when modifying userAccountControl
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT + dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
+ "userAccountControl")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when modifying lockoutTime
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
+ "lockoutTime")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should be enabled when modifying pwdLastSet
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["pwdLastSet"] = MessageElement("-1", FLAG_MOD_REPLACE,
+ "pwdLastSet")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should NOT be enabled when modifying a not-urgent
+ # attribute
+ m = Message()
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["description"] = MessageElement("updated urgent attributes test description",
+ FLAG_MOD_REPLACE, "description")
+ self.ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+ # urgent replication should NOT be enabled when deleting
+ self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
+
+
+TestProgram(module=__name__, opts=subunitopts)