# -*- coding: utf-8 -*- # $OpenLDAP$ ## This work is part of OpenLDAP Software . ## ## Copyright 2016-2021 Ondřej Kuzník, Symas Corp. ## Copyright 2021 The OpenLDAP Foundation. ## All rights reserved. ## ## Redistribution and use in source and binary forms, with or without ## modification, are permitted only as authorized by the OpenLDAP ## Public License. ## ## A copy of this license is available in the file LICENSE in the ## top-level directory of the distribution or, alternatively, at ## . from __future__ import print_function import hashlib import hmac import os import struct import sys import time import ldap from ldap.cidict import cidict as CIDict from ldap.ldapobject import LDAPObject if len(sys.argv) > 1 and sys.argv[1] == "--check": raise SystemExit(0) def get_digits(h, digits): offset = h[19] & 15 number = struct.unpack(">I", h[offset:offset+4])[0] & 0x7fffffff number %= (10 ** digits) return ("%0*d" % (digits, number)).encode() def get_hotp_token(secret, interval_no): msg = struct.pack(">Q", interval_no) h = hmac.new(secret, msg, hashlib.sha1).digest() return get_digits(bytearray(h), 6) def get_interval(period=30): return int(time.time() // period) def get_token_for(connection, dn, typ="totp"): result = connection.search_s(dn, ldap.SCOPE_BASE) dn, attrs = result[0] attrs = CIDict(attrs) tokendn = attrs['oath'+typ+'token'][0].decode() result = connection.search_s(tokendn, ldap.SCOPE_BASE) dn, attrs = result[0] attrs = CIDict(attrs) return dn, attrs def main(): uri = os.environ["URI1"] managerdn = os.environ['MANAGERDN'] passwd = os.environ['PASSWD'] babsdn = os.environ['BABSDN'] babspw = b"bjensen" bjornsdn = os.environ['BJORNSDN'] bjornspw = b"bjorn" connection = LDAPObject(uri) start = time.time() connection.bind_s(managerdn, passwd) end = time.time() if end - start > 1: print("It takes more than a second to connect and bind, " "skipping potentially unstable test", file=sys.stderr) raise SystemExit(0) dn, token_entry = get_token_for(connection, babsdn) paramsdn = token_entry['oathTOTPParams'][0].decode() result = connection.search_s(paramsdn, ldap.SCOPE_BASE) _, attrs = result[0] params = CIDict(attrs) secret = token_entry['oathSecret'][0] period = int(params['oathTOTPTimeStepPeriod'][0].decode()) bind_conn = LDAPObject(uri) interval_no = get_interval(period) token = get_hotp_token(secret, interval_no-3) print("Testing old tokens are not useable") bind_conn.bind_s(babsdn, babspw+token) try: bind_conn.bind_s(babsdn, babspw+token) except ldap.INVALID_CREDENTIALS: pass else: raise SystemExit("Bind with an old token should have failed") interval_no = get_interval(period) token = get_hotp_token(secret, interval_no) print("Testing token can only be used once") bind_conn.bind_s(babsdn, babspw+token) try: bind_conn.bind_s(babsdn, babspw+token) except ldap.INVALID_CREDENTIALS: pass else: raise SystemExit("Bind with a reused token should have failed") token = get_hotp_token(secret, interval_no+1) try: bind_conn.bind_s(babsdn, babspw+token) except ldap.INVALID_CREDENTIALS: raise SystemExit("Bind should have succeeded") dn, token_entry = get_token_for(connection, babsdn) last = int(token_entry['oathTOTPLastTimeStep'][0].decode()) if last != interval_no+1: SystemExit("Unexpected counter value %d (expected %d)" % (last, interval_no+1)) print("Resetting counter and testing secret sharing between accounts") connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])]) interval_no = get_interval(period) token = get_hotp_token(secret, interval_no) try: bind_conn.bind_s(bjornsdn, bjornspw+token) except ldap.INVALID_CREDENTIALS: raise SystemExit("Bind should have succeeded") try: bind_conn.bind_s(babsdn, babspw+token) except ldap.INVALID_CREDENTIALS: pass else: raise SystemExit("Bind with a reused token should have failed") print("Testing token is retired even with a wrong password") connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])]) interval_no = get_interval(period) token = get_hotp_token(secret, interval_no) try: bind_conn.bind_s(babsdn, b"not the password"+token) except ldap.INVALID_CREDENTIALS: pass else: raise SystemExit("Bind with an incorrect password should have failed") try: bind_conn.bind_s(babsdn, babspw+token) except ldap.INVALID_CREDENTIALS: pass else: raise SystemExit("Bind with a reused token should have failed") token = get_hotp_token(secret, interval_no+1) try: bind_conn.bind_s(babsdn, babspw+token) except ldap.INVALID_CREDENTIALS: raise SystemExit("Bind should have succeeded") if __name__ == "__main__": sys.exit(main())