summaryrefslogtreecommitdiffstats
path: root/source4/dsdb/tests/python/notification.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:47:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:47:29 +0000
commit4f5791ebd03eaec1c7da0865a383175b05102712 (patch)
tree8ce7b00f7a76baa386372422adebbe64510812d4 /source4/dsdb/tests/python/notification.py
parentInitial commit. (diff)
downloadsamba-4f5791ebd03eaec1c7da0865a383175b05102712.tar.xz
samba-4f5791ebd03eaec1c7da0865a383175b05102712.zip
Adding upstream version 2:4.17.12+dfsg.upstream/2%4.17.12+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'source4/dsdb/tests/python/notification.py')
-rwxr-xr-xsource4/dsdb/tests/python/notification.py372
1 files changed, 372 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/notification.py b/source4/dsdb/tests/python/notification.py
new file mode 100755
index 0000000..b9e1032
--- /dev/null
+++ b/source4/dsdb/tests/python/notification.py
@@ -0,0 +1,372 @@
+#!/usr/bin/env python3
+#
+# Unit tests for the notification control
+# Copyright (C) Stefan Metzmacher 2016
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import optparse
+import sys
+import os
+
+sys.path.insert(0, "bin/python")
+import samba
+
+from samba.tests.subunitrun import SubunitOptions, TestProgram
+
+import samba.getopt as options
+
+from samba.auth import system_session
+from samba import ldb
+from samba.samdb import SamDB
+from samba.ndr import ndr_unpack
+from samba import gensec
+from samba.credentials import Credentials
+import samba.tests
+
+from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
+
+from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
+from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM
+from ldb import Message
+
+parser = optparse.OptionParser("notification.py [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+ parser.print_usage()
+ sys.exit(1)
+
+url = args[0]
+
+lp = sambaopts.get_loadparm()
+creds = credopts.get_credentials(lp)
+
+
+class LDAPNotificationTest(samba.tests.TestCase):
+
+ def setUp(self):
+ super(LDAPNotificationTest, self).setUp()
+ self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
+ self.base_dn = self.ldb.domain_dn()
+
+ res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
+ self.assertEqual(len(res), 1)
+
+ self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
+
+ def test_simple_search(self):
+ """Testing a notification with an modify and a timeout"""
+ if not url.startswith("ldap"):
+ self.fail(msg="This test is only valid on ldap")
+
+ msg1 = None
+ search1 = self.ldb.search_iterator(base=self.user_sid_dn,
+ expression="(objectClass=*)",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name", "objectGUID", "displayName"])
+ for reply in search1:
+ self.assertIsInstance(reply, ldb.Message)
+ self.assertIsNone(msg1)
+ msg1 = reply
+ res1 = search1.result()
+
+ search2 = self.ldb.search_iterator(base=self.base_dn,
+ expression="(objectClass=*)",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name", "objectGUID", "displayName"])
+ refs2 = 0
+ msg2 = None
+ for reply in search2:
+ if isinstance(reply, str):
+ refs2 += 1
+ continue
+ self.assertIsInstance(reply, ldb.Message)
+ if reply["objectGUID"][0] == msg1["objectGUID"][0]:
+ self.assertIsNone(msg2)
+ msg2 = reply
+ self.assertEqual(msg1.dn, msg2.dn)
+ self.assertEqual(len(msg1), len(msg2))
+ self.assertEqual(msg1["name"], msg2["name"])
+ #self.assertEqual(msg1["displayName"], msg2["displayName"])
+ res2 = search2.result()
+
+ self.ldb.modify_ldif("""
+dn: """ + self.user_sid_dn + """
+changetype: modify
+replace: otherLoginWorkstations
+otherLoginWorkstations: BEFORE"
+""")
+ notify1 = self.ldb.search_iterator(base=self.base_dn,
+ expression="(objectClass=*)",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name", "objectGUID", "displayName"],
+ controls=["notification:1"],
+ timeout=1)
+
+ self.ldb.modify_ldif("""
+dn: """ + self.user_sid_dn + """
+changetype: modify
+replace: otherLoginWorkstations
+otherLoginWorkstations: AFTER"
+""")
+
+ msg3 = None
+ for reply in notify1:
+ self.assertIsInstance(reply, ldb.Message)
+ if reply["objectGUID"][0] == msg1["objectGUID"][0]:
+ self.assertIsNone(msg3)
+ msg3 = reply
+ self.assertEqual(msg1.dn, msg3.dn)
+ self.assertEqual(len(msg1), len(msg3))
+ self.assertEqual(msg1["name"], msg3["name"])
+ #self.assertEqual(msg1["displayName"], msg3["displayName"])
+ try:
+ res = notify1.result()
+ self.fail()
+ except LdbError as e10:
+ (num, _) = e10.args
+ self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
+ self.assertIsNotNone(msg3)
+
+ self.ldb.modify_ldif("""
+dn: """ + self.user_sid_dn + """
+changetype: delete
+delete: otherLoginWorkstations
+""")
+
+ def test_max_search(self):
+ """Testing the max allowed notifications"""
+ if not url.startswith("ldap"):
+ self.fail(msg="This test is only valid on ldap")
+
+ max_notifications = 5
+
+ notifies = [None] * (max_notifications + 1)
+ for i in range(0, max_notifications + 1):
+ notifies[i] = self.ldb.search_iterator(base=self.base_dn,
+ expression="(objectClass=*)",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=1)
+ num_admin_limit = 0
+ num_time_limit = 0
+ for i in range(0, max_notifications + 1):
+ try:
+ for msg in notifies[i]:
+ continue
+ res = notifies[i].result()
+ self.fail()
+ except LdbError as e:
+ (num, _) = e.args
+ if num == ERR_ADMIN_LIMIT_EXCEEDED:
+ num_admin_limit += 1
+ continue
+ if num == ERR_TIME_LIMIT_EXCEEDED:
+ num_time_limit += 1
+ continue
+ raise
+ self.assertEqual(num_admin_limit, 1)
+ self.assertEqual(num_time_limit, max_notifications)
+
+ def test_invalid_filter(self):
+ """Testing invalid filters for notifications"""
+ if not url.startswith("ldap"):
+ self.fail(msg="This test is only valid on ldap")
+
+ valid_attrs = ["objectClass", "objectGUID", "distinguishedName", "name"]
+
+ for va in valid_attrs:
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(%s=*)" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=1)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e1:
+ (num, _) = e1.args
+ self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(|(%s=*)(%s=value))" % (va, va),
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=1)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e2:
+ (num, _) = e2.args
+ self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(&(%s=*)(%s=value))" % (va, va),
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e3:
+ (num, _) = e3.args
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(%s=value)" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e4:
+ (num, _) = e4.args
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(%s>=value)" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e5:
+ (num, _) = e5.args
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(%s<=value)" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e6:
+ (num, _) = e6.args
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(%s=*value*)" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e7:
+ (num, _) = e7.args
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(!(%s=*))" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e8:
+ (num, _) = e8.args
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+ res = self.ldb.search(base=self.ldb.get_schema_basedn(),
+ expression="(objectClass=attributeSchema)",
+ scope=ldb.SCOPE_ONELEVEL,
+ attrs=["lDAPDisplayName"],
+ controls=["paged_results:1:2500"])
+ for msg in res:
+ va = str(msg["lDAPDisplayName"][0])
+ if va in valid_attrs:
+ continue
+
+ try:
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(%s=*)" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e9:
+ (num, _) = e9.args
+ if num != ERR_UNWILLING_TO_PERFORM:
+ print("va[%s]" % va)
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+ try:
+ va = "noneAttributeName"
+ hnd = self.ldb.search_iterator(base=self.base_dn,
+ expression="(%s=*)" % va,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["name"],
+ controls=["notification:1"],
+ timeout=0)
+ for reply in hnd:
+ self.fail()
+ res = hnd.result()
+ self.fail()
+ except LdbError as e11:
+ (num, _) = e11.args
+ if num != ERR_UNWILLING_TO_PERFORM:
+ print("va[%s]" % va)
+ self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
+
+
+if "://" not in url:
+ if os.path.isfile(url):
+ url = "tdb://%s" % url
+ else:
+ url = "ldap://%s" % url
+
+TestProgram(module=__name__, opts=subunitopts)