summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/auth_log_samlogon.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/samba/tests/auth_log_samlogon.py')
-rw-r--r--python/samba/tests/auth_log_samlogon.py181
1 files changed, 181 insertions, 0 deletions
diff --git a/python/samba/tests/auth_log_samlogon.py b/python/samba/tests/auth_log_samlogon.py
new file mode 100644
index 0000000..f3dfeba
--- /dev/null
+++ b/python/samba/tests/auth_log_samlogon.py
@@ -0,0 +1,181 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""
+ Tests auth logging tests that exercise SamLogon
+"""
+
+import samba.tests
+import os
+from samba.samdb import SamDB
+import samba.tests.auth_log_base
+from samba.credentials import (
+ Credentials,
+ DONT_USE_KERBEROS,
+ CLI_CRED_NTLMv2_AUTH
+)
+from samba.dcerpc import ntlmssp, netlogon
+from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN
+from samba.ndr import ndr_pack
+from samba.auth import system_session
+from samba.tests import delete_force
+from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
+from samba.dcerpc.misc import SEC_CHAN_WKSTA
+from samba.dcerpc.windows_event_ids import (
+ EVT_ID_SUCCESSFUL_LOGON,
+ EVT_LOGON_NETWORK
+)
+
+
+class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
+
+ def setUp(self):
+ super().setUp()
+ self.lp = samba.tests.env_loadparm()
+ self.session = system_session()
+ self.ldb = SamDB(
+ session_info=self.session,
+ lp=self.lp)
+
+ self.domain = os.environ["DOMAIN"]
+ self.netbios_name = "SamLogonTest"
+ self.machinepass = "abcdefghij"
+ self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
+ self.base_dn = self.ldb.domain_dn()
+ self.samlogon_dn = ("cn=%s,cn=users,%s" %
+ (self.netbios_name, self.base_dn))
+
+ def tearDown(self):
+ super().tearDown()
+ delete_force(self.ldb, self.samlogon_dn)
+
+ def _test_samlogon(self, binding, creds, checkFunction):
+
+ def isLastExpectedMessage(msg):
+ return (
+ msg["type"] == "Authentication" and
+ msg["Authentication"]["serviceDescription"] == "SamLogon" and
+ msg["Authentication"]["authDescription"] == "network" and
+ msg["Authentication"]["passwordType"] == "NTLMv2" and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] == EVT_LOGON_NETWORK))
+
+ if binding:
+ binding = "[schannel,%s]" % binding
+ else:
+ binding = "[schannel]"
+
+ utf16pw = ('"' + self.machinepass + '"').encode('utf-16-le')
+ self.ldb.add({
+ "dn": self.samlogon_dn,
+ "objectclass": "computer",
+ "sAMAccountName": "%s$" % self.netbios_name,
+ "userAccountControl":
+ str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
+ "unicodePwd": utf16pw})
+
+ machine_creds = Credentials()
+ machine_creds.guess(self.get_loadparm())
+ machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
+ machine_creds.set_password(self.machinepass)
+ machine_creds.set_username(self.netbios_name + "$")
+
+ netlogon_conn = netlogon.netlogon("ncalrpc:%s" % binding,
+ self.get_loadparm(),
+ machine_creds)
+ challenge = b"abcdefgh"
+
+ target_info = ntlmssp.AV_PAIR_LIST()
+ target_info.count = 3
+
+ domainname = ntlmssp.AV_PAIR()
+ domainname.AvId = ntlmssp.MsvAvNbDomainName
+ domainname.Value = self.domain
+
+ computername = ntlmssp.AV_PAIR()
+ computername.AvId = ntlmssp.MsvAvNbComputerName
+ computername.Value = self.netbios_name
+
+ eol = ntlmssp.AV_PAIR()
+ eol.AvId = ntlmssp.MsvAvEOL
+ target_info.pair = [domainname, computername, eol]
+
+ target_info_blob = ndr_pack(target_info)
+
+ response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
+ challenge=challenge,
+ target_info=target_info_blob)
+
+ netr_flags = 0
+
+ logon_level = netlogon.NetlogonNetworkTransitiveInformation
+ logon = samba.dcerpc.netlogon.netr_NetworkInfo()
+
+ logon.challenge = [
+ x if isinstance(x, int) else ord(x) for x in challenge]
+ logon.nt = netlogon.netr_ChallengeResponse()
+ logon.nt.length = len(response["nt_response"])
+ logon.nt.data = [
+ x if isinstance(x, int) else ord(x) for
+ x in response["nt_response"]
+ ]
+ logon.identity_info = samba.dcerpc.netlogon.netr_IdentityInfo()
+ (username, domain) = creds.get_ntlm_username_domain()
+
+ logon.identity_info.domain_name.string = domain
+ logon.identity_info.account_name.string = username
+ logon.identity_info.workstation.string = creds.get_workstation()
+
+ validation_level = samba.dcerpc.netlogon.NetlogonValidationSamInfo4
+
+ result = netlogon_conn.netr_LogonSamLogonEx(
+ os.environ["SERVER"],
+ machine_creds.get_workstation(),
+ logon_level, logon,
+ validation_level, netr_flags)
+
+ (validation, authoritative, netr_flags_out) = result
+
+ messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
+ checkFunction(messages)
+
+ def samlogon_check(self, messages):
+
+ messages = self.remove_netlogon_messages(messages)
+ expected_messages = 5
+ self.assertEqual(expected_messages,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authorization
+ msg = messages[0]
+ self.assertEqual("Authorization", msg["type"])
+ self.assertEqual("DCE/RPC",
+ msg["Authorization"]["serviceDescription"])
+ self.assertEqual("ncalrpc", msg["Authorization"]["authType"])
+ self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
+ self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
+
+ def test_ncalrpc_samlogon(self):
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ try:
+ self._test_samlogon("SEAL", creds, self.samlogon_check)
+ except Exception as e:
+ self.fail("Unexpected exception: " + str(e))