summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/samba_tool/user_get_kerberos_ticket.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/samba/tests/samba_tool/user_get_kerberos_ticket.py')
-rw-r--r--python/samba/tests/samba_tool/user_get_kerberos_ticket.py195
1 files changed, 195 insertions, 0 deletions
diff --git a/python/samba/tests/samba_tool/user_get_kerberos_ticket.py b/python/samba/tests/samba_tool/user_get_kerberos_ticket.py
new file mode 100644
index 0000000..4ac502e
--- /dev/null
+++ b/python/samba/tests/samba_tool/user_get_kerberos_ticket.py
@@ -0,0 +1,195 @@
+# Unix SMB/CIFS implementation.
+#
+# Blackbox tests for getting Kerberos tickets from Group Managed Service Account and other (local) passwords
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# Copyright Andrew Bartlett <abartlet@samba.org> 2023
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+import os
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from ldb import SCOPE_BASE
+from samba import credentials
+from samba.credentials import Credentials, MUST_USE_KERBEROS
+from samba.dcerpc import security, samr
+from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_NORMAL_ACCOUNT
+from samba.netcmd.domain.models import User
+from samba.ndr import ndr_pack, ndr_unpack
+from samba.tests import connect_samdb, connect_samdb_env, delete_force
+
+from samba.tests import BlackboxTestCase, BlackboxProcessError
+
+
+# If not specified, this is None, meaning local sam.ldb
+PW_READ_URL = os.environ.get("PW_READ_URL")
+
+# We still need to connect to a remote server to check we got the ticket
+SERVER = os.environ.get("SERVER")
+
+PW_CHECK_URL = f"ldap://{SERVER}"
+
+# For authentication to PW_READ_URL if required
+SERVER_USERNAME = os.environ["USERNAME"]
+SERVER_PASSWORD = os.environ["PASSWORD"]
+
+CREDS = f"-U{SERVER_USERNAME}%{SERVER_PASSWORD}"
+
+
+class GetKerberosTiketTest(BlackboxTestCase):
+ """Blackbox tests for GMSA getpassword and connecting as that user."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.lp = cls.get_loadparm()
+ cls.env_creds = cls.get_env_credentials(lp=cls.lp,
+ env_username="USERNAME",
+ env_password="PASSWORD",
+ env_domain="DOMAIN",
+ env_realm="REALM")
+ if PW_READ_URL is None:
+ url = cls.lp.private_path("sam.ldb")
+ else:
+ url = PW_CHECK_URL
+ cls.samdb = connect_samdb(url, lp=cls.lp, credentials=cls.env_creds)
+ super().setUpClass()
+
+ @classmethod
+ def setUpTestData(cls):
+ cls.gmsa_username = "GMSA_K5Test_User$"
+ cls.username = "get-kerberos-ticket-test"
+ cls.user_base_dn = f"CN=Users,{cls.samdb.domain_dn()}"
+ cls.user_dn = f"CN={cls.username},{cls.user_base_dn}"
+ cls.gmsa_base_dn = f"CN=Managed Service Accounts,{cls.samdb.domain_dn()}"
+ cls.gmsa_user_dn = f"CN={cls.gmsa_username},{cls.gmsa_base_dn}"
+
+ msg = cls.samdb.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
+ connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
+
+ domain_sid = security.dom_sid(cls.samdb.get_domain_sid())
+ allow_sddl = f"O:SYD:(A;;RP;;;{connecting_user_sid})"
+ allow_sd = ndr_pack(security.descriptor.from_sddl(allow_sddl, domain_sid))
+
+ details = {
+ "dn": str(cls.gmsa_user_dn),
+ "objectClass": "msDS-GroupManagedServiceAccount",
+ "msDS-ManagedPasswordInterval": "1",
+ "msDS-GroupMSAMembership": allow_sd,
+ "sAMAccountName": cls.gmsa_username,
+ "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT),
+ }
+
+ cls.samdb.add(details)
+ cls.addClassCleanup(delete_force, cls.samdb, cls.gmsa_user_dn)
+
+ user_password = "P@ssw0rd"
+ utf16pw = ('"' + user_password + '"').encode('utf-16-le')
+ user_details = {
+ "dn": str(cls.user_dn),
+ "objectClass": "user",
+ "sAMAccountName": cls.username,
+ "userAccountControl": str(UF_NORMAL_ACCOUNT),
+ "unicodePwd": utf16pw
+ }
+
+ cls.samdb.add(user_details)
+ cls.addClassCleanup(delete_force, cls.samdb, cls.user_dn)
+
+ cls.gmsa_user = User.get(cls.samdb, username=cls.gmsa_username)
+ cls.user = User.get(cls.samdb, username=cls.username)
+
+ def get_ticket(self, username, options=None):
+ if options is None:
+ options = ""
+ ccache_path = f"{self.tempdir}/ccache"
+ ccache_location = f"FILE:{ccache_path}"
+ cmd = f"user get-kerberos-ticket --output-krb5-ccache={ccache_location} {username} {options}"
+
+ try:
+ self.check_output(cmd)
+ except BlackboxProcessError as e:
+ self.fail(e)
+ self.addCleanup(os.unlink, ccache_path)
+ return ccache_location
+
+ def test_gmsa_ticket(self):
+ # Get a ticket with the tool
+ output_ccache = self.get_ticket(self.gmsa_username)
+ creds = self.insta_creds(template=self.env_creds)
+ creds.set_kerberos_state(MUST_USE_KERBEROS)
+ creds.set_named_ccache(self.lp, output_ccache)
+ db = connect_samdb(PW_CHECK_URL, credentials=creds, lp=self.lp)
+ msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
+ connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
+
+ self.assertEqual(self.gmsa_user.object_sid, connecting_user_sid)
+
+ def test_user_ticket(self):
+ output_ccache = self.get_ticket(self.username)
+ # Get a ticket with the tool
+ creds = self.insta_creds(template=self.env_creds)
+ creds.set_kerberos_state(MUST_USE_KERBEROS)
+
+ # Currently this is based on reading the unicodePwd, but this should be expanded
+ creds.set_named_ccache(output_ccache, credentials.SPECIFIED, self.lp)
+
+ db = connect_samdb(PW_CHECK_URL, credentials=creds, lp=self.lp)
+
+ msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
+ connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
+
+ self.assertEqual(self.user.object_sid, connecting_user_sid)
+
+ def test_user_ticket_gpg(self):
+ output_ccache = self.get_ticket(self.username, "--decrypt-samba-gpg")
+ # Get a ticket with the tool
+ creds = self.insta_creds(template=self.env_creds)
+ creds.set_kerberos_state(MUST_USE_KERBEROS)
+ creds.set_named_ccache(output_ccache, credentials.SPECIFIED, self.lp)
+ db = connect_samdb(PW_CHECK_URL, credentials=creds, lp=self.lp)
+
+ msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
+ connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
+
+ self.assertEqual(self.user.object_sid, connecting_user_sid)
+
+ @classmethod
+ def _make_cmdline(cls, line):
+ """Override to pass line as samba-tool subcommand instead.
+
+ Automatically fills in HOST and CREDS as well.
+ """
+ if isinstance(line, list):
+ cmd = ["samba-tool"] + line
+ if PW_READ_URL is not None:
+ cmd += ["-H", PW_READ_URL, CREDS]
+ else:
+ cmd = f"samba-tool {line}"
+ if PW_READ_URL is not None:
+ cmd += "-H {PW_READ_URL} {CREDS}"
+
+ return super()._make_cmdline(cmd)
+
+
+if __name__ == "__main__":
+ import unittest
+ unittest.main()