summaryrefslogtreecommitdiffstats
path: root/source4/dsdb/tests/python/rodc_rwdc.py
diff options
context:
space:
mode:
Diffstat (limited to 'source4/dsdb/tests/python/rodc_rwdc.py')
-rw-r--r--source4/dsdb/tests/python/rodc_rwdc.py1324
1 files changed, 1324 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/rodc_rwdc.py b/source4/dsdb/tests/python/rodc_rwdc.py
new file mode 100644
index 0000000..b2e8c73
--- /dev/null
+++ b/source4/dsdb/tests/python/rodc_rwdc.py
@@ -0,0 +1,1324 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+"""Test communication of credentials etc, between an RODC and a RWDC.
+
+How does it work when the password is changed on the RWDC?
+"""
+
+import optparse
+import sys
+import base64
+import uuid
+import subprocess
+import itertools
+import time
+
+sys.path.insert(0, "bin/python")
+import ldb
+
+from samba.tests.subunitrun import SubunitOptions, TestProgram
+import samba.getopt as options
+
+from samba.auth import system_session
+from samba.samdb import SamDB
+from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
+from samba import gensec, dsdb
+from ldb import LdbError, ERR_INVALID_CREDENTIALS
+from samba.dcerpc import security, samr
+import os
+
+import password_lockout_base
+
+def adjust_cmd_for_py_version(parts):
+ if os.getenv("PYTHON", None):
+ parts.insert(0, os.environ["PYTHON"])
+ return parts
+
+def passwd_encode(pw):
+ return base64.b64encode(('"%s"' % pw).encode('utf-16-le')).decode('utf8')
+
+
+class RodcRwdcTestException(Exception):
+ pass
+
+
+def make_creds(username, password, kerberos_state=None, simple_dn=None):
+ # use the global CREDS as a template
+ c = Credentials()
+ c.set_username(username)
+ c.set_password(password)
+ c.set_domain(CREDS.get_domain())
+ c.set_realm(CREDS.get_realm())
+ c.set_workstation(CREDS.get_workstation())
+
+ if simple_dn is not None:
+ c.set_bind_dn(simple_dn)
+
+ if kerberos_state is None:
+ kerberos_state = CREDS.get_kerberos_state()
+ c.set_kerberos_state(kerberos_state)
+
+ print('-' * 73)
+ if kerberos_state == MUST_USE_KERBEROS:
+ print("we seem to be using kerberos for %s %s" % (username, password))
+ elif kerberos_state == DONT_USE_KERBEROS:
+ print("NOT using kerberos for %s %s" % (username, password))
+ else:
+ print("kerberos state is %s" % kerberos_state)
+
+ c.set_gensec_features(c.get_gensec_features() |
+ gensec.FEATURE_SEAL)
+ return c
+
+
+def set_auto_replication(dc, allow):
+ credstring = '-U%s%%%s' % (CREDS.get_username(),
+ CREDS.get_password())
+
+ on_or_off = '-' if allow else '+'
+
+ for opt in ['DISABLE_INBOUND_REPL',
+ 'DISABLE_OUTBOUND_REPL']:
+ cmd = adjust_cmd_for_py_version(['bin/samba-tool',
+ 'drs', 'options',
+ credstring, dc,
+ "--dsa-option=%s%s" % (on_or_off, opt)])
+
+ p = subprocess.Popen(cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if p.returncode:
+ if b'LDAP_REFERRAL' not in stderr:
+ raise RodcRwdcTestException()
+ print("ignoring +%s REFERRAL error; assuming %s is RODC" %
+ (opt, dc))
+
+
+def preload_rodc_user(user_dn):
+ credstring = '-U%s%%%s' % (CREDS.get_username(),
+ CREDS.get_password())
+
+ set_auto_replication(RWDC, True)
+ cmd = adjust_cmd_for_py_version(['bin/samba-tool',
+ 'rodc', 'preload',
+ user_dn,
+ credstring,
+ '--server', RWDC, ])
+
+ print(' '.join(cmd))
+ subprocess.check_call(cmd)
+ set_auto_replication(RWDC, False)
+
+
+def get_server_ref_from_samdb(samdb):
+ server_name = samdb.get_serverName()
+ res = samdb.search(server_name,
+ scope=ldb.SCOPE_BASE,
+ attrs=['serverReference'])
+
+ return res[0]['serverReference'][0]
+
+
+class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
+
+ def _check_account_initial(self, dn):
+ self.force_replication()
+ return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
+
+ def _check_account(self, dn,
+ badPwdCount=None,
+ badPasswordTime=None,
+ logonCount=None,
+ lastLogon=None,
+ lastLogonTimestamp=None,
+ lockoutTime=None,
+ userAccountControl=None,
+ msDSUserAccountControlComputed=None,
+ effective_bad_password_count=None,
+ msg=None,
+ badPwdCountOnly=False):
+ # Wait for the RWDC to get any delayed messages
+ # e.g. SendToSam or KRB5 bad passwords via winbindd
+ if (self.kerberos and isinstance(badPasswordTime, tuple) or
+ badPwdCount == 0):
+ time.sleep(5)
+
+ return super(RodcRwdcCachedTests,
+ self)._check_account(dn, badPwdCount, badPasswordTime,
+ logonCount, lastLogon,
+ lastLogonTimestamp, lockoutTime,
+ userAccountControl,
+ msDSUserAccountControlComputed,
+ effective_bad_password_count, msg,
+ True)
+
+ def force_replication(self, base=None):
+ if base is None:
+ base = self.base_dn
+
+ # XXX feels like a horrendous way to do it.
+ credstring = '-U%s%%%s' % (CREDS.get_username(),
+ CREDS.get_password())
+ cmd = adjust_cmd_for_py_version(['bin/samba-tool',
+ 'drs', 'replicate',
+ RODC, RWDC, base,
+ credstring,
+ '--sync-forced'])
+
+ p = subprocess.Popen(cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if p.returncode:
+ print("failed with code %s" % p.returncode)
+ print(' '.join(cmd))
+ print("stdout")
+ print(stdout)
+ print("stderr")
+ print(stderr)
+ raise RodcRwdcTestException()
+
+ def _change_password(self, user_dn, old_password, new_password):
+ self.rwdc_db.modify_ldif(
+ "dn: %s\n"
+ "changetype: modify\n"
+ "delete: userPassword\n"
+ "userPassword: %s\n"
+ "add: userPassword\n"
+ "userPassword: %s\n" % (user_dn, old_password, new_password))
+
+ def tearDown(self):
+ super(RodcRwdcCachedTests, self).tearDown()
+ set_auto_replication(RWDC, True)
+
+ def setUp(self):
+ self.kerberos = False # To be set later
+
+ self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+
+ self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+
+ # Define variables for BasePasswordTestCase
+ self.lp = LP
+ self.global_creds = CREDS
+ self.host = RWDC
+ self.host_url = 'ldap://%s' % RWDC
+ self.host_url_ldaps = 'ldaps://%s' % RWDC
+ self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
+ credentials=self.global_creds, lp=self.lp)
+
+ super(RodcRwdcCachedTests, self).setUp()
+ self.host_url = 'ldap://%s' % RODC
+ self.host_url_ldaps = 'ldaps://%s' % RODC
+
+ self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
+ self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
+ self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
+
+ self.base_dn = self.rwdc_db.domain_dn()
+
+ root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
+ attrs=['dsServiceName'])
+ self.service = root[0]['dsServiceName'][0]
+ self.tag = uuid.uuid4().hex
+
+ self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
+ self.rwdc_db.set_dsheuristics("000000001")
+
+ set_auto_replication(RWDC, False)
+
+ # make sure DCs are synchronized before the test
+ self.force_replication()
+
+ def delete_ldb_connections(self):
+ super(RodcRwdcCachedTests, self).delete_ldb_connections()
+ del self.rwdc_db
+ del self.rodc_db
+
+ def test_cache_and_flush_password(self):
+ username = self.lockout1krb5_creds.get_username()
+ userpass = self.lockout1krb5_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ ldb_system = SamDB(session_info=system_session(self.lp),
+ credentials=self.global_creds, lp=self.lp)
+
+ res = ldb_system.search(userdn, attrs=['unicodePwd'])
+ self.assertFalse('unicodePwd' in res[0])
+
+ preload_rodc_user(userdn)
+
+ res = ldb_system.search(userdn, attrs=['unicodePwd'])
+ self.assertTrue('unicodePwd' in res[0])
+
+ # force replication here to flush any pending preloads (this
+ # was a racy test).
+ self.force_replication()
+
+ newpass = userpass + '!'
+
+ # Forcing replication should blank out password (when changed)
+ self._change_password(userdn, userpass, newpass)
+ self.force_replication()
+
+ res = ldb_system.search(userdn, attrs=['unicodePwd'])
+ self.assertFalse('unicodePwd' in res[0])
+
+ def test_login_lockout_krb5(self):
+ username = self.lockout1krb5_creds.get_username()
+ userpass = self.lockout1krb5_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ preload_rodc_user(userdn)
+
+ self.kerberos = True
+
+ self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
+
+ res = self.rodc_db.search(self.rodc_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-RevealOnDemandGroup'])
+
+ group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.rwdc_db, group)
+ m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
+ self.rwdc_db.modify(m)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb, self.base_dn)
+
+ self.account_lockout_duration = 15
+ account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
+
+ m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
+ ldb.FLAG_MOD_REPLACE,
+ "lockoutDuration")
+
+ self.lockout_observation_window = 15
+ lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
+
+ m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
+ ldb.FLAG_MOD_REPLACE,
+ "lockOutObservationWindow")
+
+ self.rwdc_db.modify(m)
+ self.force_replication()
+
+ self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
+
+ def test_login_lockout_ntlm(self):
+ username = self.lockout1ntlm_creds.get_username()
+ userpass = self.lockout1ntlm_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ preload_rodc_user(userdn)
+
+ self.kerberos = False
+
+ self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
+
+ res = self.rodc_db.search(self.rodc_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-RevealOnDemandGroup'])
+
+ group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.rwdc_db, group)
+ m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
+ self.rwdc_db.modify(m)
+
+ self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
+
+ def test_login_lockout_not_revealed(self):
+ '''Test that SendToSam is restricted by preloaded users/groups'''
+
+ username = self.lockout1ntlm_creds.get_username()
+ userpass = self.lockout1ntlm_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ # Preload but do not add to revealed group
+ preload_rodc_user(userdn)
+
+ self.kerberos = False
+
+ creds = self.lockout1ntlm_creds
+
+ # Open a second LDB connection with the user credentials. Use the
+ # command line credentials for information like the domain, the realm
+ # and the workstation.
+ creds_lockout = self.insta_creds(creds)
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
+
+ badPasswordTime = 0
+ logonCount = 0
+ lastLogon = 0
+ lastLogonTimestamp = 0
+ logoncount_relation = ''
+ lastlogon_relation = ''
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg='lastlogontimestamp with wrong password')
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # BadPwdCount on RODC increases alongside RWDC
+ res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
+ self.assertTrue('badPwdCount' in res[0])
+ self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
+
+ # Correct old password
+ creds_lockout.set_password(userpass)
+
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+
+ # Wait for potential SendToSam...
+ time.sleep(5)
+
+ # BadPwdCount on RODC decreases, but not the RWDC
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=badPasswordTime,
+ logonCount=(logoncount_relation, logonCount),
+ lastLogon=('greater', lastLogon),
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg='badPwdCount not reset on RWDC')
+
+ res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
+ self.assertTrue('badPwdCount' in res[0])
+ self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
+
+ def _test_login_lockout_rodc_rwdc(self, creds, userdn):
+ username = creds.get_username()
+ userpass = creds.get_password()
+
+ # Open a second LDB connection with the user credentials. Use the
+ # command line credentials for information like the domain, the realm
+ # and the workstation.
+ creds_lockout = self.insta_creds(creds)
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
+
+ badPasswordTime = 0
+ logonCount = 0
+ lastLogon = 0
+ lastLogonTimestamp = 0
+ logoncount_relation = ''
+ lastlogon_relation = ''
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg='lastlogontimestamp with wrong password')
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # Correct old password
+ creds_lockout.set_password(userpass)
+
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+
+ # lastLogonTimestamp should not change
+ # lastLogon increases if badPwdCount is non-zero (!)
+ res = self._check_account(userdn,
+ badPwdCount=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=(logoncount_relation, logonCount),
+ lastLogon=('greater', lastLogon),
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg='LLTimestamp is updated to lastlogon')
+
+ logonCount = int(res[0]["logonCount"][0])
+ lastLogon = int(res[0]["lastLogon"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+
+ except LdbError as e1:
+ (num, msg) = e1.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=2,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ print("two failed password change")
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+
+ except LdbError as e2:
+ (num, msg) = e2.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=("greater", badPasswordTime),
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+ lockoutTime = int(res[0]["lockoutTime"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError as e3:
+ (num, msg) = e3.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError as e4:
+ (num, msg) = e4.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+
+ # The correct password, but we are locked out
+ creds_lockout.set_password(userpass)
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError as e5:
+ (num, msg) = e5.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+
+ # wait for the lockout to end
+ time.sleep(self.account_lockout_duration + 1)
+ print(self.account_lockout_duration + 1)
+
+ res = self._check_account(userdn,
+ badPwdCount=3, effective_bad_password_count=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+
+ # The correct password after letting the timeout expire
+
+ creds_lockout.set_password(userpass)
+
+ creds_lockout2 = self.insta_creds(creds_lockout)
+
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
+ time.sleep(3)
+
+ res = self._check_account(userdn,
+ badPwdCount=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=(logoncount_relation, logonCount),
+ lastLogon=(lastlogon_relation, lastLogon),
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg="lastLogon is way off")
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError as e6:
+ (num, msg) = e6.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError as e7:
+ (num, msg) = e7.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=2,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ time.sleep(self.lockout_observation_window + 1)
+
+ res = self._check_account(userdn,
+ badPwdCount=2, effective_bad_password_count=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError as e8:
+ (num, msg) = e8.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # The correct password without letting the timeout expire
+ creds_lockout.set_password(userpass)
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+
+ res = self._check_account(userdn,
+ badPwdCount=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=(logoncount_relation, logonCount),
+ lockoutTime=lockoutTime,
+ lastLogon=("greater", lastLogon),
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+
+
+class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
+ counter = itertools.count(1, 1)
+
+ def force_replication(self, base=None):
+ if base is None:
+ base = self.base_dn
+
+ # XXX feels like a horrendous way to do it.
+ credstring = '-U%s%%%s' % (CREDS.get_username(),
+ CREDS.get_password())
+ cmd = adjust_cmd_for_py_version(['bin/samba-tool',
+ 'drs', 'replicate',
+ RODC, RWDC, base,
+ credstring,
+ '--sync-forced'])
+
+ p = subprocess.Popen(cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if p.returncode:
+ print("failed with code %s" % p.returncode)
+ print(' '.join(cmd))
+ print("stdout")
+ print(stdout)
+ print("stderr")
+ print(stderr)
+ raise RodcRwdcTestException()
+
+ def _check_account_initial(self, dn):
+ self.force_replication()
+ return super(RodcRwdcTests, self)._check_account_initial(dn)
+
+ def tearDown(self):
+ super(RodcRwdcTests, self).tearDown()
+ self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ set_auto_replication(RWDC, True)
+
+ def setUp(self):
+ self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+
+ self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+
+ # Define variables for BasePasswordTestCase
+ self.lp = LP
+ self.global_creds = CREDS
+ self.host = RWDC
+ self.host_url = 'ldap://%s' % RWDC
+ self.host_url_ldaps = 'ldaps://%s' % RWDC
+ self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
+ credentials=self.global_creds, lp=self.lp)
+
+ super(RodcRwdcTests, self).setUp()
+ self.host = RODC
+ self.host_url = 'ldap://%s' % RODC
+ self.host_url_ldaps = 'ldaps://%s' % RODC
+ self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
+ credentials=self.global_creds, lp=self.lp)
+
+ self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
+ self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
+ self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
+
+ self.base_dn = self.rwdc_db.domain_dn()
+
+ root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
+ attrs=['dsServiceName'])
+ self.service = root[0]['dsServiceName'][0]
+ self.tag = uuid.uuid4().hex
+
+ self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
+ self.rwdc_db.set_dsheuristics("000000001")
+
+ set_auto_replication(RWDC, False)
+
+ # make sure DCs are synchronized before the test
+ self.force_replication()
+ self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
+ self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
+
+ def delete_ldb_connections(self):
+ super(RodcRwdcTests, self).delete_ldb_connections()
+ del self.rwdc_db
+ del self.rodc_db
+
+ def assertReferral(self, fn, *args, **kwargs):
+ try:
+ fn(*args, **kwargs)
+ self.fail("failed to raise ldap referral")
+ except ldb.LdbError as e9:
+ (code, msg) = e9.args
+ self.assertEqual(code, ldb.ERR_REFERRAL,
+ "expected referral, got %s %s" % (code, msg))
+
+ def _test_rodc_dsheuristics(self):
+ d = self.rodc_db.get_dsheuristics()
+ self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
+ self.assertReferral(self.rodc_db.set_dsheuristics, d)
+
+ def TEST_rodc_heuristics_kerberos(self):
+ CREDS.set_kerberos_state(MUST_USE_KERBEROS)
+ self._test_rodc_dsheuristics()
+
+ def TEST_rodc_heuristics_ntlm(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_rodc_dsheuristics()
+
+ def _test_add(self, objects, cross_ncs=False):
+ for o in objects:
+ dn = o['dn']
+ if cross_ncs:
+ base = str(self.rwdc_db.get_config_basedn())
+ controls = ["search_options:1:2"]
+ cn = dn.split(',', 1)[0]
+ expression = '(%s)' % cn
+ else:
+ base = dn
+ controls = []
+ expression = None
+
+ try:
+ res = self.rodc_db.search(base,
+ expression=expression,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=['dn'],
+ controls=controls)
+ self.assertEqual(len(res), 0)
+ except ldb.LdbError as e:
+ if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
+ raise
+
+ try:
+ self.rwdc_db.add(o)
+ except ldb.LdbError as e:
+ (ecode, emsg) = e.args
+ self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
+ (o, ecode, emsg))
+
+ if cross_ncs:
+ self.force_replication(base=base)
+ else:
+ self.force_replication()
+
+ try:
+ res = self.rodc_db.search(base,
+ expression=expression,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=['dn'],
+ controls=controls)
+ self.assertEqual(len(res), 1)
+ except ldb.LdbError as e:
+ self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
+ "replication seems to have failed")
+
+ def _test_add_replicated_objects(self, mode):
+ tag = "%s%s" % (self.tag, mode)
+ self._test_add([
+ {
+ 'dn': "ou=%s1,%s" % (tag, self.base_dn),
+ "objectclass": "organizationalUnit"
+ },
+ {
+ 'dn': "cn=%s2,%s" % (tag, self.base_dn),
+ "objectclass": "user"
+ },
+ {
+ 'dn': "cn=%s3,%s" % (tag, self.base_dn),
+ "objectclass": "group"
+ },
+ ])
+ self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
+ self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
+ self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
+
+ def test_add_replicated_objects_kerberos(self):
+ CREDS.set_kerberos_state(MUST_USE_KERBEROS)
+ self._test_add_replicated_objects('kerberos')
+
+ def test_add_replicated_objects_ntlm(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_add_replicated_objects('ntlm')
+
+ def _test_add_replicated_connections(self, mode):
+ tag = "%s%s" % (self.tag, mode)
+ self._test_add([
+ {
+ 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
+ "objectclass": "NTDSConnection",
+ 'enabledConnection': 'TRUE',
+ 'fromServer': self.base_dn,
+ 'options': '0'
+ },
+ ], cross_ncs=True)
+ self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
+
+ def test_add_replicated_connections_kerberos(self):
+ CREDS.set_kerberos_state(MUST_USE_KERBEROS)
+ self._test_add_replicated_connections('kerberos')
+
+ def test_add_replicated_connections_ntlm(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_add_replicated_connections('ntlm')
+
+ def _test_modify_replicated_attributes(self):
+ dn = 'CN=Guest,CN=Users,' + self.base_dn
+ value = self.tag
+ for attr in ['carLicense', 'middleName']:
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.rwdc_db, dn)
+ m[attr] = ldb.MessageElement(value,
+ ldb.FLAG_MOD_REPLACE,
+ attr)
+ try:
+ self.rwdc_db.modify(m)
+ except ldb.LdbError as e:
+ self.fail("Failed to modify %s %s on RWDC %s with %s" %
+ (dn, attr, RWDC, e))
+
+ self.force_replication()
+
+ try:
+ res = self.rodc_db.search(dn,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=[attr])
+ results = [str(x[attr][0]) for x in res]
+ self.assertEqual(results, [value])
+ except ldb.LdbError as e:
+ self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
+ "replication seems to have failed")
+
+ def test_modify_replicated_attributes_kerberos(self):
+ CREDS.set_kerberos_state(MUST_USE_KERBEROS)
+ self._test_modify_replicated_attributes()
+
+ def test_modify_replicated_attributes_ntlm(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_modify_replicated_attributes()
+
+ def _test_add_modify_delete(self):
+ dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
+ values = ["%s%s" % (i, self.tag) for i in range(3)]
+ attr = "carLicense"
+ self._test_add([
+ {
+ 'dn': dn,
+ "objectclass": "user",
+ attr: values[0]
+ },
+ ])
+ self.force_replication()
+ for value in values[1:]:
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.rwdc_db, dn)
+ m[attr] = ldb.MessageElement(value,
+ ldb.FLAG_MOD_REPLACE,
+ attr)
+ try:
+ self.rwdc_db.modify(m)
+ except ldb.LdbError as e:
+ self.fail("Failed to modify %s %s on RWDC %s with %s" %
+ (dn, attr, RWDC, e))
+
+ self.force_replication()
+
+ try:
+ res = self.rodc_db.search(dn,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=[attr])
+ results = [str(x[attr][0]) for x in res]
+ self.assertEqual(results, [value])
+ except ldb.LdbError as e:
+ self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
+ "replication seems to have failed")
+
+ self.rwdc_db.delete(dn)
+ self.force_replication()
+ try:
+ res = self.rodc_db.search(dn,
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=[attr])
+ if len(res) > 0:
+ self.fail("Failed to delete %s" % (dn))
+ except ldb.LdbError as e:
+ self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
+ "Failed to delete %s" % (dn))
+
+ def test_add_modify_delete_kerberos(self):
+ CREDS.set_kerberos_state(MUST_USE_KERBEROS)
+ self._test_add_modify_delete()
+
+ def test_add_modify_delete_ntlm(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_add_modify_delete()
+
+ def _new_user(self):
+ username = "u%sX%s" % (self.tag[:12], next(self.counter))
+ password = 'password#1'
+ dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
+ o = {
+ 'dn': dn,
+ "objectclass": "user",
+ 'sAMAccountName': username,
+ }
+ try:
+ self.rwdc_db.add(o)
+ except ldb.LdbError as e:
+ self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
+
+ self.rwdc_db.modify_ldif("dn: %s\n"
+ "changetype: modify\n"
+ "delete: userPassword\n"
+ "add: userPassword\n"
+ "userPassword: %s\n" % (dn, password))
+ self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
+ return (dn, username, password)
+
+ def _change_password(self, user_dn, old_password, new_password):
+ self.rwdc_db.modify_ldif(
+ "dn: %s\n"
+ "changetype: modify\n"
+ "delete: userPassword\n"
+ "userPassword: %s\n"
+ "add: userPassword\n"
+ "userPassword: %s\n" % (user_dn, old_password, new_password))
+
+ def try_ldap_logon(self, server, creds, errno=None, simple=False):
+ try:
+ if simple:
+ tmpdb = SamDB('ldaps://%s' % server, credentials=creds,
+ session_info=system_session(LP), lp=LP)
+ else:
+ tmpdb = SamDB('ldap://%s' % server, credentials=creds,
+ session_info=system_session(LP), lp=LP)
+ if errno is not None:
+ self.fail("logon failed to fail with ldb error %s" % errno)
+ except ldb.LdbError as e10:
+ (code, msg) = e10.args
+ if code != errno:
+ if errno is None:
+ self.fail("logon incorrectly raised ldb error (code=%s)" %
+ code)
+ else:
+ self.fail("logon failed to raise correct ldb error"
+ "Expected: %s Got: %s" %
+ (errno, code))
+
+ def zero_min_password_age(self):
+ min_pwd_age = int(self.rwdc_db.get_minPwdAge())
+ if min_pwd_age != 0:
+ self.rwdc_db.set_minPwdAge('0')
+
+ def _test_ldap_change_password(self, errno=None, simple=False):
+ self.zero_min_password_age()
+
+ dn, username, password = self._new_user()
+
+ simple_dn = dn if simple else None
+
+ creds1 = make_creds(username, password, simple_dn=simple_dn)
+
+ # With NTLM, this should fail on RODC before replication,
+ # because the user isn't known.
+ self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS,
+ simple=simple)
+ self.force_replication()
+
+ # Now the user is replicated to RODC, so logon should work
+ self.try_ldap_logon(RODC, creds1, simple=simple)
+
+ passwords = ['password#%s' % i for i in range(1, 6)]
+ for prev, password in zip(passwords[:-1], passwords[1:]):
+ self._change_password(dn, prev, password)
+
+ # The password has changed enough times to make the old
+ # password invalid (though with kerberos that doesn't matter).
+ # For NTLM, the old creds should always fail
+ self.try_ldap_logon(RODC, creds1, errno, simple=simple)
+ self.try_ldap_logon(RWDC, creds1, errno, simple=simple)
+
+ creds2 = make_creds(username, password, simple_dn=simple_dn)
+
+ # new creds work straight away with NTLM, because although it
+ # doesn't have the password, it knows the user and forwards
+ # the query.
+ self.try_ldap_logon(RODC, creds2, simple=simple)
+ self.try_ldap_logon(RWDC, creds2, simple=simple)
+
+ self.force_replication()
+
+ # After another replication check RODC still works and fails,
+ # as appropriate to various creds
+ self.try_ldap_logon(RODC, creds2, simple=simple)
+ self.try_ldap_logon(RODC, creds1, errno, simple=simple)
+
+ prev = password
+ password = 'password#6'
+ self._change_password(dn, prev, password)
+ creds3 = make_creds(username, password, simple_dn=simple_dn)
+
+ # previous password should still work.
+ self.try_ldap_logon(RWDC, creds2, simple=simple)
+ self.try_ldap_logon(RODC, creds2, simple=simple)
+
+ # new password should still work.
+ self.try_ldap_logon(RWDC, creds3, simple=simple)
+ self.try_ldap_logon(RODC, creds3, simple=simple)
+
+ # old password should still fail (but not on kerberos).
+ self.try_ldap_logon(RWDC, creds1, errno, simple=simple)
+ self.try_ldap_logon(RODC, creds1, errno, simple=simple)
+
+ def test_ldap_change_password_kerberos(self):
+ CREDS.set_kerberos_state(MUST_USE_KERBEROS)
+ self._test_ldap_change_password()
+
+ def test_ldap_change_password_ntlm(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
+
+ def test_ldap_change_password_simple_bind(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS, simple=True)
+
+ def _test_ldap_change_password_reveal_on_demand(self, errno=None):
+ self.zero_min_password_age()
+
+ res = self.rodc_db.search(self.rodc_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-RevealOnDemandGroup'])
+
+ group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
+
+ user_dn, username, password = self._new_user()
+ creds1 = make_creds(username, password)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.rwdc_db, group)
+ m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
+ self.rwdc_db.modify(m)
+
+ # Against Windows, this will just forward if no account exists on the KDC
+ # Therefore, this does not error on Windows.
+ self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
+
+ self.force_replication()
+
+ # The proxy case
+ self.try_ldap_logon(RODC, creds1)
+ preload_rodc_user(user_dn)
+
+ # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
+ self.try_ldap_logon(RODC, creds1)
+
+ passwords = ['password#%s' % i for i in range(1, 6)]
+ for prev, password in zip(passwords[:-1], passwords[1:]):
+ self._change_password(user_dn, prev, password)
+
+ # The password has changed enough times to make the old
+ # password invalid, but the RODC shouldn't know that.
+ self.try_ldap_logon(RODC, creds1)
+ self.try_ldap_logon(RWDC, creds1, errno)
+
+ creds2 = make_creds(username, password)
+ self.try_ldap_logon(RWDC, creds2)
+ # The RODC forward WRONG_PASSWORD to the RWDC
+ self.try_ldap_logon(RODC, creds2)
+
+ def test_change_password_reveal_on_demand_ntlm(self):
+ CREDS.set_kerberos_state(DONT_USE_KERBEROS)
+ self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
+
+ def test_change_password_reveal_on_demand_kerberos(self):
+ CREDS.set_kerberos_state(MUST_USE_KERBEROS)
+ self._test_ldap_change_password_reveal_on_demand()
+
+ def test_login_lockout_krb5(self):
+ username = self.lockout1krb5_creds.get_username()
+ userpass = self.lockout1krb5_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ preload_rodc_user(userdn)
+
+ use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
+ fail_creds = self.insta_creds(self.template_creds,
+ username=username,
+ userpass=userpass + "X",
+ kerberos_state=use_kerberos)
+
+ try:
+ ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
+ self.fail()
+ except LdbError as e11:
+ (num, msg) = e11.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ # Succeed to reset everything to 0
+ success_creds = self.insta_creds(self.template_creds,
+ username=username,
+ userpass=userpass,
+ kerberos_state=use_kerberos)
+
+ ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
+
+ self._test_login_lockout(self.lockout1krb5_creds)
+
+ def test_login_lockout_ntlm(self):
+ username = self.lockout1ntlm_creds.get_username()
+ userpass = self.lockout1ntlm_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ preload_rodc_user(userdn)
+
+ use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
+ fail_creds = self.insta_creds(self.template_creds,
+ username=username,
+ userpass=userpass + "X",
+ kerberos_state=use_kerberos)
+
+ try:
+ ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
+ self.fail()
+ except LdbError as e12:
+ (num, msg) = e12.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ # Succeed to reset everything to 0
+ ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
+
+ self._test_login_lockout(self.lockout1ntlm_creds)
+
+ def test_multiple_logon_krb5(self):
+ username = self.lockout1krb5_creds.get_username()
+ userpass = self.lockout1krb5_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ preload_rodc_user(userdn)
+
+ use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
+ fail_creds = self.insta_creds(self.template_creds,
+ username=username,
+ userpass=userpass + "X",
+ kerberos_state=use_kerberos)
+
+ try:
+ ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
+ self.fail()
+ except LdbError as e13:
+ (num, msg) = e13.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ # Succeed to reset everything to 0
+ success_creds = self.insta_creds(self.template_creds,
+ username=username,
+ userpass=userpass,
+ kerberos_state=use_kerberos)
+
+ ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
+
+ self._test_multiple_logon(self.lockout1krb5_creds)
+
+ def test_multiple_logon_ntlm(self):
+ username = self.lockout1ntlm_creds.get_username()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+ userpass = self.lockout1ntlm_creds.get_password()
+
+ preload_rodc_user(userdn)
+
+ use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
+ fail_creds = self.insta_creds(self.template_creds,
+ username=username,
+ userpass=userpass + "X",
+ kerberos_state=use_kerberos)
+
+ try:
+ ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
+ self.fail()
+ except LdbError as e14:
+ (num, msg) = e14.args
+ self.assertEqual(num, ERR_INVALID_CREDENTIALS)
+
+ # Succeed to reset everything to 0
+ ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
+
+ self._test_multiple_logon(self.lockout1ntlm_creds)
+
+
+def main():
+ global RODC, RWDC, CREDS, LP
+ parser = optparse.OptionParser(
+ "rodc_rwdc.py [options] <rodc host> <rwdc host>")
+
+ sambaopts = options.SambaOptions(parser)
+ versionopts = options.VersionOptions(parser)
+ credopts = options.CredentialsOptions(parser)
+ subunitopts = SubunitOptions(parser)
+
+ parser.add_option_group(sambaopts)
+ parser.add_option_group(versionopts)
+ parser.add_option_group(credopts)
+ parser.add_option_group(subunitopts)
+
+ opts, args = parser.parse_args()
+
+ LP = sambaopts.get_loadparm()
+ CREDS = credopts.get_credentials(LP)
+ CREDS.set_gensec_features(CREDS.get_gensec_features() |
+ gensec.FEATURE_SEAL)
+
+ try:
+ RODC, RWDC = args
+ except ValueError:
+ parser.print_usage()
+ sys.exit(1)
+
+ set_auto_replication(RWDC, True)
+ try:
+ TestProgram(module=__name__, opts=subunitopts)
+ finally:
+ set_auto_replication(RWDC, True)
+
+
+main()