summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/krb5/lockout_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/samba/tests/krb5/lockout_tests.py')
-rwxr-xr-xpython/samba/tests/krb5/lockout_tests.py1137
1 files changed, 1137 insertions, 0 deletions
diff --git a/python/samba/tests/krb5/lockout_tests.py b/python/samba/tests/krb5/lockout_tests.py
new file mode 100755
index 0000000..d91eb1d
--- /dev/null
+++ b/python/samba/tests/krb5/lockout_tests.py
@@ -0,0 +1,1137 @@
+#!/usr/bin/env python3
+# Unix SMB/CIFS implementation.
+# Copyright (C) Stefan Metzmacher 2020
+# Copyright (C) Catalyst.Net Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+import os
+
+sys.path.insert(0, 'bin/python')
+os.environ['PYTHONUNBUFFERED'] = '1'
+
+from concurrent import futures
+from enum import Enum
+from functools import partial
+from multiprocessing import Pipe
+import time
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.ciphers.base import Cipher
+from cryptography.hazmat.primitives.ciphers import algorithms
+
+import ldb
+
+from samba import (
+ NTSTATUSError,
+ dsdb,
+ generate_random_bytes,
+ generate_random_password,
+ ntstatus,
+ unix2nttime,
+ werror,
+)
+from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
+from samba.crypto import (
+ aead_aes_256_cbc_hmac_sha512_blob,
+ des_crypt_blob_16,
+ md4_hash_blob,
+ sha512_pbkdf2,
+)
+from samba.dcerpc import lsa, samr
+from samba.samdb import SamDB
+
+from samba.tests import connect_samdb, env_get_var_value, env_loadparm
+
+from samba.tests.krb5.as_req_tests import AsReqBaseTest
+from samba.tests.krb5 import kcrypto
+from samba.tests.krb5.kdc_base_test import KDCBaseTest
+from samba.tests.krb5.raw_testcase import KerberosCredentials
+import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
+from samba.tests.krb5.rfc4120_constants import (
+ KDC_ERR_CLIENT_REVOKED,
+ KDC_ERR_PREAUTH_FAILED,
+ KRB_AS_REP,
+ KRB_ERROR,
+ NT_PRINCIPAL,
+ NT_SRV_INST,
+)
+
+global_asn1_print = False
+global_hexdump = False
+
+
+class ConnectionResult(Enum):
+ LOCKED_OUT = 1
+ WRONG_PASSWORD = 2
+ SUCCESS = 3
+
+
+def connect_kdc(pipe,
+ url,
+ hostname,
+ username,
+ password,
+ domain,
+ realm,
+ workstation,
+ dn,
+ expect_error=True,
+ expect_status=None):
+ AsReqBaseTest.setUpClass()
+ as_req_base = AsReqBaseTest()
+ as_req_base.setUp()
+
+ user_creds = KerberosCredentials()
+ user_creds.set_username(username)
+ user_creds.set_password(password)
+ user_creds.set_domain(domain)
+ user_creds.set_realm(realm)
+ user_creds.set_workstation(workstation)
+ user_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ user_name = user_creds.get_username()
+ cname = as_req_base.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=user_name.split('/'))
+
+ krbtgt_creds = as_req_base.get_krbtgt_creds()
+ krbtgt_supported_etypes = krbtgt_creds.tgs_supported_enctypes
+ realm = krbtgt_creds.get_realm()
+
+ krbtgt_account = krbtgt_creds.get_username()
+ sname = as_req_base.PrincipalName_create(name_type=NT_SRV_INST,
+ names=[krbtgt_account, realm])
+
+ expected_salt = user_creds.get_salt()
+
+ till = as_req_base.get_KerberosTime(offset=36000)
+
+ kdc_options = krb5_asn1.KDCOptions('postdated')
+
+ preauth_key = as_req_base.PasswordKey_from_creds(user_creds,
+ kcrypto.Enctype.AES256)
+
+ ts_enc_padata = as_req_base.get_enc_timestamp_pa_data_from_key(preauth_key)
+ padata = [ts_enc_padata]
+
+ krbtgt_decryption_key = (
+ as_req_base.TicketDecryptionKey_from_creds(krbtgt_creds))
+
+ etypes = as_req_base.get_default_enctypes(user_creds)
+
+ # Remove the LDAP connection.
+ del type(as_req_base)._ldb
+
+ if expect_error:
+ expected_error_modes = (KDC_ERR_CLIENT_REVOKED,
+ KDC_ERR_PREAUTH_FAILED)
+
+ # Wrap generic_check_kdc_error() to expect an NTSTATUS code when the
+ # account is locked out.
+ def check_error_fn(kdc_exchange_dict,
+ callback_dict,
+ rep):
+ error_code = rep.get('error-code')
+ if error_code == KDC_ERR_CLIENT_REVOKED:
+ # The account was locked out.
+ kdc_exchange_dict['expected_status'] = (
+ ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT)
+
+ if expect_status:
+ # Expect to get a LOCKED_OUT NTSTATUS code.
+ kdc_exchange_dict['expect_edata'] = True
+ kdc_exchange_dict['expect_status'] = True
+
+ elif error_code == KDC_ERR_PREAUTH_FAILED:
+ # Just a wrong password: the account wasn’t locked out. Don’t
+ # expect an NTSTATUS code.
+ kdc_exchange_dict['expect_status'] = False
+
+ # Continue with the generic error-checking logic.
+ return as_req_base.generic_check_kdc_error(
+ kdc_exchange_dict,
+ callback_dict,
+ rep)
+
+ check_rep_fn = None
+ else:
+ expected_error_modes = 0
+
+ check_error_fn = None
+ check_rep_fn = as_req_base.generic_check_kdc_rep
+
+ def _generate_padata_copy(_kdc_exchange_dict,
+ _callback_dict,
+ req_body):
+ return padata, req_body
+
+ kdc_exchange_dict = as_req_base.as_exchange_dict(
+ creds=user_creds,
+ expected_crealm=realm,
+ expected_cname=cname,
+ expected_srealm=realm,
+ expected_sname=sname,
+ expected_account_name=user_name,
+ expected_supported_etypes=krbtgt_supported_etypes,
+ ticket_decryption_key=krbtgt_decryption_key,
+ generate_padata_fn=_generate_padata_copy,
+ check_error_fn=check_error_fn,
+ check_rep_fn=check_rep_fn,
+ check_kdc_private_fn=as_req_base.generic_check_kdc_private,
+ expected_error_mode=expected_error_modes,
+ expected_salt=expected_salt,
+ preauth_key=preauth_key,
+ kdc_options=str(kdc_options),
+ pac_request=True)
+
+ # Indicate that we're ready. This ensures we hit the right transaction
+ # lock.
+ pipe.send_bytes(b'0')
+
+ # Wait for the main process to take out a transaction lock.
+ if not pipe.poll(timeout=5):
+ raise AssertionError('main process failed to indicate readiness')
+
+ # Try making a Kerberos AS-REQ to the KDC. This might fail, either due to
+ # the user's account being locked out or due to using the wrong password.
+ as_rep = as_req_base._generic_kdc_exchange(kdc_exchange_dict,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ till_time=till,
+ etypes=etypes)
+
+ as_req_base.assertIsNotNone(as_rep)
+
+ msg_type = as_rep['msg-type']
+ if expect_error and msg_type != KRB_ERROR or (
+ not expect_error and msg_type != KRB_AS_REP):
+ raise AssertionError(f'wrong message type {msg_type}')
+
+ if not expect_error:
+ return ConnectionResult.SUCCESS
+
+ error_code = as_rep['error-code']
+ if error_code == KDC_ERR_CLIENT_REVOKED:
+ return ConnectionResult.LOCKED_OUT
+ elif error_code == KDC_ERR_PREAUTH_FAILED:
+ return ConnectionResult.WRONG_PASSWORD
+ else:
+ raise AssertionError(f'wrong error code {error_code}')
+
+
+def connect_ntlm(pipe,
+ url,
+ hostname,
+ username,
+ password,
+ domain,
+ realm,
+ workstation,
+ dn):
+ user_creds = KerberosCredentials()
+ user_creds.set_username(username)
+ user_creds.set_password(password)
+ user_creds.set_domain(domain)
+ user_creds.set_workstation(workstation)
+ user_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ # Indicate that we're ready. This ensures we hit the right transaction
+ # lock.
+ pipe.send_bytes(b'0')
+
+ # Wait for the main process to take out a transaction lock.
+ if not pipe.poll(timeout=5):
+ raise AssertionError('main process failed to indicate readiness')
+
+ try:
+ # Try connecting to SamDB. This should fail, either due to our
+ # account being locked out or due to using the wrong password.
+ SamDB(url=url,
+ credentials=user_creds,
+ lp=env_loadparm())
+ except ldb.LdbError as err:
+ num, estr = err.args
+
+ if num != ldb.ERR_INVALID_CREDENTIALS:
+ raise AssertionError(f'connection raised wrong error code '
+ f'({err})')
+
+ if f'data {werror.WERR_ACCOUNT_LOCKED_OUT:x},' in estr:
+ return ConnectionResult.LOCKED_OUT
+ elif f'data {werror.WERR_LOGON_FAILURE:x},' in estr:
+ return ConnectionResult.WRONG_PASSWORD
+ else:
+ raise AssertionError(f'connection raised wrong error code '
+ f'({estr})')
+ else:
+ return ConnectionResult.SUCCESS
+
+
+def connect_samr(pipe,
+ url,
+ hostname,
+ username,
+ password,
+ domain,
+ realm,
+ workstation,
+ dn):
+ # Get the user's NT hash.
+ user_creds = KerberosCredentials()
+ user_creds.set_password(password)
+ nt_hash = user_creds.get_nt_hash()
+
+ # Generate a new UTF-16 password.
+ new_password = generate_random_password(32, 32)
+ new_password = new_password.encode('utf-16le')
+
+ # Generate the MD4 hash of the password.
+ new_password_md4 = md4_hash_blob(new_password)
+
+ # Prefix the password with padding so it is 512 bytes long.
+ new_password_len = len(new_password)
+ remaining_len = 512 - new_password_len
+ new_password = bytes(remaining_len) + new_password
+
+ # Append the 32-bit length of the password..
+ new_password += int.to_bytes(new_password_len,
+ length=4,
+ byteorder='little')
+
+ # Encrypt the password with RC4 and the existing NT hash.
+ encryptor = Cipher(algorithms.ARC4(nt_hash),
+ None,
+ default_backend()).encryptor()
+ new_password = encryptor.update(new_password)
+
+ # Create a key from the MD4 hash of the new password.
+ key = new_password_md4[:14]
+
+ # Encrypt the old NT hash with DES to obtain the verifier.
+ verifier = des_crypt_blob_16(nt_hash, key)
+
+ server = lsa.String()
+ server.string = hostname
+
+ account = lsa.String()
+ account.string = username
+
+ nt_password = samr.CryptPassword()
+ nt_password.data = list(new_password)
+
+ nt_verifier = samr.Password()
+ nt_verifier.hash = list(verifier)
+
+ conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
+
+ # Indicate that we're ready. This ensures we hit the right transaction
+ # lock.
+ pipe.send_bytes(b'0')
+
+ # Wait for the main process to take out a transaction lock.
+ if not pipe.poll(timeout=5):
+ raise AssertionError('main process failed to indicate readiness')
+
+ try:
+ # Try changing the password. This should fail, either due to our
+ # account being locked out or due to using the wrong password.
+ conn.ChangePasswordUser3(server=server,
+ account=account,
+ nt_password=nt_password,
+ nt_verifier=nt_verifier,
+ lm_change=True,
+ lm_password=None,
+ lm_verifier=None,
+ password3=None)
+ except NTSTATUSError as err:
+ num, estr = err.args
+
+ if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
+ return ConnectionResult.LOCKED_OUT
+ elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
+ return ConnectionResult.WRONG_PASSWORD
+ else:
+ raise AssertionError(f'pwd change raised wrong error code '
+ f'({num:08X})')
+ else:
+ return ConnectionResult.SUCCESS
+
+
+def connect_samr_aes(pipe,
+ url,
+ hostname,
+ username,
+ password,
+ domain,
+ realm,
+ workstation,
+ dn):
+ # Get the user's NT hash.
+ user_creds = KerberosCredentials()
+ user_creds.set_password(password)
+ nt_hash = user_creds.get_nt_hash()
+
+ # Generate a new UTF-16 password.
+ new_password = generate_random_password(32, 32)
+ new_password = new_password.encode('utf-16le')
+
+ # Prepend the 16-bit length of the password..
+ new_password_len = int.to_bytes(len(new_password),
+ length=2,
+ byteorder='little')
+ new_password = new_password_len + new_password
+
+ server = lsa.String()
+ server.string = hostname
+
+ account = lsa.String()
+ account.string = username
+
+ # Derive a key from the user's NT hash.
+ iv = generate_random_bytes(16)
+ iterations = 5555
+ cek = sha512_pbkdf2(nt_hash, iv, iterations)
+
+ enc_key_salt = (b'Microsoft SAM encryption key '
+ b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
+ mac_key_salt = (b'Microsoft SAM MAC key '
+ b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
+
+ # Encrypt the new password.
+ ciphertext, auth_data = aead_aes_256_cbc_hmac_sha512_blob(new_password,
+ cek,
+ enc_key_salt,
+ mac_key_salt,
+ iv)
+
+ # Create the new password structure
+ pwd_buf = samr.EncryptedPasswordAES()
+ pwd_buf.auth_data = list(auth_data)
+ pwd_buf.salt = list(iv)
+ pwd_buf.cipher_len = len(ciphertext)
+ pwd_buf.cipher = list(ciphertext)
+ pwd_buf.PBKDF2Iterations = iterations
+
+ conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
+
+ # Indicate that we're ready. This ensures we hit the right transaction
+ # lock.
+ pipe.send_bytes(b'0')
+
+ # Wait for the main process to take out a transaction lock.
+ if not pipe.poll(timeout=5):
+ raise AssertionError('main process failed to indicate readiness')
+
+ try:
+ # Try changing the password. This should fail, either due to our
+ # account being locked out or due to using the wrong password.
+ conn.ChangePasswordUser4(server=server,
+ account=account,
+ password=pwd_buf)
+ except NTSTATUSError as err:
+ num, estr = err.args
+
+ if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
+ return ConnectionResult.LOCKED_OUT
+ elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
+ return ConnectionResult.WRONG_PASSWORD
+ else:
+ raise AssertionError(f'pwd change raised wrong error code '
+ f'({num:08X})')
+ else:
+ return ConnectionResult.SUCCESS
+
+
+def ldap_pwd_change(pipe,
+ url,
+ hostname,
+ username,
+ password,
+ domain,
+ realm,
+ workstation,
+ dn):
+ lp = env_loadparm()
+
+ admin_creds = KerberosCredentials()
+ admin_creds.guess(lp)
+ admin_creds.set_username(env_get_var_value('ADMIN_USERNAME'))
+ admin_creds.set_password(env_get_var_value('ADMIN_PASSWORD'))
+ admin_creds.set_kerberos_state(MUST_USE_KERBEROS)
+
+ samdb = SamDB(url=url,
+ credentials=admin_creds,
+ lp=lp)
+
+ old_utf16pw = f'"{password}"'.encode('utf-16le')
+
+ new_password = generate_random_password(32, 32)
+ new_utf16pw = f'"{new_password}"'.encode('utf-16le')
+
+ msg = ldb.Message(ldb.Dn(samdb, dn))
+ msg['0'] = ldb.MessageElement(old_utf16pw,
+ ldb.FLAG_MOD_DELETE,
+ 'unicodePwd')
+ msg['1'] = ldb.MessageElement(new_utf16pw,
+ ldb.FLAG_MOD_ADD,
+ 'unicodePwd')
+
+ # Indicate that we're ready. This ensures we hit the right transaction
+ # lock.
+ pipe.send_bytes(b'0')
+
+ # Wait for the main process to take out a transaction lock.
+ if not pipe.poll(timeout=5):
+ raise AssertionError('main process failed to indicate readiness')
+
+ # Try changing the user's password. This should fail, either due to the
+ # user's account being locked out or due to specifying the wrong password.
+ try:
+ samdb.modify(msg)
+ except ldb.LdbError as err:
+ num, estr = err.args
+ if num != ldb.ERR_CONSTRAINT_VIOLATION:
+ raise AssertionError(f'pwd change raised wrong error code ({err})')
+
+ if f'<{werror.WERR_ACCOUNT_LOCKED_OUT:08X}:' in estr:
+ return ConnectionResult.LOCKED_OUT
+ elif f'<{werror.WERR_INVALID_PASSWORD:08X}:' in estr:
+ return ConnectionResult.WRONG_PASSWORD
+ else:
+ raise AssertionError(f'pwd change raised wrong error code '
+ f'({estr})')
+ else:
+ return ConnectionResult.SUCCESS
+
+
+class LockoutTests(KDCBaseTest):
+
+ def setUp(self):
+ super().setUp()
+ self.do_asn1_print = global_asn1_print
+ self.do_hexdump = global_hexdump
+
+ samdb = self.get_samdb()
+ base_dn = ldb.Dn(samdb, samdb.domain_dn())
+
+ def modify_attr(attr, value):
+ if value is None:
+ value = []
+ flag = ldb.FLAG_MOD_DELETE
+ else:
+ value = str(value)
+ flag = ldb.FLAG_MOD_REPLACE
+
+ msg = ldb.Message(base_dn)
+ msg[attr] = ldb.MessageElement(
+ value, flag, attr)
+ samdb.modify(msg)
+
+ res = samdb.search(base_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['lockoutDuration',
+ 'lockoutThreshold',
+ 'msDS-LogonTimeSyncInterval'])
+ self.assertEqual(1, len(res))
+
+ # Reset the lockout duration as it was before.
+ lockout_duration = res[0].get('lockoutDuration', idx=0)
+ self.addCleanup(modify_attr, 'lockoutDuration', lockout_duration)
+
+ # Set the new lockout duration: locked out accounts now stay locked
+ # out.
+ modify_attr('lockoutDuration', 0)
+
+ # Reset the lockout threshold as it was before.
+ lockout_threshold = res[0].get('lockoutThreshold', idx=0)
+ self.addCleanup(modify_attr, 'lockoutThreshold', lockout_threshold)
+
+ # Set the new lockout threshold.
+ self.lockout_threshold = 3
+ modify_attr('lockoutThreshold', self.lockout_threshold)
+
+ # Reset the logon time sync interval as it was before.
+ sync_interval = res[0].get('msDS-LogonTimeSyncInterval', idx=0)
+ self.addCleanup(modify_attr,
+ 'msDS-LogonTimeSyncInterval',
+ sync_interval)
+
+ # Set the new logon time sync interval. Setting it to 0 eliminates the
+ # need for this attribute to be updated on logon, and thus the
+ # requirement to take out a transaction.
+ modify_attr('msDS-LogonTimeSyncInterval', 0)
+
+ # Get the old 'minPwdAge'.
+ minPwdAge = samdb.get_minPwdAge()
+
+ # Reset the 'minPwdAge' as it was before.
+ self.addCleanup(samdb.set_minPwdAge, minPwdAge)
+
+ # Set it temporarily to '0'.
+ samdb.set_minPwdAge('0')
+
+ def assertLocalSamDB(self, samdb):
+ if samdb.url.startswith('tdb://'):
+ return
+ if samdb.url.startswith('mdb://'):
+ return
+
+ self.fail(f'connection to {samdb.url} is not local!')
+
+ def wait_for_ready(self, pipe, future):
+ if pipe.poll(timeout=5):
+ return
+
+ # We failed to read a response from the pipe, so see if the test raised
+ # an exception with more information.
+ if future.done():
+ exception = future.exception(timeout=0)
+ if exception is not None:
+ raise exception
+
+ self.fail('test failed to indicate readiness')
+
+ def test_lockout_transaction_kdc(self):
+ self.do_lockout_transaction(connect_kdc)
+
+ def test_lockout_transaction_kdc_ntstatus(self):
+ self.do_lockout_transaction(partial(connect_kdc, expect_status=True))
+
+ def test_lockout_transaction_ntlm(self):
+ self.do_lockout_transaction(connect_ntlm)
+
+ def test_lockout_transaction_samr(self):
+ self.do_lockout_transaction(connect_samr)
+
+ def test_lockout_transaction_samr_aes(self):
+ self.do_lockout_transaction(connect_samr_aes)
+
+ def test_lockout_transaction_ldap_pw_change(self):
+ self.do_lockout_transaction(ldap_pwd_change)
+
+ # Tests to ensure we can handle the account being renamed. We do not test
+ # renames with SAMR password changes, because in that case the entire
+ # process happens inside a transaction, and the password change method only
+ # receives the account username. By the time it searches for the account,
+ # it will have already been renamed, and so it will always fail to find the
+ # account.
+
+ def test_lockout_transaction_rename_kdc(self):
+ self.do_lockout_transaction(connect_kdc, rename=True)
+
+ def test_lockout_transaction_rename_kdc_ntstatus(self):
+ self.do_lockout_transaction(partial(connect_kdc, expect_status=True),
+ rename=True)
+
+ def test_lockout_transaction_rename_ntlm(self):
+ self.do_lockout_transaction(connect_ntlm, rename=True)
+
+ def test_lockout_transaction_rename_ldap_pw_change(self):
+ self.do_lockout_transaction(ldap_pwd_change, rename=True)
+
+ def test_lockout_transaction_bad_pwd_kdc(self):
+ self.do_lockout_transaction(connect_kdc, correct_pw=False)
+
+ def test_lockout_transaction_bad_pwd_kdc_ntstatus(self):
+ self.do_lockout_transaction(partial(connect_kdc, expect_status=True),
+ correct_pw=False)
+
+ def test_lockout_transaction_bad_pwd_ntlm(self):
+ self.do_lockout_transaction(connect_ntlm, correct_pw=False)
+
+ def test_lockout_transaction_bad_pwd_samr(self):
+ self.do_lockout_transaction(connect_samr, correct_pw=False)
+
+ def test_lockout_transaction_bad_pwd_samr_aes(self):
+ self.do_lockout_transaction(connect_samr_aes, correct_pw=False)
+
+ def test_lockout_transaction_bad_pwd_ldap_pw_change(self):
+ self.do_lockout_transaction(ldap_pwd_change, correct_pw=False)
+
+ def test_bad_pwd_count_transaction_kdc(self):
+ self.do_bad_pwd_count_transaction(connect_kdc)
+
+ def test_bad_pwd_count_transaction_ntlm(self):
+ self.do_bad_pwd_count_transaction(connect_ntlm)
+
+ def test_bad_pwd_count_transaction_samr(self):
+ self.do_bad_pwd_count_transaction(connect_samr)
+
+ def test_bad_pwd_count_transaction_samr_aes(self):
+ self.do_bad_pwd_count_transaction(connect_samr_aes)
+
+ def test_bad_pwd_count_transaction_ldap_pw_change(self):
+ self.do_bad_pwd_count_transaction(ldap_pwd_change)
+
+ def test_bad_pwd_count_transaction_rename_kdc(self):
+ self.do_bad_pwd_count_transaction(connect_kdc, rename=True)
+
+ def test_bad_pwd_count_transaction_rename_ntlm(self):
+ self.do_bad_pwd_count_transaction(connect_ntlm, rename=True)
+
+ def test_bad_pwd_count_transaction_rename_ldap_pw_change(self):
+ self.do_bad_pwd_count_transaction(ldap_pwd_change, rename=True)
+
+ def test_lockout_race_kdc(self):
+ self.do_lockout_race(connect_kdc)
+
+ def test_lockout_race_kdc_ntstatus(self):
+ self.do_lockout_race(partial(connect_kdc, expect_status=True))
+
+ def test_lockout_race_ntlm(self):
+ self.do_lockout_race(connect_ntlm)
+
+ def test_lockout_race_samr(self):
+ self.do_lockout_race(connect_samr)
+
+ def test_lockout_race_samr_aes(self):
+ self.do_lockout_race(connect_samr_aes)
+
+ def test_lockout_race_ldap_pw_change(self):
+ self.do_lockout_race(ldap_pwd_change)
+
+ def test_logon_without_transaction_ntlm(self):
+ self.do_logon_without_transaction(connect_ntlm)
+
+ # Tests to ensure that the connection functions work correctly in the happy
+ # path.
+
+ def test_logon_kdc(self):
+ self.do_logon(partial(connect_kdc, expect_error=False))
+
+ def test_logon_ntlm(self):
+ self.do_logon(connect_ntlm)
+
+ def test_logon_samr(self):
+ self.do_logon(connect_samr)
+
+ def test_logon_samr_aes(self):
+ self.do_logon(connect_samr_aes)
+
+ def test_logon_ldap_pw_change(self):
+ self.do_logon(ldap_pwd_change)
+
+ # Test that connection without a correct password works.
+ def do_logon(self, connect_fn):
+ # Create the user account for testing.
+ user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+ use_cache=False)
+ user_dn = user_creds.get_dn()
+
+ admin_creds = self.get_admin_creds()
+ lp = self.get_lp()
+
+ # Get a connection to our local SamDB.
+ samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+ credentials=admin_creds)
+ self.assertLocalSamDB(samdb)
+
+ password = user_creds.get_password()
+
+ # Prepare to connect to the server with a valid password.
+ our_pipe, their_pipe = Pipe(duplex=True)
+
+ # Inform the test function that it may proceed.
+ our_pipe.send_bytes(b'0')
+
+ result = connect_fn(pipe=their_pipe,
+ url=f'ldap://{samdb.host_dns_name()}',
+ hostname=samdb.host_dns_name(),
+ username=user_creds.get_username(),
+ password=password,
+ domain=user_creds.get_domain(),
+ realm=user_creds.get_realm(),
+ workstation=user_creds.get_workstation(),
+ dn=str(user_dn))
+
+ # The connection should succeed.
+ self.assertEqual(result, ConnectionResult.SUCCESS)
+
+ # Lock out the account while holding a transaction lock, then release the
+ # lock. A logon attempt already in progress should reread the account
+ # details and recognise the account is locked out. The account can
+ # additionally be renamed within the transaction to ensure that, by using
+ # the GUID, rereading the account's details still succeeds.
+ def do_lockout_transaction(self, connect_fn,
+ rename=False,
+ correct_pw=True):
+ # Create the user account for testing.
+ user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+ use_cache=False)
+ user_dn = user_creds.get_dn()
+
+ admin_creds = self.get_admin_creds()
+ lp = self.get_lp()
+
+ # Get a connection to our local SamDB.
+ samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+ credentials=admin_creds)
+ self.assertLocalSamDB(samdb)
+
+ password = user_creds.get_password()
+ if not correct_pw:
+ password = password[:-1]
+
+ # Prepare to connect to the server.
+ with futures.ProcessPoolExecutor(max_workers=1) as executor:
+ our_pipe, their_pipe = Pipe(duplex=True)
+ connect_future = executor.submit(
+ connect_fn,
+ pipe=their_pipe,
+ url=f'ldap://{samdb.host_dns_name()}',
+ hostname=samdb.host_dns_name(),
+ username=user_creds.get_username(),
+ password=password,
+ domain=user_creds.get_domain(),
+ realm=user_creds.get_realm(),
+ workstation=user_creds.get_workstation(),
+ dn=str(user_dn))
+
+ # Wait until the test process indicates it's ready.
+ self.wait_for_ready(our_pipe, connect_future)
+
+ # Take out a transaction.
+ samdb.transaction_start()
+ try:
+ # Lock out the account. We must do it using an actual password
+ # check like so, rather than directly with a database
+ # modification, so that the account is also added to the
+ # auxiliary bad password database.
+
+ old_utf16pw = '"Secret007"'.encode('utf-16le') # invalid pwd
+ new_utf16pw = '"Secret008"'.encode('utf-16le')
+
+ msg = ldb.Message(user_dn)
+ msg['0'] = ldb.MessageElement(old_utf16pw,
+ ldb.FLAG_MOD_DELETE,
+ 'unicodePwd')
+ msg['1'] = ldb.MessageElement(new_utf16pw,
+ ldb.FLAG_MOD_ADD,
+ 'unicodePwd')
+
+ for i in range(self.lockout_threshold):
+ try:
+ samdb.modify(msg)
+ except ldb.LdbError as err:
+ num, estr = err.args
+
+ # We get an error, but the bad password count should
+ # still be updated.
+ self.assertEqual(num, ldb.ERR_OPERATIONS_ERROR)
+ self.assertEqual('Failed to obtain remote address for '
+ 'the LDAP client while changing the '
+ 'password',
+ estr)
+ else:
+ self.fail('pwd change should have failed')
+
+ # Ensure the account is locked out.
+
+ res = samdb.search(
+ user_dn, scope=ldb.SCOPE_BASE,
+ attrs=['msDS-User-Account-Control-Computed'])
+ self.assertEqual(1, len(res))
+
+ uac = int(res[0].get('msDS-User-Account-Control-Computed',
+ idx=0))
+ self.assertTrue(uac & dsdb.UF_LOCKOUT)
+
+ # Now the bad password database has been updated, inform the
+ # test process that it may proceed.
+ our_pipe.send_bytes(b'0')
+
+ # Wait one second to ensure the test process hits the
+ # transaction lock.
+ time.sleep(1)
+
+ if rename:
+ # While we're at it, rename the account to ensure that is
+ # also safe if a race occurs.
+ msg = ldb.Message(user_dn)
+ new_username = self.get_new_username()
+ msg['sAMAccountName'] = ldb.MessageElement(
+ new_username,
+ ldb.FLAG_MOD_REPLACE,
+ 'sAMAccountName')
+ samdb.modify(msg)
+
+ except Exception:
+ samdb.transaction_cancel()
+ raise
+
+ # Commit the local transaction.
+ samdb.transaction_commit()
+
+ result = connect_future.result(timeout=5)
+ self.assertEqual(result, ConnectionResult.LOCKED_OUT)
+
+ # Update the bad password count while holding a transaction lock, then
+ # release the lock. A logon attempt already in progress should reread the
+ # account details and ensure the bad password count is atomically
+ # updated. The account can additionally be renamed within the transaction
+ # to ensure that, by using the GUID, rereading the account's details still
+ # succeeds.
+ def do_bad_pwd_count_transaction(self, connect_fn, rename=False):
+ # Create the user account for testing.
+ user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+ use_cache=False)
+ user_dn = user_creds.get_dn()
+
+ admin_creds = self.get_admin_creds()
+ lp = self.get_lp()
+
+ # Get a connection to our local SamDB.
+ samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+ credentials=admin_creds)
+ self.assertLocalSamDB(samdb)
+
+ # Prepare to connect to the server with an invalid password.
+ with futures.ProcessPoolExecutor(max_workers=1) as executor:
+ our_pipe, their_pipe = Pipe(duplex=True)
+ connect_future = executor.submit(
+ connect_fn,
+ pipe=their_pipe,
+ url=f'ldap://{samdb.host_dns_name()}',
+ hostname=samdb.host_dns_name(),
+ username=user_creds.get_username(),
+ password=user_creds.get_password()[:-1], # invalid password
+ domain=user_creds.get_domain(),
+ realm=user_creds.get_realm(),
+ workstation=user_creds.get_workstation(),
+ dn=str(user_dn))
+
+ # Wait until the test process indicates it's ready.
+ self.wait_for_ready(our_pipe, connect_future)
+
+ # Take out a transaction.
+ samdb.transaction_start()
+ try:
+ # Inform the test process that it may proceed.
+ our_pipe.send_bytes(b'0')
+
+ # Wait one second to ensure the test process hits the
+ # transaction lock.
+ time.sleep(1)
+
+ # Set badPwdCount to 1.
+ msg = ldb.Message(user_dn)
+ now = int(time.time())
+ bad_pwd_time = unix2nttime(now)
+ msg['badPwdCount'] = ldb.MessageElement(
+ '1',
+ ldb.FLAG_MOD_REPLACE,
+ 'badPwdCount')
+ msg['badPasswordTime'] = ldb.MessageElement(
+ str(bad_pwd_time),
+ ldb.FLAG_MOD_REPLACE,
+ 'badPasswordTime')
+ if rename:
+ # While we're at it, rename the account to ensure that is
+ # also safe if a race occurs.
+ new_username = self.get_new_username()
+ msg['sAMAccountName'] = ldb.MessageElement(
+ new_username,
+ ldb.FLAG_MOD_REPLACE,
+ 'sAMAccountName')
+ samdb.modify(msg)
+
+ # Ensure the account is not yet locked out.
+
+ res = samdb.search(
+ user_dn, scope=ldb.SCOPE_BASE,
+ attrs=['msDS-User-Account-Control-Computed'])
+ self.assertEqual(1, len(res))
+
+ uac = int(res[0].get('msDS-User-Account-Control-Computed',
+ idx=0))
+ self.assertFalse(uac & dsdb.UF_LOCKOUT)
+ except Exception:
+ samdb.transaction_cancel()
+ raise
+
+ # Commit the local transaction.
+ samdb.transaction_commit()
+
+ result = connect_future.result(timeout=5)
+ self.assertEqual(result, ConnectionResult.WRONG_PASSWORD, result)
+
+ # Check that badPwdCount has now increased to 2.
+
+ res = samdb.search(user_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['badPwdCount'])
+ self.assertEqual(1, len(res))
+
+ bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
+ self.assertEqual(2, bad_pwd_count)
+
+ # Attempt to log in to the account with an incorrect password, using
+ # lockoutThreshold+1 simultaneous attempts. We should get three 'wrong
+ # password' errors and one 'locked out' error, showing that the bad
+ # password count is checked and incremented atomically.
+ def do_lockout_race(self, connect_fn):
+ # Create the user account for testing.
+ user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+ use_cache=False)
+ user_dn = user_creds.get_dn()
+
+ admin_creds = self.get_admin_creds()
+ lp = self.get_lp()
+
+ # Get a connection to our local SamDB.
+ samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+ credentials=admin_creds)
+ self.assertLocalSamDB(samdb)
+
+ # Prepare to connect to the server with an invalid password, using four
+ # simultaneous requests. Only three of those attempts should get
+ # through before the account is locked out.
+ num_attempts = self.lockout_threshold + 1
+ with futures.ProcessPoolExecutor(max_workers=num_attempts) as executor:
+ connect_futures = []
+ our_pipes = []
+ for i in range(num_attempts):
+ our_pipe, their_pipe = Pipe(duplex=True)
+ our_pipes.append(our_pipe)
+
+ connect_future = executor.submit(
+ connect_fn,
+ pipe=their_pipe,
+ url=f'ldap://{samdb.host_dns_name()}',
+ hostname=samdb.host_dns_name(),
+ username=user_creds.get_username(),
+ password=user_creds.get_password()[:-1], # invalid pw
+ domain=user_creds.get_domain(),
+ realm=user_creds.get_realm(),
+ workstation=user_creds.get_workstation(),
+ dn=str(user_dn))
+ connect_futures.append(connect_future)
+
+ # Wait until the test process indicates it's ready.
+ self.wait_for_ready(our_pipe, connect_future)
+
+ # Take out a transaction.
+ samdb.transaction_start()
+ try:
+ # Inform the test processes that they may proceed.
+ for our_pipe in our_pipes:
+ our_pipe.send_bytes(b'0')
+
+ # Wait one second to ensure the test processes hit the
+ # transaction lock.
+ time.sleep(1)
+ except Exception:
+ samdb.transaction_cancel()
+ raise
+
+ # Commit the local transaction.
+ samdb.transaction_commit()
+
+ lockouts = 0
+ wrong_passwords = 0
+ for i, connect_future in enumerate(connect_futures):
+ result = connect_future.result(timeout=5)
+ if result == ConnectionResult.LOCKED_OUT:
+ lockouts += 1
+ elif result == ConnectionResult.WRONG_PASSWORD:
+ wrong_passwords += 1
+ else:
+ self.fail(f'process {i} gave an unexpected result '
+ f'{result}')
+
+ self.assertEqual(wrong_passwords, self.lockout_threshold)
+ self.assertEqual(lockouts, num_attempts - self.lockout_threshold)
+
+ # Ensure the account is now locked out.
+
+ res = samdb.search(
+ user_dn, scope=ldb.SCOPE_BASE,
+ attrs=['badPwdCount',
+ 'msDS-User-Account-Control-Computed'])
+ self.assertEqual(1, len(res))
+
+ bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
+ self.assertEqual(self.lockout_threshold, bad_pwd_count)
+
+ uac = int(res[0].get('msDS-User-Account-Control-Computed',
+ idx=0))
+ self.assertTrue(uac & dsdb.UF_LOCKOUT)
+
+ # Test that logon is possible even while we locally hold a transaction
+ # lock. This test only works with NTLM authentication; Kerberos
+ # authentication must take out a transaction to update the logonCount
+ # attribute, and LDAP and SAMR password changes both take out a transaction
+ # to effect the password change. NTLM is the only logon method that does
+ # not require a transaction, and can thus be performed while we're holding
+ # the lock.
+ def do_logon_without_transaction(self, connect_fn):
+ # Create the user account for testing.
+ user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+ use_cache=False)
+ user_dn = user_creds.get_dn()
+
+ admin_creds = self.get_admin_creds()
+ lp = self.get_lp()
+
+ # Get a connection to our local SamDB.
+ samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+ credentials=admin_creds)
+ self.assertLocalSamDB(samdb)
+
+ password = user_creds.get_password()
+
+ # Prepare to connect to the server with a valid password.
+ with futures.ProcessPoolExecutor(max_workers=1) as executor:
+ our_pipe, their_pipe = Pipe(duplex=True)
+ connect_future = executor.submit(
+ connect_fn,
+ pipe=their_pipe,
+ url=f'ldap://{samdb.host_dns_name()}',
+ hostname=samdb.host_dns_name(),
+ username=user_creds.get_username(),
+ password=password,
+ domain=user_creds.get_domain(),
+ realm=user_creds.get_realm(),
+ workstation=user_creds.get_workstation(),
+ dn=str(user_dn))
+
+ # Wait until the test process indicates it's ready.
+ self.wait_for_ready(our_pipe, connect_future)
+
+ # Take out a transaction.
+ samdb.transaction_start()
+ try:
+ # Inform the test process that it may proceed.
+ our_pipe.send_bytes(b'0')
+
+ # The connection should succeed, despite our holding a
+ # transaction.
+ result = connect_future.result(timeout=5)
+ self.assertEqual(result, ConnectionResult.SUCCESS)
+ except Exception:
+ samdb.transaction_cancel()
+ raise
+
+ # Commit the local transaction.
+ samdb.transaction_commit()
+
+
+if __name__ == '__main__':
+ global_asn1_print = False
+ global_hexdump = False
+ import unittest
+ unittest.main()