summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/blackbox/claims.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/samba/tests/blackbox/claims.py')
-rwxr-xr-xpython/samba/tests/blackbox/claims.py526
1 files changed, 526 insertions, 0 deletions
diff --git a/python/samba/tests/blackbox/claims.py b/python/samba/tests/blackbox/claims.py
new file mode 100755
index 0000000..cad8095
--- /dev/null
+++ b/python/samba/tests/blackbox/claims.py
@@ -0,0 +1,526 @@
+#!/usr/bin/env python3
+# Unix SMB/CIFS implementation.
+#
+# Blackbox tests for claims support
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+
+from samba import NTSTATUSError
+from samba.auth import AuthContext
+from samba.credentials import Credentials
+from samba.gensec import FEATURE_SEAL, Security
+from samba.ntstatus import NT_STATUS_LOGON_FAILURE, NT_STATUS_UNSUCCESSFUL
+from samba.tests import BlackboxTestCase
+
+SERVER = os.environ["SERVER"]
+SERVER_USERNAME = os.environ["USERNAME"]
+SERVER_PASSWORD = os.environ["PASSWORD"]
+
+HOST = f"ldap://{SERVER}"
+CREDS = f"-U{SERVER_USERNAME}%{SERVER_PASSWORD}"
+
+
+class ClaimsSupportTests(BlackboxTestCase):
+ """Blackbox tests for Claims support
+
+ NOTE: all these commands are subcommands of samba-tool.
+
+ NOTE: the addCleanup functions get called automatically in reverse
+ order after the tests finishes, they don't execute straight away.
+ """
+
+ def test_device_group_restrictions(self):
+ client_password = "T3stPassword0nly"
+ target_password = "T3stC0mputerPassword"
+ device_password = "T3stD3vicePassword"
+
+ # Create target computer.
+ self.check_run("computer create claims-server")
+ self.addCleanup(self.run_command, "computer delete claims-server")
+ self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
+
+ # Create device computer.
+ self.check_run("computer create claims-device")
+ self.addCleanup(self.run_command, "computer delete claims-device")
+ self.check_run(rf"user setpassword claims-device\$ --newpassword={device_password}")
+
+ # Create a user.
+ self.check_run(f"user create claimstestuser {client_password}")
+ self.addCleanup(self.run_command, "user delete claimstestuser")
+
+ # Create an authentication policy.
+ self.check_run("domain auth policy create --enforce --name=device-restricted-users-pol")
+ self.addCleanup(self.run_command,
+ "domain auth policy delete --name=device-restricted-users-pol")
+
+ self.check_run("group add allowed-devices")
+ self.addCleanup(self.run_command, "group delete allowed-devices")
+
+ # Set allowed to authenticate from.
+ self.check_run("domain auth policy modify --name=device-restricted-users-pol "
+ "--user-allowed-to-authenticate-from-device-group=allowed-devices")
+
+ self.check_run("user auth policy assign claimstestuser --policy=device-restricted-users-pol")
+
+ with self.assertRaises(NTSTATUSError) as error:
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
+ self.assertEqual(
+ error.exception.args[1],
+ "The attempted logon is invalid. This is either due to a "
+ "bad username or authentication information.")
+
+ self.check_run("group addmembers allowed-devices claims-device")
+
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ def test_device_silo_restrictions(self):
+ client_password = "T3stPassword0nly"
+ target_password = "T3stC0mputerPassword"
+ device_password = "T3stD3vicePassword"
+
+ # Create target computer.
+ self.check_run("computer create claims-server")
+ self.addCleanup(self.run_command, "computer delete claims-server")
+ self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
+
+ # Create device computer.
+ self.check_run("computer create claims-device")
+ self.addCleanup(self.run_command, "computer delete claims-device")
+ self.check_run(rf"user setpassword claims-device\$ --newpassword={device_password}")
+
+ # Create a user.
+ self.check_run(f"user create claimstestuser {client_password}")
+ self.addCleanup(self.run_command, "user delete claimstestuser")
+
+ # Create an authentication policy.
+ self.check_run("domain auth policy create --enforce --name=allowed-devices-only-pol")
+ self.addCleanup(self.run_command,
+ "domain auth policy delete --name=allowed-devices-only-pol")
+
+ # Create an authentication silo.
+ self.check_run("domain auth silo create --enforce --name=allowed-devices-only-silo "
+ "--user-authentication-policy=allowed-devices-only-pol "
+ "--computer-authentication-policy=allowed-devices-only-pol "
+ "--service-authentication-policy=allowed-devices-only-pol")
+ self.addCleanup(self.run_command,
+ "domain auth silo delete --name=allowed-devices-only-silo")
+
+ # Set allowed to authenticate from (where the login can happen) and to
+ # (server requires silo that in term has this rule, so knows the user
+ # was required to authenticate from).
+ self.check_run("domain auth policy modify --name=allowed-devices-only-pol "
+ "--user-allowed-to-authenticate-from-device-silo=allowed-devices-only-silo")
+
+ # Grant access to silo.
+ self.check_run(r"domain auth silo member grant --name=allowed-devices-only-silo --member=claims-device\$")
+ self.check_run("domain auth silo member grant --name=allowed-devices-only-silo --member=claimstestuser")
+
+ # However with nothing assigned, allow-by-default still applies
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ )
+
+ # Show that adding a FAST armor from the device doesn't change
+ # things either way
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ # Assign silo to the user.
+ self.check_run("user auth silo assign claimstestuser --silo=allowed-devices-only-silo")
+
+ # We fail, as the KDC now requires the silo but the client is not using an approved device
+ with self.assertRaises(NTSTATUSError) as error:
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ self.assertEqual(error.exception.args[0], NT_STATUS_UNSUCCESSFUL)
+ self.assertIn(
+ "The requested operation was unsuccessful.",
+ error.exception.args[1])
+
+ # Assign silo to the device.
+ self.check_run(r"user auth silo assign claims-device\$ --silo=allowed-devices-only-silo")
+
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ def test_device_and_server_silo_restrictions(self):
+ client_password = "T3stPassword0nly"
+ target_password = "T3stC0mputerPassword"
+ device_password = "T3stD3vicePassword"
+
+ # Create target computer.
+ self.check_run("computer create claims-server")
+ self.addCleanup(self.run_command, "computer delete claims-server")
+ self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
+
+ # Create device computer.
+ self.check_run("computer create claims-device")
+ self.addCleanup(self.run_command, "computer delete claims-device")
+ self.check_run(rf"user setpassword claims-device\$ --newpassword={device_password}")
+
+ # Create a user.
+ self.check_run(f"user create claimstestuser {client_password}")
+ self.addCleanup(self.run_command, "user delete claimstestuser")
+
+ # Create an authentication policy.
+ self.check_run("domain auth policy create --enforce --name=allowed-devices-only-pol")
+ self.addCleanup(self.run_command,
+ "domain auth policy delete --name=allowed-devices-only-pol")
+
+ # Create an authentication silo.
+ self.check_run("domain auth silo create --enforce --name=allowed-devices-only-silo "
+ "--user-authentication-policy=allowed-devices-only-pol "
+ "--computer-authentication-policy=allowed-devices-only-pol "
+ "--service-authentication-policy=allowed-devices-only-pol")
+ self.addCleanup(self.run_command,
+ "domain auth silo delete --name=allowed-devices-only-silo")
+
+ # Set allowed to authenticate from (where the login can happen) and to
+ # (server requires silo that in term has this rule, so knows the user
+ # was required to authenticate from).
+ # If we assigned services to the silo we would need to add
+ # --service-allowed-to-authenticate-to/from options as well.
+ # Likewise, if there are services running in user accounts, we need
+ # --user-allowed-to-authenticate-to
+ self.check_run("domain auth policy modify --name=allowed-devices-only-pol "
+ "--user-allowed-to-authenticate-from-device-silo=allowed-devices-only-silo "
+ "--computer-allowed-to-authenticate-to-by-silo=allowed-devices-only-silo")
+
+ # Grant access to silo.
+ self.check_run(r"domain auth silo member grant --name=allowed-devices-only-silo --member=claims-device\$")
+ self.check_run(r"domain auth silo member grant --name=allowed-devices-only-silo --member=claims-server\$")
+ self.check_run("domain auth silo member grant --name=allowed-devices-only-silo --member=claimstestuser")
+
+ # However with nothing assigned, allow-by-default still applies
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ )
+
+ # Show that adding a FAST armor from the device doesn't change
+ # things either way
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ self.check_run(r"user auth silo assign claims-server\$ --silo=allowed-devices-only-silo")
+
+ # We fail, as the server now requires the silo but the client is not in it
+ with self.assertRaises(NTSTATUSError) as error:
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
+ self.assertEqual(
+ error.exception.args[1],
+ "The attempted logon is invalid. This is either due to a "
+ "bad username or authentication information.")
+
+ # Assign silo to the user.
+ self.check_run("user auth silo assign claimstestuser --silo=allowed-devices-only-silo")
+
+ # We fail, as the KDC now requires the silo but the client not is using an approved device
+ with self.assertRaises(NTSTATUSError) as error:
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ self.assertEqual(error.exception.args[0], NT_STATUS_UNSUCCESSFUL)
+ self.assertIn(
+ "The requested operation was unsuccessful.",
+ error.exception.args[1])
+
+ # Assign silo to the device.
+ self.check_run(r"user auth silo assign claims-device\$ --silo=allowed-devices-only-silo")
+
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ device_username="claims-device",
+ device_password=device_password,
+ )
+
+ def test_user_group_access(self):
+ """An example use with groups."""
+ client_password = "T3stPassword0nly"
+ target_password = "T3stC0mputerPassword"
+
+ # Create a computer.
+ self.check_run("computer create claims-server")
+ self.addCleanup(self.run_command, "computer delete claims-server")
+ self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
+
+ # Create a user.
+ self.check_run(f"user create claimstestuser {client_password}")
+ self.addCleanup(self.run_command, "user delete claimstestuser")
+
+ # Create an authentication policy.
+ self.check_run("domain auth policy create --enforce --name=restricted-servers-pol")
+ self.addCleanup(self.run_command,
+ "domain auth policy delete --name=restricted-servers-pol")
+
+ self.check_run("group add server-access-group")
+ self.addCleanup(self.run_command, "group delete server-access-group")
+
+ # Set allowed to authenticate to.
+ self.check_run("domain auth policy modify --name=restricted-servers-pol "
+ "--computer-allowed-to-authenticate-to-by-group=server-access-group")
+
+ self.check_run(r"user auth policy assign claims-server\$ --policy=restricted-servers-pol")
+
+ with self.assertRaises(NTSTATUSError) as error:
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ )
+
+ self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
+ self.assertEqual(
+ error.exception.args[1],
+ "The attempted logon is invalid. This is either due to a "
+ "bad username or authentication information.")
+
+ # Add group members.
+ self.check_run("group addmembers server-access-group claimstestuser")
+
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ )
+
+ def test_user_silo_access(self):
+ """An example use with authentication silos."""
+ client_password = "T3stPassword0nly"
+ target_password = "T3stC0mputerPassword"
+
+ # Create a computer.
+ self.check_run("computer create claims-server")
+ self.addCleanup(self.run_command, "computer delete claims-server")
+ self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
+
+ # Create a user.
+ self.check_run(f"user create claimstestuser {client_password}")
+ self.addCleanup(self.run_command, "user delete claimstestuser")
+
+ # Create an authentication policy.
+ self.check_run("domain auth policy create --enforce --name=restricted-servers-pol")
+ self.addCleanup(self.run_command,
+ "domain auth policy delete --name=restricted-servers-pol")
+
+ # Create an authentication silo.
+ self.check_run("domain auth silo create --enforce --name=restricted-servers-silo "
+ "--user-authentication-policy=restricted-servers-pol "
+ "--computer-authentication-policy=restricted-servers-pol "
+ "--service-authentication-policy=restricted-servers-pol")
+ self.addCleanup(self.run_command,
+ "domain auth silo delete --name=restricted-servers-silo")
+
+ # Set allowed to authenticate to.
+ self.check_run("domain auth policy modify --name=restricted-servers-pol "
+ "--computer-allowed-to-authenticate-to-by-silo=restricted-servers-silo")
+
+ # Grant access to silo.
+ self.check_run(r"domain auth silo member grant --name=restricted-servers-silo --member=claims-server\$")
+ self.check_run("domain auth silo member grant --name=restricted-servers-silo --member=claimstestuser")
+
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ )
+
+ self.check_run(r"user auth silo assign claims-server\$ --silo=restricted-servers-silo")
+
+ with self.assertRaises(NTSTATUSError) as error:
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ )
+
+ self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
+ self.assertEqual(
+ error.exception.args[1],
+ "The attempted logon is invalid. This is either due to a "
+ "bad username or authentication information.")
+
+ # Set assigned silo on user and computer.
+ self.check_run("user auth silo assign claimstestuser --silo=restricted-servers-silo")
+
+ self.verify_access(
+ client_username="claimstestuser",
+ client_password=client_password,
+ target_hostname="claims-server",
+ target_username="claims-server",
+ target_password=target_password,
+ )
+
+ @classmethod
+ def _make_cmdline(cls, line):
+ """Override to pass line as samba-tool subcommand instead.
+
+ Automatically fills in HOST and CREDS as well.
+ """
+ if isinstance(line, list):
+ cmd = ["samba-tool"] + line + ["-H", HOST, CREDS]
+ else:
+ cmd = f"samba-tool {line} -H {HOST} {CREDS}"
+
+ return super()._make_cmdline(cmd)
+
+ def verify_access(self, client_username, client_password,
+ target_hostname, target_username, target_password, *,
+ device_username=None, device_password=None):
+
+ lp = self.get_loadparm()
+
+ client_creds = Credentials()
+ client_creds.set_username(client_username)
+ client_creds.set_password(client_password)
+ client_creds.guess(lp)
+
+ if device_username:
+ device_creds = Credentials()
+ device_creds.set_username(device_username)
+ device_creds.set_password(device_password)
+ device_creds.guess(lp)
+ client_creds.set_krb5_fast_armor_credentials(device_creds, True)
+
+ target_creds = Credentials()
+ target_creds.set_username(target_username)
+ target_creds.set_password(target_password)
+ target_creds.guess(lp)
+
+ settings = {
+ "lp_ctx": lp,
+ "target_hostname": target_hostname
+ }
+
+ gensec_client = Security.start_client(settings)
+ gensec_client.set_credentials(client_creds)
+ gensec_client.want_feature(FEATURE_SEAL)
+ gensec_client.start_mech_by_sasl_name("GSSAPI")
+
+ gensec_target = Security.start_server(settings=settings,
+ auth_context=AuthContext(lp_ctx=lp))
+ gensec_target.set_credentials(target_creds)
+ gensec_target.start_mech_by_sasl_name("GSSAPI")
+
+ client_finished = False
+ server_finished = False
+ client_to_server = b""
+ server_to_client = b""
+
+ # Operate as both the client and the server to verify the user's
+ # credentials.
+ while not client_finished or not server_finished:
+ if not client_finished:
+ print("running client gensec_update")
+ client_finished, client_to_server = gensec_client.update(
+ server_to_client)
+ if not server_finished:
+ print("running server gensec_update")
+ server_finished, server_to_client = gensec_target.update(
+ client_to_server)
+
+
+if __name__ == "__main__":
+ import unittest
+ unittest.main()