diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:20:00 +0000 |
commit | 8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch) | |
tree | 4099e8021376c7d8c05bdf8503093d80e9c7bad0 /python/samba/tests/samba_tool/user.py | |
parent | Initial commit. (diff) | |
download | samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip |
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python/samba/tests/samba_tool/user.py')
-rw-r--r-- | python/samba/tests/samba_tool/user.py | 1246 |
1 files changed, 1246 insertions, 0 deletions
diff --git a/python/samba/tests/samba_tool/user.py b/python/samba/tests/samba_tool/user.py new file mode 100644 index 0000000..26c9748 --- /dev/null +++ b/python/samba/tests/samba_tool/user.py @@ -0,0 +1,1246 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import time +import base64 +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba import ( + credentials, + nttime2unix, + dsdb, + werror, + ) +from samba.ndr import ndr_unpack +from samba.dcerpc import drsblobs +from samba.common import get_bytes +from samba.common import get_string +from samba.tests import env_loadparm + + +class UserCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool user subcommands""" + users = [] + samdb = None + + def setUp(self): + super().setUp() + self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + # Modify the default template homedir + lp = self.get_loadparm() + self.template_homedir = lp.get('template homedir') + lp.set('template homedir', '/home/test/%D/%U') + + self.users = [] + self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"})) + self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"})) + self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"})) + self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"})) + self.users.append(self._randomPosixUser({"name": "posixuser1"})) + self.users.append(self._randomPosixUser({"name": "posixuser2"})) + self.users.append(self._randomPosixUser({"name": "posixuser3"})) + self.users.append(self._randomPosixUser({"name": "posixuser4"})) + self.users.append(self._randomUnixUser({"name": "unixuser1"})) + self.users.append(self._randomUnixUser({"name": "unixuser2"})) + self.users.append(self._randomUnixUser({"name": "unixuser3"})) + self.users.append(self._randomUnixUser({"name": "unixuser4"})) + + # Make sure users don't exist + for user in self.users: + if self._find_user(user["name"]): + self.runsubcmd("user", "delete", user["name"]) + + # setup the 12 users and ensure they are correct + for user in self.users: + (result, out, err) = user["createUserFn"](user) + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + if 'unix' in user["name"]: + self.assertIn("Modified User '%s' successfully" % user["name"], + out) + else: + self.assertIn("User '%s' added successfully" % user["name"], + out) + + user["checkUserFn"](user) + + def tearDown(self): + super().tearDown() + # clean up all the left over users, just in case + for user in self.users: + if self._find_user(user["name"]): + self.runsubcmd("user", "delete", user["name"]) + lp = env_loadparm() + # second run of this test + # the cache is still there and '--cache-ldb-initialize' + # will fail + cachedb = lp.private_path("user-syncpasswords-cache.ldb") + if os.path.exists(cachedb): + os.remove(cachedb) + lp.set('template homedir', self.template_homedir) + + def test_newuser(self): + # try to add all the users again, this should fail + for user in self.users: + (result, out, err) = self._create_user(user) + self.assertCmdFail(result, "Ensure that create user fails") + self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err) + + # try to delete all the 4 users we just added + for user in self.users: + (result, out, err) = self.runsubcmd("user", "delete", user["name"]) + self.assertCmdSuccess(result, out, err, "Can we delete users") + found = self._find_user(user["name"]) + self.assertIsNone(found) + + # test adding users with --use-username-as-cn + for user in self.users: + (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"], + "--use-username-as-cn", + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn("User '%s' added successfully" % user["name"], out) + + found = self._find_user(user["name"]) + + self.assertEqual("%s" % found.get("cn"), "%(name)s" % user) + self.assertEqual("%s" % found.get("name"), "%(name)s" % user) + + def test_newuser_weak_password(self): + # Ensure that when we try to create a user over LDAP (thus no + # transactions) and the password is too weak, we do not get a + # half-created account. + + def cleanup_user(username): + try: + self.samdb.deleteuser(username) + except Exception as err: + estr = err.args[0] + if 'Unable to find user' not in estr: + raise + + server = os.environ['DC_SERVER'] + dc_username = os.environ['DC_USERNAME'] + dc_password = os.environ['DC_PASSWORD'] + + username = self.randomName() + password = 'a' + + self.addCleanup(cleanup_user, username) + + # Try to add the user and ensure it fails. + result, out, err = self.runsubcmd('user', 'add', + username, password, + '-H', f'ldap://{server}', + f'-U{dc_username}%{dc_password}') + self.assertCmdFail(result) + self.assertIn('Failed to add user', err) + self.assertIn('LDAP_CONSTRAINT_VIOLATION', err) + self.assertIn(f'{werror.WERR_PASSWORD_RESTRICTION:08X}', err) + + # Now search for the user, and make sure we don't find anything. + res = self.samdb.search(self.samdb.domain_dn(), + expression=f'(sAMAccountName={username})', + scope=ldb.SCOPE_SUBTREE) + self.assertEqual(0, len(res), 'expected not to find the user') + + def _verify_supplementalCredentials(self, ldif, + min_packages=3, + max_packages=6): + msgs = self.samdb.parse_ldif(ldif) + (changetype, obj) = next(msgs) + + self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required") + sc_blob = obj["supplementalCredentials"][0] + sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob) + + self.assertGreaterEqual(sc.sub.num_packages, + min_packages, "min_packages check") + self.assertLessEqual(sc.sub.num_packages, + max_packages, "max_packages check") + + if max_packages == 0: + return + + def find_package(packages, name, start_idx=0): + for i in range(start_idx, len(packages)): + if packages[i].name == name: + return (i, packages[i]) + return (None, None) + + # The ordering is this + # + # Primary:Kerberos-Newer-Keys (optional) + # Primary:Kerberos + # Primary:WDigest + # Primary:CLEARTEXT (optional) + # Primary:SambaGPG (optional) + # + # And the 'Packages' package is insert before the last + # other package. + + nidx = 0 + (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx) + self.assertIsNotNone(pp, "Packages required") + self.assertEqual(pidx + 1, sc.sub.num_packages - 1, + "Packages needs to be at num_packages - 1") + + (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys", + start_idx=nidx) + if knidx is not None: + self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position") + nidx = nidx + 1 + if nidx == pidx: + nidx = nidx + 1 + + (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos", + start_idx=nidx) + self.assertIsNotNone(pp, "Primary:Kerberos required") + self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position") + nidx = nidx + 1 + if nidx == pidx: + nidx = nidx + 1 + + (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest", + start_idx=nidx) + self.assertIsNotNone(pp, "Primary:WDigest required") + self.assertEqual(widx, nidx, "Primary:WDigest at wrong position") + nidx = nidx + 1 + if nidx == pidx: + nidx = nidx + 1 + + (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT", + start_idx=nidx) + if cidx is not None: + self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position") + nidx = nidx + 1 + if nidx == pidx: + nidx = nidx + 1 + + (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG", + start_idx=nidx) + if gidx is not None: + self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position") + nidx = nidx + 1 + if nidx == pidx: + nidx = nidx + 1 + + self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found") + + def test_setpassword(self): + expect_nt_hash = bool(int(os.environ.get("EXPECT_NT_HASH", "1"))) + + for user in self.users: + newpasswd = self.random_password(16) + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd, + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Ensure setpassword runs") + self.assertEqual(err, "", "setpassword with url") + self.assertMatch(out, "Changed password OK", "setpassword with url") + + attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG" + (result, out, err) = self.runsubcmd("user", "syncpasswords", + "--cache-ldb-initialize", + "--attributes=%s" % attributes, + "--decrypt-samba-gpg") + self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs") + self.assertEqual(err, "", "getpassword without url") + cache_attrs = { + "objectClass": {"value": "userSyncPasswords"}, + "samdbUrl": {}, + "dirsyncFilter": {}, + "dirsyncAttribute": {}, + "dirsyncControl": {"value": "dirsync:1:0:0"}, + "passwordAttribute": {}, + "decryptSambaGPG": {}, + "currentTime": {}, + } + for a in cache_attrs.keys(): + v = cache_attrs[a].get("value", "") + self.assertMatch(out, "%s: %s" % (a, v), + "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out)) + + (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait") + self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs") + self.assertEqual(err, "", "syncpasswords --no-wait") + self.assertMatch(out, "dirsync_loop(): results 0", + "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out)) + for user in self.users: + self.assertMatch(out, "sAMAccountName: %s" % (user["name"]), + "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out)) + + for user in self.users: + newpasswd = self.random_password(16) + creds = credentials.Credentials() + creds.set_anonymous() + creds.set_password(newpasswd) + unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8') + virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8') + virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8') + + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd) + self.assertCmdSuccess(result, out, err, "Ensure setpassword runs") + self.assertEqual(err, "", "setpassword without url") + self.assertMatch(out, "Changed password OK", "setpassword without url") + + (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait") + self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs") + self.assertEqual(err, "", "syncpasswords --no-wait") + self.assertMatch(out, "dirsync_loop(): results 0", + "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out)) + self.assertMatch(out, "sAMAccountName: %s" % (user["name"]), + "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out)) + self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE", + "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out) + if expect_nt_hash or "virtualSambaGPG:: " in out: + self.assertMatch(out, "unicodePwd:: %s" % unicodePwd, + "getpassword unicodePwd: out[%s]" % out) + else: + self.assertNotIn("unicodePwd:: %s" % unicodePwd, out) + self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE", + "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out) + self.assertMatch(out, "supplementalCredentials:: ", + "getpassword supplementalCredentials: out[%s]" % out) + if "virtualSambaGPG:: " in out: + self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8, + "getpassword virtualClearTextUTF8: out[%s]" % out) + self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16, + "getpassword virtualClearTextUTF16: out[%s]" % out) + self.assertMatch(out, "virtualSSHA: ", + "getpassword virtualSSHA: out[%s]" % out) + + (result, out, err) = self.runsubcmd("user", "getpassword", + user["name"], + "--attributes=%s" % attributes, + "--decrypt-samba-gpg") + self.assertCmdSuccess(result, out, err, "Ensure getpassword runs") + self.assertEqual(err, "Got password OK\n", "getpassword without url") + self.assertMatch(out, "sAMAccountName: %s" % (user["name"]), + "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out)) + if expect_nt_hash or "virtualSambaGPG:: " in out: + self.assertMatch(out, "unicodePwd:: %s" % unicodePwd, + "getpassword unicodePwd: out[%s]" % out) + else: + self.assertNotIn("unicodePwd:: %s" % unicodePwd, out) + self.assertMatch(out, "supplementalCredentials:: ", + "getpassword supplementalCredentials: out[%s]" % out) + self._verify_supplementalCredentials(out) + if "virtualSambaGPG:: " in out: + self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8, + "getpassword virtualClearTextUTF8: out[%s]" % out) + self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16, + "getpassword virtualClearTextUTF16: out[%s]" % out) + self.assertMatch(out, "virtualSSHA: ", + "getpassword virtualSSHA: out[%s]" % out) + + for user in self.users: + newpasswd = self.random_password(16) + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd, + "--must-change-at-next-login", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Ensure setpassword runs") + self.assertEqual(err, "", "setpassword with forced change") + self.assertMatch(out, "Changed password OK", "setpassword with forced change") + + def test_setexpiry(self): + for user in self.users: + twodays = time.time() + (2 * 24 * 60 * 60) + + (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"], + "--days=2", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names") + self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out) + + found = self._find_user(user["name"]) + + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time") + + # TODO: re-enable this after the filter case is sorted out + if "filters are broken, bail now": + return + + # now run the expiration based on a filter + fourdays = time.time() + (4 * 24 * 60 * 60) + (result, out, err) = self.runsubcmd("user", "setexpiry", + "--filter", "(&(objectClass=user)(company=comp2))", + "--days=4", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter") + + for user in self.users: + found = self._find_user(user["name"]) + if ("%s" % found.get("company")) == "comp2": + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time") + else: + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time") + + def test_list(self): + (result, out, err) = self.runsubcmd("user", "list", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + + search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" % + (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT)) + + userlist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname"]) + + self.assertTrue(len(userlist) > 0, "no users found in samdb") + + for userobj in userlist: + name = str(userobj.get("samaccountname", idx=0)) + self.assertMatch(out, name, + "user '%s' not found" % name) + + + def test_list_base_dn(self): + base_dn = "CN=Users" + (result, out, err) = self.runsubcmd("user", "list", "-b", base_dn, + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + + search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" % + (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT)) + + userlist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname"]) + + self.assertTrue(len(userlist) > 0, "no users found in samdb") + + for userobj in userlist: + name = str(userobj.get("samaccountname", idx=0)) + self.assertMatch(out, name, + "user '%s' not found" % name) + + def test_list_full_dn(self): + (result, out, err) = self.runsubcmd("user", "list", "--full-dn", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + + search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" % + (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT)) + + userlist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["dn"]) + + self.assertTrue(len(userlist) > 0, "no users found in samdb") + + for userobj in userlist: + name = str(userobj.get("dn", idx=0)) + self.assertMatch(out, name, + "user '%s' not found" % name) + + def test_list_hide_expired(self): + expire_username = "expireUser" + expire_user = self._randomUser({"name": expire_username}) + self._create_user(expire_user) + + (result, out, err) = self.runsubcmd( + "user", + "list", + "--hide-expired", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + self.assertTrue(expire_username in out, + "user '%s' not found" % expire_username) + + # user will be expired one second ago + self.samdb.setexpiry( + "(sAMAccountname=%s)" % expire_username, + -1, + False) + + (result, out, err) = self.runsubcmd( + "user", + "list", + "--hide-expired", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + self.assertFalse(expire_username in out, + "user '%s' found" % expire_username) + + self.samdb.deleteuser(expire_username) + + def test_list_hide_disabled(self): + disable_username = "disableUser" + disable_user = self._randomUser({"name": disable_username}) + self._create_user(disable_user) + + (result, out, err) = self.runsubcmd( + "user", + "list", + "--hide-disabled", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + self.assertTrue(disable_username in out, + "user '%s' not found" % disable_username) + + self.samdb.disable_account("(sAMAccountname=%s)" % disable_username) + + (result, out, err) = self.runsubcmd( + "user", + "list", + "--hide-disabled", + "-H", + "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running list") + self.assertFalse(disable_username in out, + "user '%s' found" % disable_username) + + self.samdb.deleteuser(disable_username) + + def test_show(self): + for user in self.users: + (result, out, err) = self.runsubcmd( + "user", "show", user["name"], + "--attributes=sAMAccountName,company", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running show") + + expected_out = """dn: CN=%s %s,CN=Users,%s +company: %s +sAMAccountName: %s + +""" % (user["given-name"], user["surname"], self.samdb.domain_dn(), + user["company"], user["name"]) + + self.assertEqual(out, expected_out, + "Unexpected show output for user '%s'" % + user["name"]) + + time_attrs = [ + "name", # test that invalid values are just ignored + "whenCreated", + "whenChanged", + "accountExpires", + "badPasswordTime", + "lastLogoff", + "lastLogon", + "lastLogonTimestamp", + "lockoutTime", + "msDS-UserPasswordExpiryTimeComputed", + "pwdLastSet", + ] + + attrs = [] + for ta in time_attrs: + attrs.append(ta) + for fm in ["GeneralizedTime", "UnixTime", "TimeSpec"]: + attrs.append("%s;format=%s" % (ta, fm)) + + (result, out, err) = self.runsubcmd( + "user", "show", user["name"], + "--attributes=%s" % ",".join(attrs), + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, out, err, "Error running show") + + self.assertIn(";format=GeneralizedTime", out) + self.assertIn(";format=UnixTime", out) + self.assertIn(";format=TimeSpec", out) + + self.assertIn("name: ", out) + self.assertNotIn("name;format=GeneralizedTime: ", out) + self.assertNotIn("name;format=UnixTime: ", out) + self.assertNotIn("name;format=TimeSpec: ", out) + + self.assertIn("whenCreated: 20", out) + self.assertIn("whenCreated;format=GeneralizedTime: 20", out) + self.assertIn("whenCreated;format=UnixTime: 1", out) + self.assertIn("whenCreated;format=TimeSpec: 1", out) + + self.assertIn("whenChanged: 20", out) + self.assertIn("whenChanged;format=GeneralizedTime: 20", out) + self.assertIn("whenChanged;format=UnixTime: 1", out) + self.assertIn("whenChanged;format=TimeSpec: 1", out) + + self.assertIn("accountExpires: 9223372036854775807", out) + self.assertNotIn("accountExpires;format=GeneralizedTime: ", out) + self.assertNotIn("accountExpires;format=UnixTime: ", out) + self.assertNotIn("accountExpires;format=TimeSpec: ", out) + + self.assertIn("badPasswordTime: 0", out) + self.assertNotIn("badPasswordTime;format=GeneralizedTime: ", out) + self.assertNotIn("badPasswordTime;format=UnixTime: ", out) + self.assertNotIn("badPasswordTime;format=TimeSpec: ", out) + + self.assertIn("lastLogoff: 0", out) + self.assertNotIn("lastLogoff;format=GeneralizedTime: ", out) + self.assertNotIn("lastLogoff;format=UnixTime: ", out) + self.assertNotIn("lastLogoff;format=TimeSpec: ", out) + + self.assertIn("lastLogon: 0", out) + self.assertNotIn("lastLogon;format=GeneralizedTime: ", out) + self.assertNotIn("lastLogon;format=UnixTime: ", out) + self.assertNotIn("lastLogon;format=TimeSpec: ", out) + + # If a specified attribute is not available on a user object + # it's silently omitted. + self.assertNotIn("lastLogonTimestamp:", out) + self.assertNotIn("lockoutTime:", out) + + self.assertIn("msDS-UserPasswordExpiryTimeComputed: 1", out) + self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime: 20", out) + self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime: 1", out) + self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec: 1", out) + + self.assertIn("pwdLastSet: 1", out) + self.assertIn("pwdLastSet;format=GeneralizedTime: 20", out) + self.assertIn("pwdLastSet;format=UnixTime: 1", out) + self.assertIn("pwdLastSet;format=TimeSpec: 1", out) + + out_msgs = self.samdb.parse_ldif(out) + out_msg = next(out_msgs)[1] + + self.assertIn("whenCreated", out_msg) + when_created_str = str(out_msg["whenCreated"][0]) + self.assertIn("whenCreated;format=GeneralizedTime", out_msg) + self.assertEqual(str(out_msg["whenCreated;format=GeneralizedTime"][0]), when_created_str) + when_created_time = ldb.string_to_time(when_created_str) + self.assertIn("whenCreated;format=UnixTime", out_msg) + self.assertEqual(str(out_msg["whenCreated;format=UnixTime"][0]), str(when_created_time)) + self.assertIn("whenCreated;format=TimeSpec", out_msg) + self.assertEqual(str(out_msg["whenCreated;format=TimeSpec"][0]), + "%d.000000000" % (when_created_time)) + + self.assertIn("whenChanged", out_msg) + when_changed_str = str(out_msg["whenChanged"][0]) + self.assertIn("whenChanged;format=GeneralizedTime", out_msg) + self.assertEqual(str(out_msg["whenChanged;format=GeneralizedTime"][0]), when_changed_str) + when_changed_time = ldb.string_to_time(when_changed_str) + self.assertIn("whenChanged;format=UnixTime", out_msg) + self.assertEqual(str(out_msg["whenChanged;format=UnixTime"][0]), str(when_changed_time)) + self.assertIn("whenChanged;format=TimeSpec", out_msg) + self.assertEqual(str(out_msg["whenChanged;format=TimeSpec"][0]), + "%d.000000000" % (when_changed_time)) + + self.assertIn("pwdLastSet;format=GeneralizedTime", out_msg) + pwd_last_set_str = str(out_msg["pwdLastSet;format=GeneralizedTime"][0]) + pwd_last_set_time = ldb.string_to_time(pwd_last_set_str) + self.assertIn("pwdLastSet;format=UnixTime", out_msg) + self.assertEqual(str(out_msg["pwdLastSet;format=UnixTime"][0]), str(pwd_last_set_time)) + self.assertIn("pwdLastSet;format=TimeSpec", out_msg) + self.assertIn("%d." % pwd_last_set_time, str(out_msg["pwdLastSet;format=TimeSpec"][0])) + self.assertNotIn(".000000000", str(out_msg["pwdLastSet;format=TimeSpec"][0])) + + # assert that the pwd has been set in the minute after user creation + self.assertGreaterEqual(pwd_last_set_time, when_created_time) + self.assertLess(pwd_last_set_time, when_created_time + 60) + + self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime", out_msg) + pwd_expires_str = str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime"][0]) + pwd_expires_time = ldb.string_to_time(pwd_expires_str) + self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime", out_msg) + self.assertEqual(str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=UnixTime"][0]), str(pwd_expires_time)) + self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec", out_msg) + self.assertIn("%d." % pwd_expires_time, str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0])) + self.assertNotIn(".000000000", str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0])) + + # assert that the pwd expires after it was set + self.assertGreater(pwd_expires_time, pwd_last_set_time) + + def test_move(self): + full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest_usr")) + self.addCleanup(self.samdb.delete, full_ou_dn, ["tree_delete:1"]) + + (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "There shouldn't be any error message") + self.assertIn('Added ou "%s"' % full_ou_dn, out) + + for user in self.users: + (result, out, err) = self.runsubcmd( + "user", "move", user["name"], full_ou_dn) + self.assertCmdSuccess(result, out, err, "Error running move") + self.assertIn('Moved user "%s" into "%s"' % + (user["name"], full_ou_dn), out) + + # Should fail as users objects are in OU + (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn) + self.assertCmdFail(result) + self.assertIn(("subtree_delete: Unable to delete a non-leaf node " + "(it has %d children)!") % len(self.users), err) + + for user in self.users: + new_dn = "CN=Users,%s" % self.samdb.domain_dn() + (result, out, err) = self.runsubcmd( + "user", "move", user["name"], new_dn) + self.assertCmdSuccess(result, out, err, "Error running move") + self.assertIn('Moved user "%s" into "%s"' % + (user["name"], new_dn), out) + + def test_rename_surname_initials_givenname(self): + """rename the existing surname and given name and add missing + initials, then remove them, for all users""" + for user in self.users: + new_givenname = "new_given_name_of_" + user["name"] + new_initials = "A" + new_surname = "new_surname_of_" + user["name"] + found = self._find_user(user["name"]) + old_cn = str(found.get("cn")) + + # rename given name, initials and surname + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--surname=%s" % new_surname, + "--initials=%s" % new_initials, + "--given-name=%s" % new_givenname) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(user["name"]) + self.assertEqual("%s" % found.get("givenName"), new_givenname) + self.assertEqual("%s" % found.get("initials"), new_initials) + self.assertEqual("%s" % found.get("sn"), new_surname) + self.assertEqual("%s" % found.get("name"), + "%s %s. %s" % (new_givenname, new_initials, new_surname)) + self.assertEqual("%s" % found.get("cn"), + "%s %s. %s" % (new_givenname, new_initials, new_surname)) + + # remove given name, initials and surname + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--surname=", + "--initials=", + "--given-name=") + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(user["name"]) + self.assertEqual(found.get("givenName"), None) + self.assertEqual(found.get("initials"), None) + self.assertEqual(found.get("sn"), None) + self.assertEqual("%s" % found.get("cn"), user["name"]) + + # reset changes (initials are removed) + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--surname=%(surname)s" % user, + "--given-name=%(given-name)s" % user) + self.assertCmdSuccess(result, out, err) + + if old_cn: + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--force-new-cn=%s" % old_cn) + + def test_rename_cn_samaccountname(self): + """rename and try to remove the cn and the samaccount of all users""" + for user in self.users: + new_cn = "new_cn_of_" + user["name"] + new_samaccountname = "new_samaccount_of_" + user["name"] + new_surname = "new_surname_of_" + user["name"] + + # rename cn + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--samaccountname=%s" + % new_samaccountname, + "--force-new-cn=%s" % new_cn) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(new_samaccountname) + self.assertEqual("%s" % found.get("cn"), new_cn) + self.assertEqual("%s" % found.get("sAMAccountName"), + new_samaccountname) + + # changing the surname has no effect to the cn + (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname, + "--surname=%s" % new_surname) + self.assertCmdSuccess(result, out, err) + + found = self._find_user(new_samaccountname) + self.assertEqual("%s" % found.get("cn"), new_cn) + + # trying to remove cn (throws an error) + (result, out, err) = self.runsubcmd("user", "rename", + new_samaccountname, + "--force-new-cn=") + self.assertCmdFail(result) + self.assertIn('Failed to rename user', err) + self.assertIn("delete protected attribute", err) + + # trying to remove the samccountname (throws an error) + (result, out, err) = self.runsubcmd("user", "rename", + new_samaccountname, + "--samaccountname=") + self.assertCmdFail(result) + self.assertIn('Failed to rename user', err) + self.assertIn('delete protected attribute', err) + + # reset changes (cn must be the name) + (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname, + "--samaccountname=%(name)s" + % user, + "--force-new-cn=%(name)s" % user) + self.assertCmdSuccess(result, out, err) + + def test_rename_standard_cn(self): + """reset the cn of all users to the standard""" + for user in self.users: + new_cn = "new_cn_of_" + user["name"] + new_givenname = "new_given_name_of_" + user["name"] + new_initials = "A" + new_surname = "new_surname_of_" + user["name"] + + # set different cn + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--force-new-cn=%s" % new_cn) + self.assertCmdSuccess(result, out, err) + + # remove given name, initials and surname + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--surname=", + "--initials=", + "--given-name=") + self.assertCmdSuccess(result, out, err) + + # reset the CN (no given name, initials or surname --> samaccountname) + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--reset-cn") + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(user["name"]) + self.assertEqual("%s" % found.get("cn"), user["name"]) + + # set given name, initials and surname and set different cn + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--force-new-cn=%s" % new_cn, + "--surname=%s" % new_surname, + "--initials=%s" % new_initials, + "--given-name=%s" % new_givenname) + self.assertCmdSuccess(result, out, err) + + # reset the CN (given name, initials or surname are given --> given name) + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--reset-cn") + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(user["name"]) + self.assertEqual("%s" % found.get("cn"), + "%s %s. %s" % (new_givenname, new_initials, new_surname)) + + # reset changes + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--reset-cn", + "--initials=", + "--surname=%(surname)s" % user, + "--given-name=%(given-name)s" % user) + self.assertCmdSuccess(result, out, err) + + def test_rename_mailaddress_displayname(self): + for user in self.users: + new_mail = "new_mailaddress_of_" + user["name"] + new_displayname = "new displayname of " + user["name"] + + # change mail and displayname + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--mail-address=%s" + % new_mail, + "--display-name=%s" + % new_displayname) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(user["name"]) + self.assertEqual("%s" % found.get("mail"), new_mail) + self.assertEqual("%s" % found.get("displayName"), new_displayname) + + # remove mail and displayname + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--mail-address=", + "--display-name=") + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(user["name"]) + self.assertEqual(found.get("mail"), None) + self.assertEqual(found.get("displayName"), None) + + def test_rename_upn(self): + """rename upn of all users""" + for user in self.users: + found = self._find_user(user["name"]) + old_upn = "%s" % found.get("userPrincipalName") + valid_suffix = old_upn.split('@')[1] # samba.example.com + + valid_new_upn = "new_%s@%s" % (user["name"], valid_suffix) + invalid_new_upn = "%s@invalid.suffix" + user["name"] + + # trying to set invalid upn + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--upn=%s" + % invalid_new_upn) + self.assertCmdFail(result) + self.assertIn('is not a valid upn', err) + + # set valid upn + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--upn=%s" + % valid_new_upn) + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn('successfully', out) + + found = self._find_user(user["name"]) + self.assertEqual("%s" % found.get("userPrincipalName"), valid_new_upn) + + # trying to remove upn + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--upn=%s") + self.assertCmdFail(result) + self.assertIn('is not a valid upn', err) + + # reset upn + (result, out, err) = self.runsubcmd("user", "rename", user["name"], + "--upn=%s" % old_upn) + self.assertCmdSuccess(result, out, err) + + def test_getpwent(self): + try: + import pwd + except ImportError: + self.skipTest("Skipping getpwent test, no 'pwd' module available") + return + + # get the current user's data for the test + uid = os.geteuid() + try: + u = pwd.getpwuid(uid) + except KeyError: + self.skipTest("Skipping getpwent test, current EUID not found in NSS") + return + + +# samba-tool user create command didn't support users with empty gecos if none is +# specified on the command line and the user hasn't one in the passwd file it +# will fail, so let's add some contents + + gecos = u[4] + if (gecos is None or len(gecos) == 0): + gecos = "Foo GECOS" + user = self._randomPosixUser({ + "name": u[0], + "uid": u[0], + "uidNumber": u[2], + "gidNumber": u[3], + "gecos": gecos, + "loginShell": u[6], + }) + + # Remove user if it already exists + if self._find_user(u[0]): + self.runsubcmd("user", "delete", u[0]) + # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid() + (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--gecos=%s" % user["gecos"], + "--rfc2307-from-nss", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn("User '%s' added successfully" % user["name"], out) + + self._check_posix_user(user) + self.runsubcmd("user", "delete", user["name"]) + + # Check if overriding the attributes from NSS with explicit values works + # + # get a user with all random posix attributes + user = self._randomPosixUser({"name": u[0]}) + + # Remove user if it already exists + if self._find_user(u[0]): + self.runsubcmd("user", "delete", u[0]) + # create a user with posix attributes from nss but override all of them with the + # random ones just obtained + (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--rfc2307-from-nss", + "--gecos=%s" % user["gecos"], + "--login-shell=%s" % user["loginShell"], + "--uid=%s" % user["uid"], + "--uid-number=%s" % user["uidNumber"], + "--gid-number=%s" % user["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result, out, err) + self.assertEqual(err, "", "Shouldn't be any error messages") + self.assertIn("User '%s' added successfully" % user["name"], out) + + self._check_posix_user(user) + self.runsubcmd("user", "delete", user["name"]) + + # Test: samba-tool user unlock + # This test does not verify that the command unlocks the user, it just + # tests the command itself. The unlock test, which unlocks locked users, + # is located in the 'samba4.ldap.password_lockout' test in + # source4/dsdb/tests/python/password_lockout.py + def test_unlock(self): + + # try to unlock a nonexistent user, this should fail + nonexistentusername = "userdoesnotexist" + (result, out, err) = self.runsubcmd( + "user", "unlock", nonexistentusername) + self.assertCmdFail(result, "Ensure that unlock nonexistent user fails") + self.assertIn("Failed to unlock user '%s'" % nonexistentusername, err) + self.assertIn("Unable to find user", err) + + # try to unlock with insufficient permissions, this should fail + unprivileged_username = "unprivilegedunlockuser" + unlocktest_username = "usertounlock" + + self.runsubcmd("user", "add", unprivileged_username, "Passw0rd") + self.runsubcmd("user", "add", unlocktest_username, "Passw0rd") + + (result, out, err) = self.runsubcmd( + "user", "unlock", unlocktest_username, + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (unprivileged_username, + "Passw0rd")) + self.assertCmdFail(result, "Fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS") + self.assertIn("Failed to unlock user '%s'" % unlocktest_username, err) + self.assertIn("LDAP error 50 LDAP_INSUFFICIENT_ACCESS_RIGHTS", err) + + self.runsubcmd("user", "delete", unprivileged_username) + self.runsubcmd("user", "delete", unlocktest_username) + + # run unlock against test users + for user in self.users: + (result, out, err) = self.runsubcmd( + "user", "unlock", user["name"]) + self.assertCmdSuccess(result, out, err, "Error running user unlock") + self.assertEqual(err, "", "Shouldn't be any error messages") + + def _randomUser(self, base=None): + """create a user with random attribute values, you can specify base attributes""" + if base is None: + base = {} + user = { + "name": self.randomName(), + "password": self.random_password(16), + "surname": self.randomName(), + "given-name": self.randomName(), + "job-title": self.randomName(), + "department": self.randomName(), + "company": self.randomName(), + "description": self.randomName(count=100), + "createUserFn": self._create_user, + "checkUserFn": self._check_user, + } + user.update(base) + return user + + def _randomPosixUser(self, base=None): + """create a user with random attribute values and additional RFC2307 + attributes, you can specify base attributes""" + if base is None: + base = {} + user = self._randomUser({}) + user.update(base) + posixAttributes = { + "uid": self.randomName(), + "loginShell": self.randomName(), + "gecos": self.randomName(), + "uidNumber": self.randomXid(), + "gidNumber": self.randomXid(), + "createUserFn": self._create_posix_user, + "checkUserFn": self._check_posix_user, + } + user.update(posixAttributes) + user.update(base) + return user + + def _randomUnixUser(self, base=None): + """create a user with random attribute values and additional RFC2307 + attributes, you can specify base attributes""" + if base is None: + base = {} + user = self._randomUser({}) + user.update(base) + posixAttributes = { + "uidNumber": self.randomXid(), + "gidNumber": self.randomXid(), + "uid": self.randomName(), + "loginShell": self.randomName(), + "gecos": self.randomName(), + "createUserFn": self._create_unix_user, + "checkUserFn": self._check_unix_user, + } + user.update(posixAttributes) + user.update(base) + return user + + def _check_user(self, user): + """ check if a user from SamDB has the same attributes as its template """ + found = self._find_user(user["name"]) + + self.assertEqual("%s" % found.get("name"), "%(given-name)s %(surname)s" % user) + self.assertEqual("%s" % found.get("title"), user["job-title"]) + self.assertEqual("%s" % found.get("company"), user["company"]) + self.assertEqual("%s" % found.get("description"), user["description"]) + self.assertEqual("%s" % found.get("department"), user["department"]) + + def _check_posix_user(self, user): + """ check if a posix_user from SamDB has the same attributes as its template """ + found = self._find_user(user["name"]) + + self.assertEqual("%s" % found.get("loginShell"), user["loginShell"]) + self.assertEqual("%s" % found.get("gecos"), user["gecos"]) + self.assertEqual("%s" % found.get("uidNumber"), "%s" % user["uidNumber"]) + self.assertEqual("%s" % found.get("gidNumber"), "%s" % user["gidNumber"]) + self.assertEqual("%s" % found.get("uid"), user["uid"]) + self._check_user(user) + + def _check_unix_user(self, user): + """ check if a unix_user from SamDB has the same attributes as its +template """ + found = self._find_user(user["name"]) + + self.assertEqual("%s" % found.get("loginShell"), user["loginShell"]) + self.assertEqual("%s" % found.get("gecos"), user["gecos"]) + self.assertEqual("%s" % found.get("uidNumber"), "%s" % + user["uidNumber"]) + self.assertEqual("%s" % found.get("gidNumber"), "%s" % + user["gidNumber"]) + self.assertEqual("%s" % found.get("uid"), user["uid"]) + self.assertIn('/home/test/', "%s" % found.get("unixHomeDirectory")) + self._check_user(user) + + def _create_user(self, user): + return self.runsubcmd("user", "add", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + def _create_posix_user(self, user): + """ create a new user with RFC2307 attributes """ + return self.runsubcmd("user", "create", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--gecos=%s" % user["gecos"], + "--login-shell=%s" % user["loginShell"], + "--uid=%s" % user["uid"], + "--uid-number=%s" % user["uidNumber"], + "--gid-number=%s" % user["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + def _create_unix_user(self, user): + """ Add RFC2307 attributes to a user""" + self._create_user(user) + return self.runsubcmd("user", "addunixattrs", user["name"], + "%s" % user["uidNumber"], + "--gid-number=%s" % user["gidNumber"], + "--gecos=%s" % user["gecos"], + "--login-shell=%s" % user["loginShell"], + "--uid=%s" % user["uid"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + def _find_user(self, name): + search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn()) + userlist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter) + if userlist: + return userlist[0] + else: + return None |