From 8daa83a594a2e98f39d764422bfbdbc62c9efd44 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 19:20:00 +0200 Subject: Adding upstream version 2:4.20.0+dfsg. Signed-off-by: Daniel Baumann --- python/samba/tests/samba_tool/group.py | 613 +++++++++++++++++++++++++++++++++ 1 file changed, 613 insertions(+) create mode 100644 python/samba/tests/samba_tool/group.py (limited to 'python/samba/tests/samba_tool/group.py') diff --git a/python/samba/tests/samba_tool/group.py b/python/samba/tests/samba_tool/group.py new file mode 100644 index 0000000..e7a660c --- /dev/null +++ b/python/samba/tests/samba_tool/group.py @@ -0,0 +1,613 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Michael Adam 2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import os +import time +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba import ( + nttime2unix, + dsdb + ) + + +class GroupCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool group subcommands""" + groups = [] + samdb = None + + def setUp(self): + super().setUp() + self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.groups = [] + self.groups.append(self._randomGroup({"name": "testgroup1"})) + self.groups.append(self._randomGroup({"name": "testgroup2"})) + self.groups.append(self._randomGroup({"name": "testgroup3"})) + self.groups.append(self._randomGroup({"name": "testgroup4"})) + self.groups.append(self._randomGroup({"name": "testgroup5 (with brackets)"})) + self.groups.append(self._randomPosixGroup({"name": "posixgroup1"})) + self.groups.append(self._randomPosixGroup({"name": "posixgroup2"})) + self.groups.append(self._randomPosixGroup({"name": "posixgroup3"})) + self.groups.append(self._randomPosixGroup({"name": "posixgroup4"})) + self.groups.append(self._randomPosixGroup({"name": "posixgroup5 (with brackets)"})) + self.groups.append(self._randomUnixGroup({"name": "unixgroup1"})) + self.groups.append(self._randomUnixGroup({"name": "unixgroup2"})) + self.groups.append(self._randomUnixGroup({"name": "unixgroup3"})) + self.groups.append(self._randomUnixGroup({"name": "unixgroup4"})) + self.groups.append(self._randomUnixGroup({"name": "unixgroup5 (with brackets)"})) + + # setup the 12 groups and ensure they are correct + for group in self.groups: + (result, out, err) = group["createGroupFn"](group) + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "There shouldn't be any error message") + + if 'unix' in group["name"]: + self.assertIn("Modified Group '%s' successfully" + % group["name"], out) + else: + self.assertIn("Added group %s" % group["name"], out) + + group["checkGroupFn"](group) + + found = self._find_group(group["name"]) + + self.assertIsNotNone(found) + + self.assertEqual("%s" % found.get("name"), group["name"]) + self.assertEqual("%s" % found.get("description"), group["description"]) + + def tearDown(self): + super().tearDown() + # clean up all the left over groups, just in case + for group in self.groups: + if self._find_group(group["name"]): + self.runsubcmd("group", "delete", group["name"]) + + def test_newgroup(self): + """This tests the "group add" and "group delete" commands""" + # try to add all the groups again, this should fail + for group in self.groups: + (result, out, err) = self._create_group(group) + self.assertCmdFail(result, "Succeeded to add existing group") + self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err) + + # try to delete all the groups we just added + for group in self.groups: + (result, out, err) = self.runsubcmd("group", "delete", group["name"]) + self.assertCmdSuccess(result, out, err, + "Failed to delete group '%s'" % group["name"]) + found = self._find_group(group["name"]) + self.assertIsNone(found, + "Deleted group '%s' still exists" % group["name"]) + + # test adding groups + for group in self.groups: + (result, out, err) = self.runsubcmd("group", "add", group["name"], + "--description=%s" % group["description"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "There shouldn't be any error message") + self.assertIn("Added group %s" % group["name"], out) + + found = self._find_group(group["name"]) + + self.assertEqual("%s" % found.get("samaccountname"), + "%s" % group["name"]) + + def test_list(self): + (result, out, err) = self.runsubcmd("group", "list", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + + search_filter = "(objectClass=group)" + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = str(groupobj.get("samaccountname", idx=0)) + found = self.assertMatch(out, name, + "group '%s' not found" % name) + + def test_list_verbose(self): + (result, out, err) = self.runsubcmd("group", "list", "--verbose", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list --verbose") + + # use the output to build a dictionary, where key=group-name, + # value=num-members + output_memberships = {} + + # split the output by line, skipping the first 2 header lines + group_lines = out.split('\n')[2:-1] + for line in group_lines: + # split line by column whitespace (but keep the group name together + # if it contains spaces) + values = line.split(" ") + name = values[0] + num_members = int(values[-1]) + output_memberships[name] = num_members + + # build up a similar dict using an LDAP search + search_filter = "(objectClass=group)" + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname", "member"]) + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + ldap_memberships = {} + for groupobj in grouplist: + name = str(groupobj.get("samaccountname", idx=0)) + num_members = len(groupobj.get("member", default=[])) + ldap_memberships[name] = num_members + + # check the command output matches LDAP + self.assertTrue(output_memberships == ldap_memberships, + "Command output doesn't match LDAP results.\n" + + "Command='%s'\nLDAP='%s'" %(output_memberships, + ldap_memberships)) + + def test_list_full_dn(self): + (result, out, err) = self.runsubcmd("group", "list", "--full-dn", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + + search_filter = "(objectClass=group)" + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=[]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = str(groupobj.get("dn", idx=0)) + found = self.assertMatch(out, name, + "group '%s' not found" % name) + + def test_list_base_dn(self): + base_dn = "CN=Users" + (result, out, err) = self.runsubcmd("group", "list", "--base-dn", base_dn, + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + + search_filter = "(objectClass=group)" + + grouplist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["name"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = str(groupobj.get("name", idx=0)) + found = self.assertMatch(out, name, + "group '%s' not found" % name) + + def test_listmembers(self): + (result, out, err) = self.runsubcmd("group", "listmembers", "Domain Users", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running listmembers") + + search_filter = "(|(primaryGroupID=513)(memberOf=CN=Domain Users,CN=Users,%s))" % self.samdb.domain_dn() + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samAccountName"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = str(groupobj.get("samAccountName", idx=0)) + found = self.assertMatch(out, name, "group '%s' not found" % name) + + def test_listmembers_hide_expired(self): + expire_username = "expireUser" + expire_user = self._random_user({"name": expire_username}) + self._create_user(expire_user) + + (result, out, err) = self.runsubcmd( + "group", + "listmembers", + "Domain Users", + "--hide-expired", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running listmembers") + self.assertTrue(expire_username in out, + "user '%s' not found" % expire_username) + + # user will be expired one second ago + self.samdb.setexpiry( + "(sAMAccountname=%s)" % expire_username, + -1, + False) + + (result, out, err) = self.runsubcmd( + "group", + "listmembers", + "Domain Users", + "--hide-expired", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running listmembers") + self.assertFalse(expire_username in out, + "user '%s' not found" % expire_username) + + self.samdb.deleteuser(expire_username) + + def test_listmembers_hide_disabled(self): + disable_username = "disableUser" + disable_user = self._random_user({"name": disable_username}) + self._create_user(disable_user) + + (result, out, err) = self.runsubcmd( + "group", + "listmembers", + "Domain Users", + "--hide-disabled", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running listmembers") + self.assertTrue(disable_username in out, + "user '%s' not found" % disable_username) + + self.samdb.disable_account("(sAMAccountname=%s)" % disable_username) + + (result, out, err) = self.runsubcmd( + "group", + "listmembers", + "Domain Users", + "--hide-disabled", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running listmembers") + self.assertFalse(disable_username in out, + "user '%s' not found" % disable_username) + + self.samdb.deleteuser(disable_username) + + def test_listmembers_full_dn(self): + (result, out, err) = self.runsubcmd("group", "listmembers", "Domain Users", + "--full-dn", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running listmembers") + + search_filter = "(|(primaryGroupID=513)(memberOf=CN=Domain Users,CN=Users,%s))" % self.samdb.domain_dn() + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["dn"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = str(groupobj.get("dn", idx=0)) + found = self.assertMatch(out, name, "group '%s' not found" % name) + + + def test_move(self): + full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest_grp")) + self.addCleanup(self.samdb.delete, full_ou_dn, ["tree_delete:1"]) + + (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "There shouldn't be any error message") + self.assertIn('Added ou "%s"' % full_ou_dn, out) + + for group in self.groups: + (result, out, err) = self.runsubcmd( + "group", "move", group["name"], full_ou_dn) + self.assertCmdSuccess(result, out, err, "Error running move") + self.assertIn('Moved group "%s" into "%s"' % + (group["name"], full_ou_dn), out) + + # Should fail as groups objects are in OU + (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn) + self.assertCmdFail(result) + self.assertIn(("subtree_delete: Unable to delete a non-leaf node " + "(it has %d children)!") % len(self.groups), err) + + for group in self.groups: + new_dn = "CN=Users,%s" % self.samdb.domain_dn() + (result, out, err) = self.runsubcmd( + "group", "move", group["name"], new_dn) + self.assertCmdSuccess(result, out, err, "Error running move") + self.assertIn('Moved group "%s" into "%s"' % + (group["name"], new_dn), out) + + def test_show(self): + """Assert that we can show a group correctly.""" + (result, out, err) = self.runsubcmd("group", "show", "Domain Users", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn("dn: CN=Domain Users,CN=Users,DC=addom,DC=samba,DC=example,DC=com", out) + + def test_rename_samaccountname(self): + """rename the samaccountname of all groups""" + for group in self.groups: + new_name = "new_samaccountname_of_" + group["name"] + + # change samaccountname + (result, out, err) = self.runsubcmd("group", "rename", group["name"], + "--samaccountname=" + new_name) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_group(new_name) + self.assertEqual("%s" % found.get("description"), group["description"]) + if not "cn" in group or str(group["cn"]) == str(group["name"]): + self.assertEqual("%s" % found.get("cn"), new_name) + else: + self.assertEqual("%s" % found.get("cn"), group["cn"]) + + # trying to remove the samaccountname throws an error + (result, out, err) = self.runsubcmd("group", "rename", new_name, + "--samaccountname=") + self.assertCmdFail(result) + self.assertIn('Failed to rename group', err) + self.assertIn('delete protected attribute', err) + + # reset changes + (result, out, err) = self.runsubcmd("group", "rename", new_name, + "--samaccountname=" + group["name"]) + self.assertCmdSuccess(result, out, err) + if "cn" in group: + (result, out, err) = self.runsubcmd("group", "rename", group["name"], + "--force-new-cn=%s" % group["cn"]) + self.assertCmdSuccess(result, out, err) + + def test_rename_cn_mail(self): + """change and remove the cn and mail attributes of all groups""" + for group in self.groups: + new_mail = "new mail of " + group["name"] + new_cn = "new cn of " + group["name"] + + # change attributes + (result, out, err) = self.runsubcmd("group", "rename", group["name"], + "--mail-address=" + new_mail, + "--force-new-cn=" + new_cn) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_group(group["name"]) + self.assertEqual("%s" % found.get("mail"), new_mail) + self.assertEqual("%s" % found.get("cn"), new_cn) + + # remove mail + (result, out, err) = self.runsubcmd("group", "rename", group["name"], + "--mail-address=") + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_group(group["name"]) + self.assertEqual(found.get("mail"), None) + + # trying to remove cn (throws an error) + (result, out, err) = self.runsubcmd("group", "rename", group["name"], + "--force-new-cn=") + self.assertCmdFail(result) + self.assertIn("Failed to rename group", err) + self.assertIn("delete protected attribute", err) + + # reset CN (mail is already empty) + (result, out, err) = self.runsubcmd("group", "rename", group["name"], + "--reset-cn") + self.assertCmdSuccess(result, out, err) + + def _randomGroup(self, base={}): + """create a group with random attribute values, you can specify base + attributes""" + group = { + "name": self.randomName(), + "description": self.randomName(count=100), + "createGroupFn": self._create_group, + "checkGroupFn": self._check_group, + } + group.update(base) + return group + + def _randomPosixGroup(self, base={}): + """create a group with random attribute values and additional RFC2307 + attributes, you can specify base attributes""" + group = self._randomGroup({}) + group.update(base) + posixAttributes = { + "unixdomain": self.randomName(), + "gidNumber": self.randomXid(), + "createGroupFn": self._create_posix_group, + "checkGroupFn": self._check_posix_group, + } + group.update(posixAttributes) + group.update(base) + return group + + def _randomUnixGroup(self, base={}): + """create a group with random attribute values and additional RFC2307 + attributes, you can specify base attributes""" + group = self._randomGroup({}) + group.update(base) + posixAttributes = { + "gidNumber": self.randomXid(), + "createGroupFn": self._create_unix_group, + "checkGroupFn": self._check_unix_group, + } + group.update(posixAttributes) + group.update(base) + return group + + def _check_group(self, group): + """ check if a group from SamDB has the same attributes as + its template """ + found = self._find_group(group["name"]) + + self.assertEqual("%s" % found.get("name"), group["name"]) + self.assertEqual("%s" % found.get("description"), group["description"]) + + def _check_posix_group(self, group): + """ check if a posix_group from SamDB has the same attributes as + its template """ + found = self._find_group(group["name"]) + + self.assertEqual("%s" % found.get("gidNumber"), "%s" % + group["gidNumber"]) + self._check_group(group) + + def _check_unix_group(self, group): + """ check if a unix_group from SamDB has the same attributes as its +template """ + found = self._find_group(group["name"]) + + self.assertEqual("%s" % found.get("gidNumber"), "%s" % + group["gidNumber"]) + self._check_group(group) + + def _create_group(self, group): + return self.runsubcmd("group", "add", group["name"], + "--description=%s" % group["description"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + def _create_posix_group(self, group): + """ create a new group with RFC2307 attributes """ + return self.runsubcmd("group", "add", group["name"], + "--description=%s" % group["description"], + "--nis-domain=%s" % group["unixdomain"], + "--gid-number=%s" % group["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + def _create_unix_group(self, group): + """ Add RFC2307 attributes to a group""" + self._create_group(group) + return self.runsubcmd("group", "addunixattrs", group["name"], + "%s" % group["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + def _find_group(self, name): + search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" % + (ldb.binary_encode(name), + "CN=Group,CN=Schema,CN=Configuration", + self.samdb.domain_dn())) + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter) + if grouplist: + return grouplist[0] + else: + return None + + def test_stats(self): + (result, out, err) = self.runsubcmd("group", "stats", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running stats") + + # sanity-check the command reports 'total groups' correctly + search_filter = "(objectClass=group)" + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=[]) + + total_groups = len(grouplist) + self.assertTrue("Total groups: {0}".format(total_groups) in out, + "Total groups not reported correctly") + + def _random_user(self, base=None): + """ + create a user with random attribute values, you can specify + base attributes + """ + if base is None: + base = {} + user = { + "name": self.randomName(), + "password": self.random_password(16), + "surname": self.randomName(), + "given-name": self.randomName(), + "job-title": self.randomName(), + "department": self.randomName(), + "company": self.randomName(), + "description": self.randomName(count=100), + "createUserFn": self._create_user, + } + user.update(base) + return user + + def _create_user(self, user): + return self.runsubcmd( + "user", + "add", + user["name"], + user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) -- cgit v1.2.3