summaryrefslogtreecommitdiffstats
path: root/tests/scripts/test081-totp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/scripts/test081-totp.py')
-rwxr-xr-xtests/scripts/test081-totp.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/tests/scripts/test081-totp.py b/tests/scripts/test081-totp.py
new file mode 100755
index 0000000..aeedaf2
--- /dev/null
+++ b/tests/scripts/test081-totp.py
@@ -0,0 +1,182 @@
+# -*- coding: utf-8 -*-
+# $OpenLDAP$
+## This work is part of OpenLDAP Software <http://www.openldap.org/>.
+##
+## Copyright 2016-2021 Ondřej Kuzník, Symas Corp.
+## Copyright 2021-2022 The OpenLDAP Foundation.
+## All rights reserved.
+##
+## Redistribution and use in source and binary forms, with or without
+## modification, are permitted only as authorized by the OpenLDAP
+## Public License.
+##
+## A copy of this license is available in the file LICENSE in the
+## top-level directory of the distribution or, alternatively, at
+## <http://www.OpenLDAP.org/license.html>.
+
+from __future__ import print_function
+
+import hashlib
+import hmac
+import os
+import struct
+import sys
+import time
+
+import ldap
+from ldap.cidict import cidict as CIDict
+from ldap.ldapobject import LDAPObject
+
+if len(sys.argv) > 1 and sys.argv[1] == "--check":
+ raise SystemExit(0)
+
+
+def get_digits(h, digits):
+ offset = h[19] & 15
+ number = struct.unpack(">I", h[offset:offset+4])[0] & 0x7fffffff
+ number %= (10 ** digits)
+ return ("%0*d" % (digits, number)).encode()
+
+
+def get_hotp_token(secret, interval_no):
+ msg = struct.pack(">Q", interval_no)
+ h = hmac.new(secret, msg, hashlib.sha1).digest()
+ return get_digits(bytearray(h), 6)
+
+
+def get_interval(period=30):
+ return int(time.time() // period)
+
+
+def get_token_for(connection, dn, typ="totp"):
+ result = connection.search_s(dn, ldap.SCOPE_BASE)
+ dn, attrs = result[0]
+ attrs = CIDict(attrs)
+
+ tokendn = attrs['oath'+typ+'token'][0].decode()
+
+ result = connection.search_s(tokendn, ldap.SCOPE_BASE)
+ dn, attrs = result[0]
+ attrs = CIDict(attrs)
+
+ return dn, attrs
+
+
+def main():
+ uri = os.environ["URI1"]
+
+ managerdn = os.environ['MANAGERDN']
+ passwd = os.environ['PASSWD']
+
+ babsdn = os.environ['BABSDN']
+ babspw = b"bjensen"
+
+ bjornsdn = os.environ['BJORNSDN']
+ bjornspw = b"bjorn"
+
+ connection = LDAPObject(uri)
+
+ start = time.time()
+ connection.bind_s(managerdn, passwd)
+ end = time.time()
+
+ if end - start > 1:
+ print("It takes more than a second to connect and bind, "
+ "skipping potentially unstable test", file=sys.stderr)
+ raise SystemExit(0)
+
+ dn, token_entry = get_token_for(connection, babsdn)
+
+ paramsdn = token_entry['oathTOTPParams'][0].decode()
+ result = connection.search_s(paramsdn, ldap.SCOPE_BASE)
+ _, attrs = result[0]
+ params = CIDict(attrs)
+
+ secret = token_entry['oathSecret'][0]
+ period = int(params['oathTOTPTimeStepPeriod'][0].decode())
+
+ bind_conn = LDAPObject(uri)
+
+ interval_no = get_interval(period)
+ token = get_hotp_token(secret, interval_no-3)
+
+ print("Testing old tokens are not useable")
+ bind_conn.bind_s(babsdn, babspw+token)
+ try:
+ bind_conn.bind_s(babsdn, babspw+token)
+ except ldap.INVALID_CREDENTIALS:
+ pass
+ else:
+ raise SystemExit("Bind with an old token should have failed")
+
+ interval_no = get_interval(period)
+ token = get_hotp_token(secret, interval_no)
+
+ print("Testing token can only be used once")
+ bind_conn.bind_s(babsdn, babspw+token)
+ try:
+ bind_conn.bind_s(babsdn, babspw+token)
+ except ldap.INVALID_CREDENTIALS:
+ pass
+ else:
+ raise SystemExit("Bind with a reused token should have failed")
+
+ token = get_hotp_token(secret, interval_no+1)
+ try:
+ bind_conn.bind_s(babsdn, babspw+token)
+ except ldap.INVALID_CREDENTIALS:
+ raise SystemExit("Bind should have succeeded")
+
+ dn, token_entry = get_token_for(connection, babsdn)
+ last = int(token_entry['oathTOTPLastTimeStep'][0].decode())
+ if last != interval_no+1:
+ SystemExit("Unexpected counter value %d (expected %d)" %
+ (last, interval_no+1))
+
+ print("Resetting counter and testing secret sharing between accounts")
+ connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])])
+
+ interval_no = get_interval(period)
+ token = get_hotp_token(secret, interval_no)
+
+ try:
+ bind_conn.bind_s(bjornsdn, bjornspw+token)
+ except ldap.INVALID_CREDENTIALS:
+ raise SystemExit("Bind should have succeeded")
+
+ try:
+ bind_conn.bind_s(babsdn, babspw+token)
+ except ldap.INVALID_CREDENTIALS:
+ pass
+ else:
+ raise SystemExit("Bind with a reused token should have failed")
+
+ print("Testing token is retired even with a wrong password")
+ connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])])
+
+ interval_no = get_interval(period)
+ token = get_hotp_token(secret, interval_no)
+
+ try:
+ bind_conn.bind_s(babsdn, b"not the password"+token)
+ except ldap.INVALID_CREDENTIALS:
+ pass
+ else:
+ raise SystemExit("Bind with an incorrect password should have failed")
+
+ try:
+ bind_conn.bind_s(babsdn, babspw+token)
+ except ldap.INVALID_CREDENTIALS:
+ pass
+ else:
+ raise SystemExit("Bind with a reused token should have failed")
+
+ token = get_hotp_token(secret, interval_no+1)
+ try:
+ bind_conn.bind_s(babsdn, babspw+token)
+ except ldap.INVALID_CREDENTIALS:
+ raise SystemExit("Bind should have succeeded")
+
+
+if __name__ == "__main__":
+ sys.exit(main())