# Unix SMB/CIFS implementation. # Copyright (C) Isaac Boukris 2020 # Copyright (C) Stefan Metzmacher 2020 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys import socket import struct import time import datetime import random import binascii import itertools import collections import math from enum import Enum from pprint import pprint from cryptography import x509 from cryptography.hazmat.primitives import asymmetric, hashes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from pyasn1.codec.der.decoder import decode as pyasn1_der_decode from pyasn1.codec.der.encoder import encode as pyasn1_der_encode from pyasn1.codec.native.decoder import decode as pyasn1_native_decode from pyasn1.codec.native.encoder import encode as pyasn1_native_encode from pyasn1.codec.ber.encoder import BitStringEncoder import pyasn1.type.univ from pyasn1.error import PyAsn1Error from samba import unix2nttime from samba.credentials import Credentials from samba.dcerpc import claims, krb5pac, netlogon, samr, security from samba.gensec import FEATURE_SEAL from samba.ndr import ndr_pack, ndr_unpack from samba.dcerpc.misc import ( SEC_CHAN_WKSTA, SEC_CHAN_BDC, ) import samba.tests from samba.tests import TestCase import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 from samba.tests.krb5.rfc4120_constants import ( AD_IF_RELEVANT, AD_WIN2K_PAC, FX_FAST_ARMOR_AP_REQUEST, KDC_ERR_CLIENT_REVOKED, KDC_ERR_GENERIC, KDC_ERR_POLICY, KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW, KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS, KERB_ERR_TYPE_EXTENDED, KRB_AP_REP, KRB_AP_REQ, KRB_AS_REP, KRB_AS_REQ, KRB_ERROR, KRB_PRIV, KRB_TGS_REP, KRB_TGS_REQ, KU_AP_REQ_AUTH, KU_AP_REQ_ENC_PART, KU_AS_FRESHNESS, KU_AS_REP_ENC_PART, KU_AS_REQ, KU_ENC_CHALLENGE_KDC, KU_FAST_ENC, KU_FAST_FINISHED, KU_FAST_REP, KU_FAST_REQ_CHKSUM, KU_KRB_PRIV, KU_NON_KERB_CKSUM_SALT, KU_NON_KERB_SALT, KU_PKINIT_AS_REQ, KU_TGS_REP_ENC_PART_SESSION, KU_TGS_REP_ENC_PART_SUB_KEY, KU_TGS_REQ_AUTH, KU_TGS_REQ_AUTH_CKSUM, KU_TGS_REQ_AUTH_DAT_SESSION, KU_TGS_REQ_AUTH_DAT_SUBKEY, KU_TICKET, NT_PRINCIPAL, NT_SRV_INST, NT_WELLKNOWN, PADATA_AS_FRESHNESS, PADATA_ENCRYPTED_CHALLENGE, PADATA_ENC_TIMESTAMP, PADATA_ETYPE_INFO, PADATA_ETYPE_INFO2, PADATA_FOR_USER, PADATA_FX_COOKIE, PADATA_FX_ERROR, PADATA_FX_FAST, PADATA_GSS, PADATA_KDC_REQ, PADATA_PAC_OPTIONS, PADATA_PAC_REQUEST, PADATA_PKINIT_KX, PADATA_PK_AS_REP, PADATA_PK_AS_REP_19, PADATA_PK_AS_REQ, PADATA_PW_SALT, PADATA_REQ_ENC_PA_REP, PADATA_SUPPORTED_ETYPES, ) import samba.tests.krb5.kcrypto as kcrypto def BitStringEncoder_encodeValue32( self, value, asn1Spec, encodeFun, **options): # # BitStrings like KDCOptions or TicketFlags should at least # be 32-Bit on the wire # if asn1Spec is not None: # TODO: try to avoid ASN.1 schema instantiation value = asn1Spec.clone(value) valueLength = len(value) if valueLength % 8: alignedValue = value << (8 - valueLength % 8) else: alignedValue = value substrate = alignedValue.asOctets() length = len(substrate) # We need at least 32-Bit / 4-Bytes if length < 4: padding = 4 - length else: padding = 0 ret = b'\x00' + substrate + (b'\x00' * padding) return ret, False, True BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32 def BitString_NamedValues_prettyPrint(self, scope=0): ret = "%s" % self.asBinary() bits = [] highest_bit = 32 for byte in self.asNumbers(): for bit in [7, 6, 5, 4, 3, 2, 1, 0]: mask = 1 << bit if byte & mask: val = 1 else: val = 0 bits.append(val) if len(bits) < highest_bit: for bitPosition in range(len(bits), highest_bit): bits.append(0) indent = " " * scope delim = ": (\n%s " % indent for bitPosition in range(highest_bit): if bitPosition in self.prettyPrintNamedValues: name = self.prettyPrintNamedValues[bitPosition] elif bits[bitPosition] != 0: name = "unknown-bit-%u" % bitPosition else: continue ret += "%s%s:%u" % (delim, name, bits[bitPosition]) delim = ",\n%s " % indent ret += "\n%s)" % indent return ret krb5_asn1.TicketFlags.prettyPrintNamedValues =\ krb5_asn1.TicketFlagsValues.namedValues krb5_asn1.TicketFlags.namedValues =\ krb5_asn1.TicketFlagsValues.namedValues krb5_asn1.TicketFlags.prettyPrint =\ BitString_NamedValues_prettyPrint krb5_asn1.KDCOptions.prettyPrintNamedValues =\ krb5_asn1.KDCOptionsValues.namedValues krb5_asn1.KDCOptions.namedValues =\ krb5_asn1.KDCOptionsValues.namedValues krb5_asn1.KDCOptions.prettyPrint =\ BitString_NamedValues_prettyPrint krb5_asn1.APOptions.prettyPrintNamedValues =\ krb5_asn1.APOptionsValues.namedValues krb5_asn1.APOptions.namedValues =\ krb5_asn1.APOptionsValues.namedValues krb5_asn1.APOptions.prettyPrint =\ BitString_NamedValues_prettyPrint krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\ krb5_asn1.PACOptionFlagsValues.namedValues krb5_asn1.PACOptionFlags.namedValues =\ krb5_asn1.PACOptionFlagsValues.namedValues krb5_asn1.PACOptionFlags.prettyPrint =\ BitString_NamedValues_prettyPrint def Integer_NamedValues_prettyPrint(self, scope=0): intval = int(self) if intval in self.prettyPrintNamedValues: name = self.prettyPrintNamedValues[intval] else: name = "<__unknown__>" ret = "%d (0x%x) %s" % (intval, intval, name) return ret krb5_asn1.NameType.prettyPrintNamedValues =\ krb5_asn1.NameTypeValues.namedValues krb5_asn1.NameType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.AuthDataType.prettyPrintNamedValues =\ krb5_asn1.AuthDataTypeValues.namedValues krb5_asn1.AuthDataType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.PADataType.prettyPrintNamedValues =\ krb5_asn1.PADataTypeValues.namedValues krb5_asn1.PADataType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.EncryptionType.prettyPrintNamedValues =\ krb5_asn1.EncryptionTypeValues.namedValues krb5_asn1.EncryptionType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.ChecksumType.prettyPrintNamedValues =\ krb5_asn1.ChecksumTypeValues.namedValues krb5_asn1.ChecksumType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\ krb5_asn1.KerbErrorDataTypeValues.namedValues krb5_asn1.KerbErrorDataType.prettyPrint =\ Integer_NamedValues_prettyPrint class Krb5EncryptionKey: __slots__ = [ 'ctype', 'etype', 'key', 'kvno', ] def __init__(self, key, kvno): EncTypeChecksum = { kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256, kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128, kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5, } self.key = key self.etype = key.enctype self.ctype = EncTypeChecksum[self.etype] self.kvno = kvno def __str__(self): return "etype=%d ctype=%d kvno=%d key=%s" % ( self.etype, self.ctype, self.kvno, self.key) def encrypt(self, usage, plaintext): ciphertext = kcrypto.encrypt(self.key, usage, plaintext) return ciphertext def decrypt(self, usage, ciphertext): plaintext = kcrypto.decrypt(self.key, usage, ciphertext) return plaintext def make_zeroed_checksum(self, ctype=None): if ctype is None: ctype = self.ctype checksum_len = kcrypto.checksum_len(ctype) return bytes(checksum_len) def make_checksum(self, usage, plaintext, ctype=None): if ctype is None: ctype = self.ctype cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext) return cksum def verify_checksum(self, usage, plaintext, ctype, cksum): if self.ctype != ctype: raise AssertionError(f'key checksum type ({self.ctype}) != ' f'checksum type ({ctype})') kcrypto.verify_checksum(ctype, self.key, usage, plaintext, cksum) def export_obj(self): EncryptionKey_obj = { 'keytype': self.etype, 'keyvalue': self.key.contents, } return EncryptionKey_obj class RodcPacEncryptionKey(Krb5EncryptionKey): __slots__ = ['rodc_id'] def __init__(self, key, kvno, rodc_id=None): super().__init__(key, kvno) if rodc_id is None: kvno = self.kvno if kvno is not None: kvno >>= 16 kvno &= (1 << 16) - 1 rodc_id = kvno or None if rodc_id is not None: self.rodc_id = rodc_id.to_bytes(2, byteorder='little') else: self.rodc_id = b'' def make_rodc_zeroed_checksum(self, ctype=None): checksum = super().make_zeroed_checksum(ctype) return checksum + bytes(len(self.rodc_id)) def make_rodc_checksum(self, usage, plaintext, ctype=None): checksum = super().make_checksum(usage, plaintext, ctype) return checksum + self.rodc_id def verify_rodc_checksum(self, usage, plaintext, ctype, cksum): if self.rodc_id: cksum, cksum_rodc_id = cksum[:-2], cksum[-2:] if self.rodc_id != cksum_rodc_id: raise AssertionError(f'{self.rodc_id.hex()} != ' f'{cksum_rodc_id.hex()}') super().verify_checksum(usage, plaintext, ctype, cksum) class ZeroedChecksumKey(RodcPacEncryptionKey): def make_checksum(self, usage, plaintext, ctype=None): return self.make_zeroed_checksum(ctype) def make_rodc_checksum(self, usage, plaintext, ctype=None): return self.make_rodc_zeroed_checksum(ctype) class WrongLengthChecksumKey(RodcPacEncryptionKey): __slots__ = ['_length'] def __init__(self, key, kvno, length): super().__init__(key, kvno) self._length = length @classmethod def _adjust_to_length(cls, checksum, length): diff = length - len(checksum) if diff > 0: checksum += bytes(diff) elif diff < 0: checksum = checksum[:length] return checksum def make_zeroed_checksum(self, ctype=None): return bytes(self._length) def make_checksum(self, usage, plaintext, ctype=None): checksum = super().make_checksum(usage, plaintext, ctype) return self._adjust_to_length(checksum, self._length) def make_rodc_zeroed_checksum(self, ctype=None): return bytes(self._length) def make_rodc_checksum(self, usage, plaintext, ctype=None): checksum = super().make_rodc_checksum(usage, plaintext, ctype) return self._adjust_to_length(checksum, self._length) class KerberosCredentials(Credentials): __slots__ = [ '_private_key', 'account_type', 'ap_supported_enctypes', 'as_supported_enctypes', 'dn', 'forced_keys', 'forced_salt', 'kvno', 'sid', 'spn', 'tgs_supported_enctypes', 'upn', ] non_etype_bits = ( security.KERB_ENCTYPE_FAST_SUPPORTED) | ( security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED) | ( security.KERB_ENCTYPE_CLAIMS_SUPPORTED) | ( security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED) | ( security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) def __init__(self): super().__init__() all_enc_types = 0 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 self.as_supported_enctypes = all_enc_types self.tgs_supported_enctypes = all_enc_types self.ap_supported_enctypes = all_enc_types self.kvno = None self.forced_keys = {} self.forced_salt = None self.dn = None self.upn = None self.spn = None self.sid = None self.account_type = None self._private_key = None def set_as_supported_enctypes(self, value): self.as_supported_enctypes = int(value) def set_tgs_supported_enctypes(self, value): self.tgs_supported_enctypes = int(value) def set_ap_supported_enctypes(self, value): self.ap_supported_enctypes = int(value) etype_map = collections.OrderedDict([ (kcrypto.Enctype.AES256, security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96), (kcrypto.Enctype.AES128, security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96), (kcrypto.Enctype.RC4, security.KERB_ENCTYPE_RC4_HMAC_MD5), (kcrypto.Enctype.DES_MD5, security.KERB_ENCTYPE_DES_CBC_MD5), (kcrypto.Enctype.DES_CRC, security.KERB_ENCTYPE_DES_CBC_CRC) ]) @classmethod def etypes_to_bits(cls, etypes): bits = 0 for etype in etypes: bit = cls.etype_map[etype] if bits & bit: raise ValueError(f'Got duplicate etype: {etype}') bits |= bit return bits @classmethod def bits_to_etypes(cls, bits): etypes = () for etype, bit in cls.etype_map.items(): if bit & bits: bits &= ~bit etypes += (etype,) bits &= ~cls.non_etype_bits if bits != 0: raise ValueError(f'Unsupported etype bits: {bits}') return etypes def get_as_krb5_etypes(self): return self.bits_to_etypes(self.as_supported_enctypes) def get_tgs_krb5_etypes(self): return self.bits_to_etypes(self.tgs_supported_enctypes) def get_ap_krb5_etypes(self): return self.bits_to_etypes(self.ap_supported_enctypes) def set_kvno(self, kvno): # Sign-extend from 32 bits. if kvno & 1 << 31: kvno |= -1 << 31 self.kvno = kvno def get_kvno(self): return self.kvno def set_forced_key(self, etype, hexkey): etype = int(etype) contents = binascii.a2b_hex(hexkey) key = kcrypto.Key(etype, contents) self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno) # Also set the NT hash of computer accounts for which we don’t know the # password. if etype == kcrypto.Enctype.RC4 and self.get_password() is None: nt_hash = samr.Password() nt_hash.hash = list(contents) self.set_nt_hash(nt_hash) def get_forced_key(self, etype): etype = int(etype) return self.forced_keys.get(etype) def set_forced_salt(self, salt): self.forced_salt = bytes(salt) def get_forced_salt(self): return self.forced_salt def get_salt(self): if self.forced_salt is not None: return self.forced_salt upn = self.get_upn() if upn is not None: salt_name = upn.rsplit('@', 1)[0].replace('/', '') else: salt_name = self.get_username() secure_schannel_type = self.get_secure_channel_type() if secure_schannel_type in [SEC_CHAN_WKSTA,SEC_CHAN_BDC]: salt_name = self.get_username().lower() if salt_name[-1] == '$': salt_name = salt_name[:-1] salt_string = '%shost%s.%s' % ( self.get_realm().upper(), salt_name, self.get_realm().lower()) else: salt_string = self.get_realm().upper() + salt_name return salt_string.encode('utf-8') def set_dn(self, dn): self.dn = dn def get_dn(self): return self.dn def set_spn(self, spn): self.spn = spn def get_spn(self): return self.spn def set_upn(self, upn): self.upn = upn def get_upn(self): return self.upn def set_sid(self, sid): self.sid = sid def get_sid(self): return self.sid def get_rid(self): sid = self.get_sid() if sid is None: return None _, rid = sid.rsplit('-', 1) return int(rid) def set_type(self, account_type): self.account_type = account_type def get_type(self): return self.account_type def update_password(self, password): self.set_password(password) self.set_kvno(self.get_kvno() + 1) def get_private_key(self): if self._private_key is None: # Generate a new keypair. self._private_key = asymmetric.rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) return self._private_key def get_public_key(self): return self.get_private_key().public_key() class KerberosTicketCreds: __slots__ = [ 'cname', 'crealm', 'decryption_key', 'encpart_private', 'session_key', 'sname', 'srealm', 'ticket_private', 'ticket', ] def __init__(self, ticket, session_key, crealm=None, cname=None, srealm=None, sname=None, decryption_key=None, ticket_private=None, encpart_private=None): self.ticket = ticket self.session_key = session_key self.crealm = crealm self.cname = cname self.srealm = srealm self.sname = sname self.decryption_key = decryption_key self.ticket_private = ticket_private self.encpart_private = encpart_private def set_sname(self, sname): self.ticket['sname'] = sname self.sname = sname class PkInit(Enum): NOT_USED = object() PUBLIC_KEY = object() DIFFIE_HELLMAN = object() class RawKerberosTest(TestCase): """A raw Kerberos Test case.""" class KpasswdMode(Enum): SET = object() CHANGE = object() # The location of a SID within the PAC class SidType(Enum): BASE_SID = object() # in info3.base.groups EXTRA_SID = object() # in info3.sids RESOURCE_SID = object() # in resource_groups PRIMARY_GID = object() # the (sole) primary group def __repr__(self): return self.__str__() pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM, krb5pac.PAC_TYPE_KDC_CHECKSUM, krb5pac.PAC_TYPE_TICKET_CHECKSUM, krb5pac.PAC_TYPE_FULL_CHECKSUM} etypes_to_test = ( {"value": -1111, "name": "dummy", }, {"value": kcrypto.Enctype.AES256, "name": "aes128", }, {"value": kcrypto.Enctype.AES128, "name": "aes256", }, {"value": kcrypto.Enctype.RC4, "name": "rc4", }, ) expect_padata_outer = object() setup_etype_test_permutations_done = False @classmethod def setup_etype_test_permutations(cls): if cls.setup_etype_test_permutations_done: return res = [] num_idxs = len(cls.etypes_to_test) permutations = [] for num in range(1, num_idxs + 1): chunk = list(itertools.permutations(range(num_idxs), num)) for e in chunk: el = list(e) permutations.append(el) for p in permutations: name = None etypes = () for idx in p: n = cls.etypes_to_test[idx]["name"] if name is None: name = n else: name += "_%s" % n etypes += (cls.etypes_to_test[idx]["value"],) r = {"name": name, "etypes": etypes, } res.append(r) cls.etype_test_permutations = res cls.setup_etype_test_permutations_done = True @classmethod def etype_test_permutation_name_idx(cls): cls.setup_etype_test_permutations() res = [] idx = 0 for e in cls.etype_test_permutations: r = (e['name'], idx) idx += 1 res.append(r) return res def etype_test_permutation_by_idx(self, idx): e = self.etype_test_permutations[idx] return (e['name'], e['etypes']) @classmethod def setUpClass(cls): super().setUpClass() cls.host = samba.tests.env_get_var_value('SERVER') cls.dc_host = samba.tests.env_get_var_value('DC_SERVER') # A dictionary containing credentials that have already been # obtained. cls.creds_dict = {} kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT', allow_missing=True) if kdc_fast_support is None: kdc_fast_support = '0' cls.kdc_fast_support = bool(int(kdc_fast_support)) kdc_claims_support = samba.tests.env_get_var_value('CLAIMS_SUPPORT', allow_missing=True) if kdc_claims_support is None: kdc_claims_support = '0' cls.kdc_claims_support = bool(int(kdc_claims_support)) kdc_compound_id_support = samba.tests.env_get_var_value( 'COMPOUND_ID_SUPPORT', allow_missing=True) if kdc_compound_id_support is None: kdc_compound_id_support = '0' cls.kdc_compound_id_support = bool(int(kdc_compound_id_support)) tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT', allow_missing=True) if tkt_sig_support is None: tkt_sig_support = '1' cls.tkt_sig_support = bool(int(tkt_sig_support)) full_sig_support = samba.tests.env_get_var_value('FULL_SIG_SUPPORT', allow_missing=True) if full_sig_support is None: full_sig_support = '1' cls.full_sig_support = bool(int(full_sig_support)) expect_pac = samba.tests.env_get_var_value('EXPECT_PAC', allow_missing=True) if expect_pac is None: expect_pac = '1' cls.expect_pac = bool(int(expect_pac)) expect_extra_pac_buffers = samba.tests.env_get_var_value( 'EXPECT_EXTRA_PAC_BUFFERS', allow_missing=True) if expect_extra_pac_buffers is None: expect_extra_pac_buffers = '1' cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers)) cname_checking = samba.tests.env_get_var_value('CHECK_CNAME', allow_missing=True) if cname_checking is None: cname_checking = '1' cls.cname_checking = bool(int(cname_checking)) padata_checking = samba.tests.env_get_var_value('CHECK_PADATA', allow_missing=True) if padata_checking is None: padata_checking = '1' cls.padata_checking = bool(int(padata_checking)) kadmin_is_tgs = samba.tests.env_get_var_value('KADMIN_IS_TGS', allow_missing=True) if kadmin_is_tgs is None: kadmin_is_tgs = '0' cls.kadmin_is_tgs = bool(int(kadmin_is_tgs)) default_etypes = samba.tests.env_get_var_value('DEFAULT_ETYPES', allow_missing=True) if default_etypes is not None: default_etypes = int(default_etypes) cls.default_etypes = default_etypes forced_rc4 = samba.tests.env_get_var_value('FORCED_RC4', allow_missing=True) if forced_rc4 is None: forced_rc4 = '0' cls.forced_rc4 = bool(int(forced_rc4)) expect_nt_hash = samba.tests.env_get_var_value('EXPECT_NT_HASH', allow_missing=True) if expect_nt_hash is None: expect_nt_hash = '1' cls.expect_nt_hash = bool(int(expect_nt_hash)) expect_nt_status = samba.tests.env_get_var_value('EXPECT_NT_STATUS', allow_missing=True) if expect_nt_status is None: expect_nt_status = '1' cls.expect_nt_status = bool(int(expect_nt_status)) crash_windows = samba.tests.env_get_var_value('CRASH_WINDOWS', allow_missing=True) if crash_windows is None: crash_windows = '1' cls.crash_windows = bool(int(crash_windows)) def setUp(self): super().setUp() self.do_asn1_print = False self.do_hexdump = False strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True) if strict_checking is None: strict_checking = '1' self.strict_checking = bool(int(strict_checking)) self.s = None self.unspecified_kvno = object() def tearDown(self): self._disconnect("tearDown") super().tearDown() def _disconnect(self, reason): if self.s is None: return self.s.close() self.s = None if self.do_hexdump: sys.stderr.write("disconnect[%s]\n" % reason) def _connect_tcp(self, host, port=None): if port is None: port = 88 try: self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.SOL_TCP, 0) self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2]) self.s.settimeout(10) self.s.connect(self.a[0][4]) except socket.error: self.s.close() raise def connect(self, host, port=None): self.assertNotConnected() self._connect_tcp(host, port) if self.do_hexdump: sys.stderr.write("connected[%s]\n" % host) def env_get_var(self, varname, prefix, fallback_default=True, allow_missing=False): val = None if prefix is not None: allow_missing_prefix = allow_missing or fallback_default val = samba.tests.env_get_var_value( '%s_%s' % (prefix, varname), allow_missing=allow_missing_prefix) else: fallback_default = True if val is None and fallback_default: val = samba.tests.env_get_var_value(varname, allow_missing=allow_missing) return val def _get_krb5_creds_from_env(self, prefix, default_username=None, allow_missing_password=False, allow_missing_keys=True, require_strongest_key=False): c = KerberosCredentials() c.guess() domain = self.env_get_var('DOMAIN', prefix) realm = self.env_get_var('REALM', prefix) allow_missing_username = default_username is not None username = self.env_get_var('USERNAME', prefix, fallback_default=False, allow_missing=allow_missing_username) if username is None: username = default_username password = self.env_get_var('PASSWORD', prefix, fallback_default=False, allow_missing=allow_missing_password) c.set_domain(domain) c.set_realm(realm) c.set_username(username) if password is not None: c.set_password(password) as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES', prefix, allow_missing=True) if as_supported_enctypes is not None: c.set_as_supported_enctypes(as_supported_enctypes) tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES', prefix, allow_missing=True) if tgs_supported_enctypes is not None: c.set_tgs_supported_enctypes(tgs_supported_enctypes) ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES', prefix, allow_missing=True) if ap_supported_enctypes is not None: c.set_ap_supported_enctypes(ap_supported_enctypes) if require_strongest_key: kvno_allow_missing = False if password is None: aes256_allow_missing = False else: aes256_allow_missing = True else: kvno_allow_missing = allow_missing_keys aes256_allow_missing = allow_missing_keys kvno = self.env_get_var('KVNO', prefix, fallback_default=False, allow_missing=kvno_allow_missing) if kvno is not None: c.set_kvno(int(kvno)) aes256_key = self.env_get_var('AES256_KEY_HEX', prefix, fallback_default=False, allow_missing=aes256_allow_missing) if aes256_key is not None: c.set_forced_key(kcrypto.Enctype.AES256, aes256_key) aes128_key = self.env_get_var('AES128_KEY_HEX', prefix, fallback_default=False, allow_missing=True) if aes128_key is not None: c.set_forced_key(kcrypto.Enctype.AES128, aes128_key) rc4_key = self.env_get_var('RC4_KEY_HEX', prefix, fallback_default=False, allow_missing=True) if rc4_key is not None: c.set_forced_key(kcrypto.Enctype.RC4, rc4_key) if not allow_missing_keys: self.assertTrue(c.forced_keys, 'Please supply %s encryption keys ' 'in environment' % prefix) return c def _get_krb5_creds(self, prefix, default_username=None, allow_missing_password=False, allow_missing_keys=True, require_strongest_key=False, fallback_creds_fn=None): if prefix in self.creds_dict: return self.creds_dict[prefix] # We don't have the credentials already creds = None env_err = None try: # Try to obtain them from the environment creds = self._get_krb5_creds_from_env( prefix, default_username=default_username, allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys, require_strongest_key=require_strongest_key) except Exception as err: # An error occurred, so save it for later env_err = err else: self.assertIsNotNone(creds) # Save the obtained credentials self.creds_dict[prefix] = creds return creds if fallback_creds_fn is not None: try: # Try to use the fallback method creds = fallback_creds_fn() except Exception as err: print("ERROR FROM ENV: %r" % (env_err)) print("FALLBACK-FN: %s" % (fallback_creds_fn)) print("FALLBACK-ERROR: %r" % (err)) else: self.assertIsNotNone(creds) # Save the obtained credentials self.creds_dict[prefix] = creds return creds # Both methods failed, so raise the exception from the # environment method raise env_err def get_user_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix=None, allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_service_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='SERVICE', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_client_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='CLIENT', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_server_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='SERVER', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_admin_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='ADMIN', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL) c.set_workstation('') return c def get_rodc_krbtgt_creds(self, require_keys=True, require_strongest_key=False): if require_strongest_key: self.assertTrue(require_keys) c = self._get_krb5_creds(prefix='RODC_KRBTGT', allow_missing_password=True, allow_missing_keys=not require_keys, require_strongest_key=require_strongest_key) return c def get_krbtgt_creds(self, require_keys=True, require_strongest_key=False): if require_strongest_key: self.assertTrue(require_keys) c = self._get_krb5_creds(prefix='KRBTGT', default_username='krbtgt', allow_missing_password=True, allow_missing_keys=not require_keys, require_strongest_key=require_strongest_key) return c def get_anon_creds(self): c = Credentials() c.set_anonymous() return c # Overridden by KDCBaseTest. At this level we don't know what actual # enctypes are supported, so the best we can do is go by whether NT hashes # are expected and whether the account is a workstation or not. This # matches the behaviour that tests expect by default. def get_default_enctypes(self, creds): self.assertIsNotNone(creds) default_enctypes = [ kcrypto.Enctype.AES256, kcrypto.Enctype.AES128, ] if self.expect_nt_hash or creds.get_workstation(): default_enctypes.append(kcrypto.Enctype.RC4) return default_enctypes def asn1_dump(self, name, obj, asn1_print=None): if asn1_print is None: asn1_print = self.do_asn1_print if asn1_print: if name is not None: sys.stderr.write("%s:\n%s" % (name, obj)) else: sys.stderr.write("%s" % (obj)) def hex_dump(self, name, blob, hexdump=None): if hexdump is None: hexdump = self.do_hexdump if hexdump: sys.stderr.write( "%s: %d\n%s" % (name, len(blob), self.hexdump(blob))) def der_decode( self, blob, asn1Spec=None, native_encode=True, asn1_print=None, hexdump=None): if asn1Spec is not None: class_name = type(asn1Spec).__name__.split(':')[0] else: class_name = "" self.hex_dump(class_name, blob, hexdump=hexdump) obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec) self.asn1_dump(None, obj, asn1_print=asn1_print) if native_encode: obj = pyasn1_native_encode(obj) return obj def der_encode( self, obj, asn1Spec=None, native_decode=True, asn1_print=None, hexdump=None): if native_decode: obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec) class_name = type(obj).__name__.split(':')[0] if class_name is not None: self.asn1_dump(None, obj, asn1_print=asn1_print) blob = pyasn1_der_encode(obj) if class_name is not None: self.hex_dump(class_name, blob, hexdump=hexdump) return blob def send_pdu(self, req, asn1_print=None, hexdump=None): k5_pdu = self.der_encode( req, native_decode=False, asn1_print=asn1_print, hexdump=False) self.send_msg(k5_pdu, hexdump=hexdump) def send_msg(self, msg, hexdump=None): header = struct.pack('>I', len(msg)) req_pdu = header req_pdu += msg self.hex_dump("send_msg", header, hexdump=hexdump) self.hex_dump("send_msg", msg, hexdump=hexdump) try: while True: sent = self.s.send(req_pdu, 0) if sent == len(req_pdu): return req_pdu = req_pdu[sent:] except socket.error as e: self._disconnect("send_msg: %s" % e) raise def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None): rep_pdu = None try: if timeout is not None: self.s.settimeout(timeout) rep_pdu = self.s.recv(num_recv, 0) self.s.settimeout(10) if len(rep_pdu) == 0: self._disconnect("recv_raw: EOF") return None self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump) except socket.timeout: self.s.settimeout(10) sys.stderr.write("recv_raw: TIMEOUT\n") except socket.error as e: self._disconnect("recv_raw: %s" % e) raise return rep_pdu def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None): raw_pdu = self.recv_raw( num_recv=4, hexdump=hexdump, timeout=timeout) if raw_pdu is None: return None header = struct.unpack(">I", raw_pdu[0:4]) k5_len = header[0] if k5_len == 0: return "" missing = k5_len rep_pdu = b'' while missing > 0: raw_pdu = self.recv_raw( num_recv=missing, hexdump=hexdump, timeout=timeout) self.assertGreaterEqual(len(raw_pdu), 1) rep_pdu += raw_pdu missing = k5_len - len(rep_pdu) return rep_pdu def recv_reply(self, asn1_print=None, hexdump=None, timeout=None): rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) if not rep_pdu: return None, rep_pdu k5_raw = self.der_decode( rep_pdu, asn1Spec=None, native_encode=False, asn1_print=False, hexdump=False) pvno = k5_raw['field-0'] self.assertEqual(pvno, 5) msg_type = k5_raw['field-1'] self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR]) if msg_type == KRB_AS_REP: asn1Spec = krb5_asn1.AS_REP() elif msg_type == KRB_TGS_REP: asn1Spec = krb5_asn1.TGS_REP() elif msg_type == KRB_ERROR: asn1Spec = krb5_asn1.KRB_ERROR() rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec, asn1_print=asn1_print, hexdump=False) return (rep, rep_pdu) def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None): (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) return rep def assertIsConnected(self): self.assertIsNotNone(self.s, msg="Not connected") def assertNotConnected(self): self.assertIsNone(self.s, msg="Is connected") def send_recv_transaction( self, req, asn1_print=None, hexdump=None, timeout=None, to_rodc=False): host = self.host if to_rodc else self.dc_host self.connect(host) try: self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump) rep = self.recv_pdu( asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) except Exception: self._disconnect("transaction failed") raise self._disconnect("transaction done") return rep def getElementValue(self, obj, elem): return obj.get(elem) def assertElementMissing(self, obj, elem): v = self.getElementValue(obj, elem) self.assertIsNone(v) def assertElementPresent(self, obj, elem, expect_empty=False): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) if self.strict_checking: if isinstance(v, collections.abc.Container): if expect_empty: self.assertEqual(0, len(v)) else: self.assertNotEqual(0, len(v)) def assertElementEqual(self, obj, elem, value): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) self.assertEqual(v, value) def assertElementEqualUTF8(self, obj, elem, value): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) self.assertEqual(v, bytes(value, 'utf8')) def assertPrincipalEqual(self, princ1, princ2): self.assertEqual(princ1['name-type'], princ2['name-type']) self.assertEqual( len(princ1['name-string']), len(princ2['name-string']), msg="princ1=%s != princ2=%s" % (princ1, princ2)) for idx in range(len(princ1['name-string'])): self.assertEqual( princ1['name-string'][idx], princ2['name-string'][idx], msg="princ1=%s != princ2=%s" % (princ1, princ2)) def assertElementEqualPrincipal(self, obj, elem, value): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName()) self.assertPrincipalEqual(v, value) def assertElementKVNO(self, obj, elem, value): v = self.getElementValue(obj, elem) if value == "autodetect": value = v if value is not None: self.assertIsNotNone(v) # The value on the wire should never be 0 self.assertNotEqual(v, 0) # unspecified_kvno means we don't know the kvno, # but want to enforce its presence if value is not self.unspecified_kvno: value = int(value) self.assertNotEqual(value, 0) self.assertEqual(v, value) else: self.assertIsNone(v) def assertElementFlags(self, obj, elem, expected, unexpected): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) if expected is not None: self.assertIsInstance(expected, krb5_asn1.TicketFlags) for i, flag in enumerate(expected): if flag == 1: self.assertEqual('1', v[i], f"'{expected.namedValues[i]}' " f"expected in {v}") if unexpected is not None: self.assertIsInstance(unexpected, krb5_asn1.TicketFlags) for i, flag in enumerate(unexpected): if flag == 1: self.assertEqual('0', v[i], f"'{unexpected.namedValues[i]}' " f"unexpected in {v}") def assertSequenceElementsEqual(self, expected, got, *, require_strict=None, unchecked=None, require_ordered=True): if self.strict_checking and require_ordered and not unchecked: self.assertEqual(expected, got) else: fail_msg = f'expected: {expected} got: {got}' ignored = set() if unchecked: ignored.update(unchecked) if require_strict and not self.strict_checking: ignored.update(require_strict) if ignored: fail_msg += f' (ignoring: {ignored})' expected = (x for x in expected if x not in ignored) got = (x for x in got if x not in ignored) self.assertCountEqual(expected, got, fail_msg) def get_KerberosTimeWithUsec(self, epoch=None, offset=None): if epoch is None: epoch = time.time() if offset is not None: epoch = epoch + int(offset) dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc) return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond) def get_KerberosTime(self, epoch=None, offset=None): (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset) return s def get_EpochFromKerberosTime(self, kerberos_time): if isinstance(kerberos_time, bytes): kerberos_time = kerberos_time.decode() epoch = datetime.datetime.strptime(kerberos_time, '%Y%m%d%H%M%SZ') epoch = epoch.replace(tzinfo=datetime.timezone.utc) epoch = int(epoch.timestamp()) return epoch def get_Nonce(self): nonce_min = 0x7f000000 nonce_max = 0x7fffffff v = random.randint(nonce_min, nonce_max) return v def get_pa_dict(self, pa_data): pa_dict = {} if pa_data is not None: for pa in pa_data: pa_type = pa['padata-type'] if pa_type in pa_dict: raise RuntimeError(f'Duplicate type {pa_type}') pa_dict[pa_type] = pa['padata-value'] return pa_dict def SessionKey_create(self, etype, contents, kvno=None): key = kcrypto.Key(etype, contents) return RodcPacEncryptionKey(key, kvno) def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None, params=None): self.assertIsNotNone(pwd) self.assertIsNotNone(salt) key = kcrypto.string_to_key(etype, pwd, salt, params=params) return RodcPacEncryptionKey(key, kvno) def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None): e = etype_info2['etype'] salt = etype_info2.get('salt') _params = etype_info2.get('s2kparams') return self.PasswordKey_from_etype(creds, e, kvno=kvno, salt=salt) def PasswordKey_from_creds(self, creds, etype): kvno = creds.get_kvno() salt = creds.get_salt() return self.PasswordKey_from_etype(creds, etype, kvno=kvno, salt=salt) def PasswordKey_from_etype(self, creds, etype, kvno=None, salt=None): if etype == kcrypto.Enctype.RC4: nthash = creds.get_nt_hash() return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno) password = creds.get_password().encode('utf-8') return self.PasswordKey_create( etype=etype, pwd=password, salt=salt, kvno=kvno) def TicketDecryptionKey_from_creds(self, creds, etype=None): if etype is None: etypes = creds.get_tgs_krb5_etypes() if etypes and etypes[0] not in (kcrypto.Enctype.DES_CRC, kcrypto.Enctype.DES_MD5): etype = etypes[0] else: etype = kcrypto.Enctype.RC4 forced_key = creds.get_forced_key(etype) if forced_key is not None: return forced_key kvno = creds.get_kvno() fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] " "nor a password specified, " % ( creds.get_username(), etype, kvno)) if etype == kcrypto.Enctype.RC4: nthash = creds.get_nt_hash() self.assertIsNotNone(nthash, msg=fail_msg) return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno) password = creds.get_password() self.assertIsNotNone(password, msg=fail_msg) salt = creds.get_salt() return self.PasswordKey_create(etype=etype, pwd=password, salt=salt, kvno=kvno) def RandomKey(self, etype): e = kcrypto._get_enctype_profile(etype) contents = samba.generate_random_bytes(e.keysize) return self.SessionKey_create(etype=etype, contents=contents) def EncryptionKey_import(self, EncryptionKey_obj): return self.SessionKey_create(EncryptionKey_obj['keytype'], EncryptionKey_obj['keyvalue']) def EncryptedData_create(self, key, usage, plaintext): # EncryptedData ::= SEQUENCE { # etype [0] Int32 -- EncryptionType --, # kvno [1] Int32 OPTIONAL, # cipher [2] OCTET STRING -- ciphertext # } ciphertext = key.encrypt(usage, plaintext) EncryptedData_obj = { 'etype': key.etype, 'cipher': ciphertext } if key.kvno is not None: EncryptedData_obj['kvno'] = key.kvno return EncryptedData_obj def Checksum_create(self, key, usage, plaintext, ctype=None): # Checksum ::= SEQUENCE { # cksumtype [0] Int32, # checksum [1] OCTET STRING # } if ctype is None: ctype = key.ctype checksum = key.make_checksum(usage, plaintext, ctype=ctype) Checksum_obj = { 'cksumtype': ctype, 'checksum': checksum, } return Checksum_obj @classmethod def PrincipalName_create(cls, name_type, names): # PrincipalName ::= SEQUENCE { # name-type [0] Int32, # name-string [1] SEQUENCE OF KerberosString # } PrincipalName_obj = { 'name-type': name_type, 'name-string': names, } return PrincipalName_obj def AuthorizationData_create(self, ad_type, ad_data): # AuthorizationData ::= SEQUENCE { # ad-type [0] Int32, # ad-data [1] OCTET STRING # } AUTH_DATA_obj = { 'ad-type': ad_type, 'ad-data': ad_data } return AUTH_DATA_obj def PA_DATA_create(self, padata_type, padata_value): # PA-DATA ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # padata-type [1] Int32, # padata-value [2] OCTET STRING -- might be encoded AP-REQ # } PA_DATA_obj = { 'padata-type': padata_type, 'padata-value': padata_value, } return PA_DATA_obj def PA_ENC_TS_ENC_create(self, ts, usec): # PA-ENC-TS-ENC ::= SEQUENCE { # patimestamp[0] KerberosTime, -- client's time # pausec[1] krb5int32 OPTIONAL # } PA_ENC_TS_ENC_obj = { 'patimestamp': ts, 'pausec': usec, } return PA_ENC_TS_ENC_obj def PA_PAC_OPTIONS_create(self, options): # PA-PAC-OPTIONS ::= SEQUENCE { # options [0] PACOptionFlags # } PA_PAC_OPTIONS_obj = { 'options': options } return PA_PAC_OPTIONS_obj def KRB_FAST_ARMOR_create(self, armor_type, armor_value): # KrbFastArmor ::= SEQUENCE { # armor-type [0] Int32, # armor-value [1] OCTET STRING, # ... # } KRB_FAST_ARMOR_obj = { 'armor-type': armor_type, 'armor-value': armor_value } return KRB_FAST_ARMOR_obj def KRB_FAST_REQ_create(self, fast_options, padata, req_body): # KrbFastReq ::= SEQUENCE { # fast-options [0] FastOptions, # padata [1] SEQUENCE OF PA-DATA, # req-body [2] KDC-REQ-BODY, # ... # } KRB_FAST_REQ_obj = { 'fast-options': fast_options, 'padata': padata, 'req-body': req_body } return KRB_FAST_REQ_obj def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req): # KrbFastArmoredReq ::= SEQUENCE { # armor [0] KrbFastArmor OPTIONAL, # req-checksum [1] Checksum, # enc-fast-req [2] EncryptedData -- KrbFastReq -- # } KRB_FAST_ARMORED_REQ_obj = { 'req-checksum': req_checksum, 'enc-fast-req': enc_fast_req } if armor is not None: KRB_FAST_ARMORED_REQ_obj['armor'] = armor return KRB_FAST_ARMORED_REQ_obj def PA_FX_FAST_REQUEST_create(self, armored_data): # PA-FX-FAST-REQUEST ::= CHOICE { # armored-data [0] KrbFastArmoredReq, # ... # } PA_FX_FAST_REQUEST_obj = { 'armored-data': armored_data } return PA_FX_FAST_REQUEST_obj def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True): # KERB-PA-PAC-REQUEST ::= SEQUENCE { # include-pac[0] BOOLEAN --If TRUE, and no pac present, # -- include PAC. # --If FALSE, and PAC present, # -- remove PAC. # } KERB_PA_PAC_REQUEST_obj = { 'include-pac': include_pac, } if not pa_data_create: return KERB_PA_PAC_REQUEST_obj pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj, asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST()) pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac) return pa_data def get_pa_pac_options(self, options): pac_options = self.PA_PAC_OPTIONS_create(options) pac_options = self.der_encode(pac_options, asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options) return pac_options def KDC_REQ_BODY_create(self, kdc_options, cname, realm, sname, from_time, till_time, renew_time, nonce, etypes, addresses, additional_tickets, EncAuthorizationData, EncAuthorizationData_key, EncAuthorizationData_usage, asn1_print=None, hexdump=None): # KDC-REQ-BODY ::= SEQUENCE { # kdc-options [0] KDCOptions, # cname [1] PrincipalName OPTIONAL # -- Used only in AS-REQ --, # realm [2] Realm # -- Server's realm # -- Also client's in AS-REQ --, # sname [3] PrincipalName OPTIONAL, # from [4] KerberosTime OPTIONAL, # till [5] KerberosTime, # rtime [6] KerberosTime OPTIONAL, # nonce [7] UInt32, # etype [8] SEQUENCE OF Int32 # -- EncryptionType # -- in preference order --, # addresses [9] HostAddresses OPTIONAL, # enc-authorization-data [10] EncryptedData OPTIONAL # -- AuthorizationData --, # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL # -- NOTE: not empty # } if EncAuthorizationData is not None: enc_ad_plain = self.der_encode( EncAuthorizationData, asn1Spec=krb5_asn1.AuthorizationData(), asn1_print=asn1_print, hexdump=hexdump) enc_ad = self.EncryptedData_create(EncAuthorizationData_key, EncAuthorizationData_usage, enc_ad_plain) else: enc_ad = None KDC_REQ_BODY_obj = { 'kdc-options': kdc_options, 'realm': realm, 'till': till_time, 'nonce': nonce, 'etype': etypes, } if cname is not None: KDC_REQ_BODY_obj['cname'] = cname if sname is not None: KDC_REQ_BODY_obj['sname'] = sname if from_time is not None: KDC_REQ_BODY_obj['from'] = from_time if renew_time is not None: KDC_REQ_BODY_obj['rtime'] = renew_time if addresses is not None: KDC_REQ_BODY_obj['addresses'] = addresses if enc_ad is not None: KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad if additional_tickets is not None: KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets return KDC_REQ_BODY_obj def KDC_REQ_create(self, msg_type, padata, req_body, asn1Spec=None, asn1_print=None, hexdump=None): # KDC-REQ ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # pvno [1] INTEGER (5) , # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), # padata [3] SEQUENCE OF PA-DATA OPTIONAL # -- NOTE: not empty --, # req-body [4] KDC-REQ-BODY # } # KDC_REQ_obj = { 'pvno': 5, 'msg-type': msg_type, 'req-body': req_body, } if padata is not None: KDC_REQ_obj['padata'] = padata if asn1Spec is not None: KDC_REQ_decoded = pyasn1_native_decode( KDC_REQ_obj, asn1Spec=asn1Spec) else: KDC_REQ_decoded = None return KDC_REQ_obj, KDC_REQ_decoded def AS_REQ_create(self, padata, # optional kdc_options, # required cname, # optional realm, # required sname, # optional from_time, # optional till_time, # required renew_time, # optional nonce, # required etypes, # required addresses, # optional additional_tickets, native_decoded_only=True, asn1_print=None, hexdump=None): # KDC-REQ ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # pvno [1] INTEGER (5) , # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), # padata [3] SEQUENCE OF PA-DATA OPTIONAL # -- NOTE: not empty --, # req-body [4] KDC-REQ-BODY # } # # KDC-REQ-BODY ::= SEQUENCE { # kdc-options [0] KDCOptions, # cname [1] PrincipalName OPTIONAL # -- Used only in AS-REQ --, # realm [2] Realm # -- Server's realm # -- Also client's in AS-REQ --, # sname [3] PrincipalName OPTIONAL, # from [4] KerberosTime OPTIONAL, # till [5] KerberosTime, # rtime [6] KerberosTime OPTIONAL, # nonce [7] UInt32, # etype [8] SEQUENCE OF Int32 # -- EncryptionType # -- in preference order --, # addresses [9] HostAddresses OPTIONAL, # enc-authorization-data [10] EncryptedData OPTIONAL # -- AuthorizationData --, # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL # -- NOTE: not empty # } KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create( kdc_options, cname, realm, sname, from_time, till_time, renew_time, nonce, etypes, addresses, additional_tickets, EncAuthorizationData=None, EncAuthorizationData_key=None, EncAuthorizationData_usage=None, asn1_print=asn1_print, hexdump=hexdump) obj, decoded = self.KDC_REQ_create( msg_type=KRB_AS_REQ, padata=padata, req_body=KDC_REQ_BODY_obj, asn1Spec=krb5_asn1.AS_REQ(), asn1_print=asn1_print, hexdump=hexdump) if native_decoded_only: return decoded return decoded, obj def AP_REQ_create(self, ap_options, ticket, authenticator): # AP-REQ ::= [APPLICATION 14] SEQUENCE { # pvno [0] INTEGER (5), # msg-type [1] INTEGER (14), # ap-options [2] APOptions, # ticket [3] Ticket, # authenticator [4] EncryptedData -- Authenticator # } AP_REQ_obj = { 'pvno': 5, 'msg-type': KRB_AP_REQ, 'ap-options': ap_options, 'ticket': ticket, 'authenticator': authenticator, } return AP_REQ_obj def Authenticator_create( self, crealm, cname, cksum, cusec, ctime, subkey, seq_number, authorization_data): # -- Unencrypted authenticator # Authenticator ::= [APPLICATION 2] SEQUENCE { # authenticator-vno [0] INTEGER (5), # crealm [1] Realm, # cname [2] PrincipalName, # cksum [3] Checksum OPTIONAL, # cusec [4] Microseconds, # ctime [5] KerberosTime, # subkey [6] EncryptionKey OPTIONAL, # seq-number [7] UInt32 OPTIONAL, # authorization-data [8] AuthorizationData OPTIONAL # } Authenticator_obj = { 'authenticator-vno': 5, 'crealm': crealm, 'cname': cname, 'cusec': cusec, 'ctime': ctime, } if cksum is not None: Authenticator_obj['cksum'] = cksum if subkey is not None: Authenticator_obj['subkey'] = subkey if seq_number is not None: Authenticator_obj['seq-number'] = seq_number if authorization_data is not None: Authenticator_obj['authorization-data'] = authorization_data return Authenticator_obj def PKAuthenticator_create(self, cusec, ctime, nonce, *, pa_checksum=None, freshness_token=None, kdc_name=None, kdc_realm=None, win2k_variant=False): if win2k_variant: self.assertIsNone(pa_checksum) self.assertIsNone(freshness_token) self.assertIsNotNone(kdc_name) self.assertIsNotNone(kdc_realm) else: self.assertIsNone(kdc_name) self.assertIsNone(kdc_realm) pk_authenticator_obj = { 'cusec': cusec, 'ctime': ctime, 'nonce': nonce, } if pa_checksum is not None: pk_authenticator_obj['paChecksum'] = pa_checksum if freshness_token is not None: pk_authenticator_obj['freshnessToken'] = freshness_token if kdc_name is not None: pk_authenticator_obj['kdcName'] = kdc_name if kdc_realm is not None: pk_authenticator_obj['kdcRealm'] = kdc_realm return pk_authenticator_obj def TGS_REQ_create(self, padata, # optional cusec, ctime, ticket, kdc_options, # required cname, # optional realm, # required sname, # optional from_time, # optional till_time, # required renew_time, # optional nonce, # required etypes, # required addresses, # optional EncAuthorizationData, EncAuthorizationData_key, additional_tickets, ticket_session_key, authenticator_subkey=None, body_checksum_type=None, native_decoded_only=True, asn1_print=None, hexdump=None): # KDC-REQ ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # pvno [1] INTEGER (5) , # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), # padata [3] SEQUENCE OF PA-DATA OPTIONAL # -- NOTE: not empty --, # req-body [4] KDC-REQ-BODY # } # # KDC-REQ-BODY ::= SEQUENCE { # kdc-options [0] KDCOptions, # cname [1] PrincipalName OPTIONAL # -- Used only in AS-REQ --, # realm [2] Realm # -- Server's realm # -- Also client's in AS-REQ --, # sname [3] PrincipalName OPTIONAL, # from [4] KerberosTime OPTIONAL, # till [5] KerberosTime, # rtime [6] KerberosTime OPTIONAL, # nonce [7] UInt32, # etype [8] SEQUENCE OF Int32 # -- EncryptionType # -- in preference order --, # addresses [9] HostAddresses OPTIONAL, # enc-authorization-data [10] EncryptedData OPTIONAL # -- AuthorizationData --, # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL # -- NOTE: not empty # } if authenticator_subkey is not None: EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY else: EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION req_body = self.KDC_REQ_BODY_create( kdc_options=kdc_options, cname=None, realm=realm, sname=sname, from_time=from_time, till_time=till_time, renew_time=renew_time, nonce=nonce, etypes=etypes, addresses=addresses, additional_tickets=additional_tickets, EncAuthorizationData=EncAuthorizationData, EncAuthorizationData_key=EncAuthorizationData_key, EncAuthorizationData_usage=EncAuthorizationData_usage) req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(), asn1_print=asn1_print, hexdump=hexdump) req_body_checksum = self.Checksum_create(ticket_session_key, KU_TGS_REQ_AUTH_CKSUM, req_body_blob, ctype=body_checksum_type) subkey_obj = None if authenticator_subkey is not None: subkey_obj = authenticator_subkey.export_obj() seq_number = random.randint(0, 0xfffffffe) authenticator = self.Authenticator_create( crealm=realm, cname=cname, cksum=req_body_checksum, cusec=cusec, ctime=ctime, subkey=subkey_obj, seq_number=seq_number, authorization_data=None) authenticator = self.der_encode( authenticator, asn1Spec=krb5_asn1.Authenticator(), asn1_print=asn1_print, hexdump=hexdump) authenticator = self.EncryptedData_create( ticket_session_key, KU_TGS_REQ_AUTH, authenticator) ap_options = krb5_asn1.APOptions('0') ap_req = self.AP_REQ_create(ap_options=str(ap_options), ticket=ticket, authenticator=authenticator) ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(), asn1_print=asn1_print, hexdump=hexdump) pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req) if padata is not None: padata.append(pa_tgs_req) else: padata = [pa_tgs_req] obj, decoded = self.KDC_REQ_create( msg_type=KRB_TGS_REQ, padata=padata, req_body=req_body, asn1Spec=krb5_asn1.TGS_REQ(), asn1_print=asn1_print, hexdump=hexdump) if native_decoded_only: return decoded return decoded, obj def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None): # PA-S4U2Self ::= SEQUENCE { # name [0] PrincipalName, # realm [1] Realm, # cksum [2] Checksum, # auth [3] GeneralString # } cksum_data = name['name-type'].to_bytes(4, byteorder='little') for n in name['name-string']: cksum_data += n.encode() cksum_data += realm.encode() cksum_data += "Kerberos".encode() cksum = self.Checksum_create(tgt_session_key, KU_NON_KERB_CKSUM_SALT, cksum_data, ctype) PA_S4U2Self_obj = { 'name': name, 'realm': realm, 'cksum': cksum, 'auth': "Kerberos", } pa_s4u2self = self.der_encode( PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self()) return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self) def ChangePasswdDataMS_create(self, new_password, target_princ=None, target_realm=None): ChangePasswdDataMS_obj = { 'newpasswd': new_password, } if target_princ is not None: ChangePasswdDataMS_obj['targname'] = target_princ if target_realm is not None: ChangePasswdDataMS_obj['targrealm'] = target_realm change_password_data = self.der_encode( ChangePasswdDataMS_obj, asn1Spec=krb5_asn1.ChangePasswdDataMS()) return change_password_data def KRB_PRIV_create(self, subkey, user_data, s_address, timestamp=None, usec=None, seq_number=None, r_address=None): EncKrbPrivPart_obj = { 'user-data': user_data, 's-address': s_address, } if timestamp is not None: EncKrbPrivPart_obj['timestamp'] = timestamp if usec is not None: EncKrbPrivPart_obj['usec'] = usec if seq_number is not None: EncKrbPrivPart_obj['seq-number'] = seq_number if r_address is not None: EncKrbPrivPart_obj['r-address'] = r_address enc_krb_priv_part = self.der_encode( EncKrbPrivPart_obj, asn1Spec=krb5_asn1.EncKrbPrivPart()) enc_data = self.EncryptedData_create(subkey, KU_KRB_PRIV, enc_krb_priv_part) KRB_PRIV_obj = { 'pvno': 5, 'msg-type': KRB_PRIV, 'enc-part': enc_data, } krb_priv = self.der_encode( KRB_PRIV_obj, asn1Spec=krb5_asn1.KRB_PRIV()) return krb_priv def ContentInfo_create(self, content_type, content): content_info_obj = { 'contentType': content_type, 'content': content, } return content_info_obj def EncapsulatedContentInfo_create(self, content_type, content): encapsulated_content_info_obj = { 'eContentType': content_type, 'eContent': content, } return encapsulated_content_info_obj def SignedData_create(self, digest_algorithms, encap_content_info, signer_infos, *, version=None, certificates=None, crls=None): def is_cert_version_present(version): return certificates is not None and any( version in cert for cert in certificates) def is_crl_version_present(version): return crls is not None and any( version in crl for crl in crls) def is_signer_info_version_present(version): return signer_infos is not None and any( signer_info['version'] == version for signer_info in signer_infos) def data_version(): # per RFC5652 5.1: if is_cert_version_present('other') or ( is_crl_version_present('other')): return 5 if is_cert_version_present('v2AttrCert'): return 4 if is_cert_version_present('v1AttrCert') or ( is_signer_info_version_present(3)) or ( encap_content_info['eContentType'] != krb5_asn1.id_data ): return 3 return 1 if version is None: version = data_version() signed_data_obj = { 'version': version, 'digestAlgorithms': digest_algorithms, 'encapContentInfo': encap_content_info, 'signerInfos': signer_infos, } if certificates is not None: signed_data_obj['certificates'] = certificates if crls is not None: signed_data_obj['crls'] = crls return signed_data_obj def AuthPack_create(self, pk_authenticator, *, client_public_value=None, supported_cms_types=None, client_dh_nonce=None, win2k_variant=False): if win2k_variant: self.assertIsNone(supported_cms_types) self.assertIsNone(client_dh_nonce) auth_pack_obj = { 'pkAuthenticator': pk_authenticator, } if client_public_value is not None: auth_pack_obj['clientPublicValue'] = client_public_value if supported_cms_types is not None: auth_pack_obj['supportedCMSTypes'] = supported_cms_types if client_dh_nonce is not None: auth_pack_obj['clientDHNonce'] = client_dh_nonce return auth_pack_obj def PK_AS_REQ_create(self, signed_auth_pack, *, trusted_certifiers=None, kdc_pk_id=None, kdc_cert=None, encryption_cert=None, win2k_variant=False): if win2k_variant: self.assertIsNone(kdc_pk_id) asn1_spec = krb5_asn1.PA_PK_AS_REQ_Win2k else: self.assertIsNone(kdc_cert) self.assertIsNone(encryption_cert) asn1_spec = krb5_asn1.PA_PK_AS_REQ content_info_obj = self.ContentInfo_create( krb5_asn1.id_signedData, signed_auth_pack) content_info = self.der_encode(content_info_obj, asn1Spec=krb5_asn1.ContentInfo()) pk_as_req_obj = { 'signedAuthPack': content_info, } if trusted_certifiers is not None: pk_as_req_obj['trustedCertifiers'] = trusted_certifiers if kdc_pk_id is not None: pk_as_req_obj['kdcPkId'] = kdc_pk_id if kdc_cert is not None: pk_as_req_obj['kdcCert'] = kdc_cert if encryption_cert is not None: pk_as_req_obj['encryptionCert'] = encryption_cert return self.der_encode(pk_as_req_obj, asn1Spec=asn1_spec()) def SignerInfo_create(self, signer_id, digest_algorithm, signature_algorithm, signature, *, version=None, signed_attrs=None, unsigned_attrs=None): if version is None: # per RFC5652 5.3: if 'issuerAndSerialNumber' in signer_id: version = 1 elif 'subjectKeyIdentifier' in signer_id: version = 3 else: self.fail(f'unknown signer ID version ({signer_id})') signer_info_obj = { 'version': version, 'sid': signer_id, 'digestAlgorithm': digest_algorithm, 'signatureAlgorithm': signature_algorithm, 'signature': signature, } if signed_attrs is not None: signer_info_obj['signedAttrs'] = signed_attrs if unsigned_attrs is not None: signer_info_obj['unsignedAttrs'] = unsigned_attrs return signer_info_obj def SignerIdentifier_create(self, *, issuer_and_serial_number=None, subject_key_id=None): if issuer_and_serial_number is not None: return {'issuerAndSerialNumber': issuer_and_serial_number} if subject_key_id is not None: return {'subjectKeyIdentifier': subject_key_id} self.fail('identifier not specified') def AlgorithmIdentifier_create(self, algorithm, *, parameters=None): algorithm_id_obj = { 'algorithm': algorithm, } if parameters is not None: algorithm_id_obj['parameters'] = parameters return algorithm_id_obj def SubjectPublicKeyInfo_create(self, algorithm, public_key): return { 'algorithm': algorithm, 'subjectPublicKey': public_key, } def ValidationParms_create(self, seed, pgen_counter): return { 'seed': seed, 'pgenCounter': pgen_counter, } def DomainParameters_create(self, p, g, *, q=None, j=None, validation_parms=None): domain_params_obj = { 'p': p, 'g': g, } if q is not None: domain_params_obj['q'] = q if j is not None: domain_params_obj['j'] = j if validation_parms is not None: domain_params_obj['validationParms'] = validation_parms return domain_params_obj def length_in_bytes(self, value): """Return the length in bytes of an integer once it is encoded as bytes.""" self.assertGreaterEqual(value, 0, 'value must be positive') self.assertIsInstance(value, int) length_in_bits = max(1, math.log2(value + 1)) length_in_bytes = math.ceil(length_in_bits / 8) return length_in_bytes def bytes_from_int(self, value, *, length=None): """Return an integer encoded big-endian into bytes of an optionally specified length. """ if length is None: length = self.length_in_bytes(value) return value.to_bytes(length, 'big') def int_from_bytes(self, data): """Return an integer decoded from bytes in big-endian format.""" return int.from_bytes(data, 'big') def int_from_bit_string(self, string): """Return an integer decoded from a bitstring.""" return int(string, base=2) def bit_string_from_int(self, value): """Return a bitstring encoding of an integer.""" string = f'{value:b}' # The bitstring must be padded to a multiple of 8 bits in length, or # pyasn1 will interpret it incorrectly (as if the padding bits were # present, but on the wrong end). length = len(string) padding_len = math.ceil(length / 8) * 8 - length return '0' * padding_len + string def bit_string_from_bytes(self, data): """Return a bitstring encoding of bytes in big-endian format.""" value = self.int_from_bytes(data) return self.bit_string_from_int(value) def bytes_from_bit_string(self, string): """Return big-endian format bytes encoded from a bitstring.""" value = self.int_from_bit_string(string) length = math.ceil(len(string) / 8) return value.to_bytes(length, 'big') def asn1_length(self, data): """Return the ASN.1 encoding of the length of some data.""" length = len(data) self.assertGreater(length, 0) if length < 0x80: return bytes([length]) encoding_len = self.length_in_bytes(length) self.assertLess(encoding_len, 0x80, 'item is too long to be ASN.1 encoded') data = self.bytes_from_int(length, length=encoding_len) return bytes([0x80 | encoding_len]) + data @staticmethod def octetstring2key(x, enctype): """This implements the function defined in RFC4556 3.2.3.1 “Using Diffie-Hellman Key Exchange”.""" seedsize = kcrypto.seedsize(enctype) seed = b'' # A counter that cycles through the bytes 0x00–0xff. counter = itertools.cycle(map(lambda x: bytes([x]), range(256))) while len(seed) < seedsize: digest = hashes.Hash(hashes.SHA1(), default_backend()) digest.update(next(counter) + x) seed += digest.finalize() key = kcrypto.random_to_key(enctype, seed[:seedsize]) return RodcPacEncryptionKey(key, kvno=None) def unpad(self, data): """Return unpadded data.""" padding_len = data[-1] expected_padding = bytes([padding_len]) * padding_len self.assertEqual(expected_padding, data[-padding_len:], 'invalid padding bytes') return data[:-padding_len] def try_decode(self, data, module=None): """Try to decode some data of unknown type with various known ASN.1 schemata (optionally restricted to those from a particular module) and print any results that seem promising. For use when debugging. """ if module is None: # Try a couple of known ASN.1 modules. self.try_decode(data, krb5_asn1) self.try_decode(data, pyasn1.type.univ) # It’s helpful to stop and give the user a chance to examine the # results. self.fail('decoding done') names = dir(module) for name in names: item = getattr(module, name) if not callable(item): continue try: decoded = self.der_decode(data, asn1Spec=item()) except Exception: # Initiating the schema or decoding the ASN.1 failed for # whatever reason. pass else: # Decoding succeeded: print the structure to be examined. print(f'\t{name}') pprint(decoded) def cipher_from_algorithm(self, algorithm): if algorithm == str(krb5_asn1.aes256_CBC_PAD): return algorithms.AES if algorithm == str(krb5_asn1.des_EDE3_CBC): return algorithms.TripleDES self.fail(f'unknown cipher algorithm {algorithm}') def hash_from_algorithm(self, algorithm): # Let someone pass in an ObjectIdentifier. algorithm = str(algorithm) if algorithm == str(krb5_asn1.id_sha1): return hashes.SHA1 if algorithm == str(krb5_asn1.sha1WithRSAEncryption): return hashes.SHA1 if algorithm == str(krb5_asn1.rsaEncryption): return hashes.SHA1 if algorithm == str(krb5_asn1.id_pkcs1_sha256WithRSAEncryption): return hashes.SHA256 if algorithm == str(krb5_asn1.id_sha512): return hashes.SHA512 self.fail(f'unknown hash algorithm {algorithm}') def hash_from_algorithm_id(self, algorithm_id): self.assertIsInstance(algorithm_id, dict) hash = self.hash_from_algorithm(algorithm_id['algorithm']) parameters = algorithm_id.get('parameters') if self.strict_checking: self.assertIsNotNone(parameters) if parameters is not None: self.assertEqual(b'\x05\x00', parameters) return hash def create_freshness_token(self, epoch=None, *, offset=None, krbtgt_creds=None): timestamp, usec = self.get_KerberosTimeWithUsec(epoch, offset) # Encode the freshness token as PA-ENC-TS-ENC. ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec) ts_enc = self.der_encode(ts_enc, asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) if krbtgt_creds is None: krbtgt_creds = self.get_krbtgt_creds() krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) # Encrypt the freshness token. freshness = self.EncryptedData_create(krbtgt_key, KU_AS_FRESHNESS, ts_enc) freshness_token = self.der_encode(freshness, asn1Spec=krb5_asn1.EncryptedData()) # Prepend a couple of zero bytes. freshness_token = bytes(2) + freshness_token return freshness_token def kpasswd_create(self, subkey, user_data, version, seq_number, ap_req, local_address, remote_address): self.assertIsNotNone(self.s, 'call self.connect() first') timestamp, usec = self.get_KerberosTimeWithUsec() krb_priv = self.KRB_PRIV_create(subkey, user_data, s_address=local_address, timestamp=timestamp, usec=usec, seq_number=seq_number, r_address=remote_address) size = 6 + len(ap_req) + len(krb_priv) self.assertLess(size, 0x10000) msg = bytearray() msg.append(size >> 8) msg.append(size & 0xff) msg.append(version >> 8) msg.append(version & 0xff) msg.append(len(ap_req) >> 8) msg.append(len(ap_req) & 0xff) # Note: for sets, there could be a little-endian four-byte length here. msg.extend(ap_req) msg.extend(krb_priv) return msg def get_enc_part(self, obj, key, usage): self.assertElementEqual(obj, 'pvno', 5) enc_part = obj['enc-part'] self.assertElementEqual(enc_part, 'etype', key.etype) self.assertElementKVNO(enc_part, 'kvno', key.kvno) enc_part = key.decrypt(usage, enc_part['cipher']) return enc_part def kpasswd_exchange(self, ticket, new_password, expected_code, expected_msg, mode, target_princ=None, target_realm=None, ap_options=None, send_seq_number=True): if mode is self.KpasswdMode.SET: version = 0xff80 user_data = self.ChangePasswdDataMS_create(new_password, target_princ, target_realm) elif mode is self.KpasswdMode.CHANGE: self.assertIsNone(target_princ, 'target_princ only valid for pw set') self.assertIsNone(target_realm, 'target_realm only valid for pw set') version = 1 user_data = new_password.encode('utf-8') else: self.fail(f'invalid mode {mode}') subkey = self.RandomKey(kcrypto.Enctype.AES256) if ap_options is None: ap_options = '0' ap_options = str(krb5_asn1.APOptions(ap_options)) kdc_exchange_dict = { 'tgt': ticket, 'authenticator_subkey': subkey, 'auth_data': None, 'ap_options': ap_options, } if send_seq_number: seq_number = random.randint(0, 0xfffffffe) else: seq_number = None ap_req = self.generate_ap_req(kdc_exchange_dict, None, req_body=None, armor=False, usage=KU_AP_REQ_AUTH, seq_number=seq_number) self.connect(self.host, port=464) self.assertIsNotNone(self.s) family = self.s.family if family == socket.AF_INET: addr_type = 2 # IPv4 elif family == socket.AF_INET6: addr_type = 24 # IPv6 else: self.fail(f'unknown family {family}') def create_address(ip): return { 'addr-type': addr_type, 'address': socket.inet_pton(family, ip), } local_ip = self.s.getsockname()[0] local_address = create_address(local_ip) # remote_ip = self.s.getpeername()[0] # remote_address = create_address(remote_ip) # TODO: due to a bug (?), MIT Kerberos will not accept the request # unless r-address is set to our _local_ address. Heimdal, on the other # hand, requires the r-address is set to the remote address (as # expected). To avoid problems, avoid sending r-address for now. remote_address = None msg = self.kpasswd_create(subkey, user_data, version, seq_number, ap_req, local_address, remote_address) self.send_msg(msg) rep_pdu = self.recv_pdu_raw() self._disconnect('transaction done') self.assertIsNotNone(rep_pdu) header = rep_pdu[:6] reply = rep_pdu[6:] reply_len = (header[0] << 8) | header[1] reply_version = (header[2] << 8) | header[3] ap_rep_len = (header[4] << 8) | header[5] self.assertEqual(reply_len, len(rep_pdu)) self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW self.assertLess(ap_rep_len, reply_len) self.assertNotEqual(0x7e, rep_pdu[1]) self.assertNotEqual(0x5e, rep_pdu[1]) if ap_rep_len: # We received an AP-REQ and KRB-PRIV as a response. This may or may # not indicate an error, depending on the status code. ap_rep = reply[:ap_rep_len] krb_priv = reply[ap_rep_len:] key = ticket.session_key ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP()) self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP) enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART) enc_part = self.der_decode( enc_part, asn1Spec=krb5_asn1.EncAPRepPart()) self.assertElementPresent(enc_part, 'ctime') self.assertElementPresent(enc_part, 'cusec') # self.assertElementMissing(enc_part, 'subkey') # TODO # self.assertElementPresent(enc_part, 'seq-number') # TODO try: krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV()) except PyAsn1Error: self.fail() self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV) priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV) priv_enc_part = self.der_decode( priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart()) self.assertElementMissing(priv_enc_part, 'timestamp') self.assertElementMissing(priv_enc_part, 'usec') # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO # self.assertElementMissing(priv_enc_part, 'r-address') # TODO result_data = priv_enc_part['user-data'] else: # We received a KRB-ERROR as a response, indicating an error. krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR()) sname = self.PrincipalName_create( name_type=NT_PRINCIPAL, names=['kadmin', 'changepw']) realm = self.get_krbtgt_creds().get_realm().upper() self.assertElementEqual(krb_error, 'pvno', 5) self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR) self.assertElementMissing(krb_error, 'ctime') self.assertElementMissing(krb_error, 'usec') self.assertElementPresent(krb_error, 'stime') self.assertElementPresent(krb_error, 'susec') error_code = krb_error['error-code'] if isinstance(expected_code, int): self.assertEqual(error_code, expected_code) else: self.assertIn(error_code, expected_code) self.assertElementMissing(krb_error, 'crealm') self.assertElementMissing(krb_error, 'cname') self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8')) self.assertElementEqualPrincipal(krb_error, 'sname', sname) self.assertElementMissing(krb_error, 'e-text') result_data = krb_error['e-data'] status = result_data[:2] message = result_data[2:] status_code = (status[0] << 8) | status[1] if isinstance(expected_code, int): self.assertEqual(status_code, expected_code) else: self.assertIn(status_code, expected_code) if not message: self.assertEqual(0, status_code, 'got an error result, but no message') return # Check the first character of the message. if message[0]: if isinstance(expected_msg, bytes): self.assertEqual(message, expected_msg) else: self.assertIn(message, expected_msg) else: # We got AD password policy information. self.assertEqual(30, len(message)) (empty_bytes, min_length, history_length, properties, expire_time, min_age) = struct.unpack('>HIIIQQ', message) def _generic_kdc_exchange(self, kdc_exchange_dict, # required cname=None, # optional realm=None, # required sname=None, # optional from_time=None, # optional till_time=None, # required renew_time=None, # optional etypes=None, # required addresses=None, # optional additional_tickets=None, # optional EncAuthorizationData=None, # optional EncAuthorizationData_key=None, # optional EncAuthorizationData_usage=None): # optional check_error_fn = kdc_exchange_dict['check_error_fn'] check_rep_fn = kdc_exchange_dict['check_rep_fn'] generate_fast_fn = kdc_exchange_dict['generate_fast_fn'] generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn'] generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn'] generate_padata_fn = kdc_exchange_dict['generate_padata_fn'] callback_dict = kdc_exchange_dict['callback_dict'] req_msg_type = kdc_exchange_dict['req_msg_type'] req_asn1Spec = kdc_exchange_dict['req_asn1Spec'] rep_msg_type = kdc_exchange_dict['rep_msg_type'] expected_error_mode = kdc_exchange_dict['expected_error_mode'] kdc_options = kdc_exchange_dict['kdc_options'] pac_request = kdc_exchange_dict['pac_request'] pac_options = kdc_exchange_dict['pac_options'] # Parameters specific to the inner request body inner_req = kdc_exchange_dict['inner_req'] # Parameters specific to the outer request body outer_req = kdc_exchange_dict['outer_req'] if till_time is None: till_time = self.get_KerberosTime(offset=36000) if 'nonce' in kdc_exchange_dict: nonce = kdc_exchange_dict['nonce'] else: nonce = self.get_Nonce() kdc_exchange_dict['nonce'] = nonce req_body = self.KDC_REQ_BODY_create( kdc_options=kdc_options, cname=cname, realm=realm, sname=sname, from_time=from_time, till_time=till_time, renew_time=renew_time, nonce=nonce, etypes=etypes, addresses=addresses, additional_tickets=additional_tickets, EncAuthorizationData=EncAuthorizationData, EncAuthorizationData_key=EncAuthorizationData_key, EncAuthorizationData_usage=EncAuthorizationData_usage) inner_req_body = dict(req_body) if inner_req is not None: for key, value in inner_req.items(): if value is not None: inner_req_body[key] = value else: del inner_req_body[key] if outer_req is not None: for key, value in outer_req.items(): if value is not None: req_body[key] = value else: del req_body[key] additional_padata = [] if pac_request is not None: pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request) additional_padata.append(pa_pac_request) if pac_options is not None: pa_pac_options = self.get_pa_pac_options(pac_options) additional_padata.append(pa_pac_options) if req_msg_type == KRB_AS_REQ: tgs_req = None tgs_req_padata = None else: self.assertEqual(KRB_TGS_REQ, req_msg_type) tgs_req = self.generate_ap_req(kdc_exchange_dict, callback_dict, req_body, armor=False) tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req) if generate_fast_padata_fn is not None: self.assertIsNotNone(generate_fast_fn) # This can alter req_body... fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict, callback_dict, req_body) else: fast_padata = [] if generate_fast_armor_fn is not None: self.assertIsNotNone(generate_fast_fn) fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict, callback_dict, None, armor=True) fast_armor_type = kdc_exchange_dict['fast_armor_type'] fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type, fast_ap_req) else: fast_armor = None if generate_padata_fn is not None: # This can alter req_body... outer_padata, req_body = generate_padata_fn(kdc_exchange_dict, callback_dict, req_body) self.assertIsNotNone(outer_padata) self.assertNotIn(PADATA_KDC_REQ, [pa['padata-type'] for pa in outer_padata], 'Don\'t create TGS-REQ manually') else: outer_padata = None if generate_fast_fn is not None: armor_key = kdc_exchange_dict['armor_key'] self.assertIsNotNone(armor_key) if req_msg_type == KRB_AS_REQ: checksum_blob = self.der_encode( req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) else: self.assertEqual(KRB_TGS_REQ, req_msg_type) checksum_blob = tgs_req checksum = self.Checksum_create(armor_key, KU_FAST_REQ_CHKSUM, checksum_blob) fast_padata += additional_padata fast = generate_fast_fn(kdc_exchange_dict, callback_dict, inner_req_body, fast_padata, fast_armor, checksum) else: fast = None padata = [] if tgs_req_padata is not None: padata.append(tgs_req_padata) if fast is not None: padata.append(fast) if outer_padata is not None: padata += outer_padata if fast is None: padata += additional_padata if not padata: padata = None kdc_exchange_dict['req_padata'] = padata kdc_exchange_dict['fast_padata'] = fast_padata kdc_exchange_dict['req_body'] = inner_req_body req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type, padata=padata, req_body=req_body, asn1Spec=req_asn1Spec()) kdc_exchange_dict['req_obj'] = req_obj to_rodc = kdc_exchange_dict['to_rodc'] rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc) self.assertIsNotNone(rep) msg_type = self.getElementValue(rep, 'msg-type') self.assertIsNotNone(msg_type) expected_msg_type = None if check_error_fn is not None: expected_msg_type = KRB_ERROR self.assertIsNone(check_rep_fn) self.assertNotEqual(0, len(expected_error_mode)) self.assertNotIn(0, expected_error_mode) if check_rep_fn is not None: expected_msg_type = rep_msg_type self.assertIsNone(check_error_fn) self.assertEqual(0, len(expected_error_mode)) self.assertIsNotNone(expected_msg_type) if msg_type == KRB_ERROR: error_code = self.getElementValue(rep, 'error-code') fail_msg = f'Got unexpected error: {error_code}' else: fail_msg = f'Expected to fail with error: {expected_error_mode}' self.assertEqual(msg_type, expected_msg_type, fail_msg) if msg_type == KRB_ERROR: return check_error_fn(kdc_exchange_dict, callback_dict, rep) return check_rep_fn(kdc_exchange_dict, callback_dict, rep) def as_exchange_dict(self, creds=None, client_cert=None, expected_crealm=None, expected_cname=None, expected_anon=False, expected_srealm=None, expected_sname=None, expected_account_name=None, expected_groups=None, unexpected_groups=None, expected_upn_name=None, expected_sid=None, expected_requester_sid=None, expected_domain_sid=None, expected_device_domain_sid=None, expected_supported_etypes=None, expected_flags=None, unexpected_flags=None, ticket_decryption_key=None, expect_ticket_checksum=None, expect_full_checksum=None, generate_fast_fn=None, generate_fast_armor_fn=None, generate_fast_padata_fn=None, fast_armor_type=FX_FAST_ARMOR_AP_REQUEST, generate_padata_fn=None, check_error_fn=None, check_rep_fn=None, check_kdc_private_fn=None, check_patypes=True, callback_dict=None, expected_error_mode=0, expect_status=None, expected_status=None, expected_salt=None, authenticator_subkey=None, preauth_key=None, armor_key=None, armor_tgt=None, armor_subkey=None, auth_data=None, kdc_options='', inner_req=None, outer_req=None, pac_request=None, pac_options=None, ap_options=None, fast_ap_options=None, strict_edata_checking=True, using_pkinit=PkInit.NOT_USED, pk_nonce=None, expect_edata=None, expect_pac=True, expect_client_claims=None, expect_device_info=None, expect_device_claims=None, expect_upn_dns_info_ex=None, expect_pac_attrs=None, expect_pac_attrs_pac_request=None, expect_requester_sid=None, rc4_support=True, expected_client_claims=None, unexpected_client_claims=None, expected_device_claims=None, unexpected_device_claims=None, expect_resource_groups_flag=None, expected_device_groups=None, expected_extra_pac_buffers=None, to_rodc=False): if expected_error_mode == 0: expected_error_mode = () elif not isinstance(expected_error_mode, collections.abc.Container): expected_error_mode = (expected_error_mode,) kdc_exchange_dict = { 'req_msg_type': KRB_AS_REQ, 'req_asn1Spec': krb5_asn1.AS_REQ, 'rep_msg_type': KRB_AS_REP, 'rep_asn1Spec': krb5_asn1.AS_REP, 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart, 'creds': creds, 'client_cert': client_cert, 'expected_crealm': expected_crealm, 'expected_cname': expected_cname, 'expected_anon': expected_anon, 'expected_srealm': expected_srealm, 'expected_sname': expected_sname, 'expected_account_name': expected_account_name, 'expected_groups': expected_groups, 'unexpected_groups': unexpected_groups, 'expected_upn_name': expected_upn_name, 'expected_sid': expected_sid, 'expected_requester_sid': expected_requester_sid, 'expected_domain_sid': expected_domain_sid, 'expected_device_domain_sid': expected_device_domain_sid, 'expected_supported_etypes': expected_supported_etypes, 'expected_flags': expected_flags, 'unexpected_flags': unexpected_flags, 'ticket_decryption_key': ticket_decryption_key, 'expect_ticket_checksum': expect_ticket_checksum, 'expect_full_checksum': expect_full_checksum, 'generate_fast_fn': generate_fast_fn, 'generate_fast_armor_fn': generate_fast_armor_fn, 'generate_fast_padata_fn': generate_fast_padata_fn, 'fast_armor_type': fast_armor_type, 'generate_padata_fn': generate_padata_fn, 'check_error_fn': check_error_fn, 'check_rep_fn': check_rep_fn, 'check_kdc_private_fn': check_kdc_private_fn, 'check_patypes': check_patypes, 'callback_dict': callback_dict, 'expected_error_mode': expected_error_mode, 'expect_status': expect_status, 'expected_status': expected_status, 'expected_salt': expected_salt, 'authenticator_subkey': authenticator_subkey, 'preauth_key': preauth_key, 'armor_key': armor_key, 'armor_tgt': armor_tgt, 'armor_subkey': armor_subkey, 'auth_data': auth_data, 'kdc_options': kdc_options, 'inner_req': inner_req, 'outer_req': outer_req, 'pac_request': pac_request, 'pac_options': pac_options, 'ap_options': ap_options, 'fast_ap_options': fast_ap_options, 'strict_edata_checking': strict_edata_checking, 'using_pkinit': using_pkinit, 'pk_nonce': pk_nonce, 'expect_edata': expect_edata, 'expect_pac': expect_pac, 'expect_client_claims': expect_client_claims, 'expect_device_info': expect_device_info, 'expect_device_claims': expect_device_claims, 'expect_upn_dns_info_ex': expect_upn_dns_info_ex, 'expect_pac_attrs': expect_pac_attrs, 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request, 'expect_requester_sid': expect_requester_sid, 'rc4_support': rc4_support, 'expected_client_claims': expected_client_claims, 'unexpected_client_claims': unexpected_client_claims, 'expected_device_claims': expected_device_claims, 'unexpected_device_claims': unexpected_device_claims, 'expect_resource_groups_flag': expect_resource_groups_flag, 'expected_device_groups': expected_device_groups, 'expected_extra_pac_buffers': expected_extra_pac_buffers, 'to_rodc': to_rodc } if callback_dict is None: callback_dict = {} return kdc_exchange_dict def tgs_exchange_dict(self, creds=None, expected_crealm=None, expected_cname=None, expected_anon=False, expected_srealm=None, expected_sname=None, expected_account_name=None, expected_groups=None, unexpected_groups=None, expected_upn_name=None, expected_sid=None, expected_requester_sid=None, expected_domain_sid=None, expected_device_domain_sid=None, expected_supported_etypes=None, expected_flags=None, unexpected_flags=None, ticket_decryption_key=None, expect_ticket_checksum=None, expect_full_checksum=None, generate_fast_fn=None, generate_fast_armor_fn=None, generate_fast_padata_fn=None, fast_armor_type=FX_FAST_ARMOR_AP_REQUEST, generate_padata_fn=None, check_error_fn=None, check_rep_fn=None, check_kdc_private_fn=None, check_patypes=True, expected_error_mode=0, expect_status=None, expected_status=None, callback_dict=None, tgt=None, armor_key=None, armor_tgt=None, armor_subkey=None, authenticator_subkey=None, auth_data=None, body_checksum_type=None, kdc_options='', inner_req=None, outer_req=None, pac_request=None, pac_options=None, ap_options=None, fast_ap_options=None, strict_edata_checking=True, expect_edata=None, expect_pac=True, expect_client_claims=None, expect_device_info=None, expect_device_claims=None, expect_upn_dns_info_ex=None, expect_pac_attrs=None, expect_pac_attrs_pac_request=None, expect_requester_sid=None, expected_proxy_target=None, expected_transited_services=None, rc4_support=True, expected_client_claims=None, unexpected_client_claims=None, expected_device_claims=None, unexpected_device_claims=None, expect_resource_groups_flag=None, expected_device_groups=None, expected_extra_pac_buffers=None, to_rodc=False): if expected_error_mode == 0: expected_error_mode = () elif not isinstance(expected_error_mode, collections.abc.Container): expected_error_mode = (expected_error_mode,) kdc_exchange_dict = { 'req_msg_type': KRB_TGS_REQ, 'req_asn1Spec': krb5_asn1.TGS_REQ, 'rep_msg_type': KRB_TGS_REP, 'rep_asn1Spec': krb5_asn1.TGS_REP, 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart, 'creds': creds, 'expected_crealm': expected_crealm, 'expected_cname': expected_cname, 'expected_anon': expected_anon, 'expected_srealm': expected_srealm, 'expected_sname': expected_sname, 'expected_account_name': expected_account_name, 'expected_groups': expected_groups, 'unexpected_groups': unexpected_groups, 'expected_upn_name': expected_upn_name, 'expected_sid': expected_sid, 'expected_requester_sid': expected_requester_sid, 'expected_domain_sid': expected_domain_sid, 'expected_device_domain_sid': expected_device_domain_sid, 'expected_supported_etypes': expected_supported_etypes, 'expected_flags': expected_flags, 'unexpected_flags': unexpected_flags, 'ticket_decryption_key': ticket_decryption_key, 'expect_ticket_checksum': expect_ticket_checksum, 'expect_full_checksum': expect_full_checksum, 'generate_fast_fn': generate_fast_fn, 'generate_fast_armor_fn': generate_fast_armor_fn, 'generate_fast_padata_fn': generate_fast_padata_fn, 'fast_armor_type': fast_armor_type, 'generate_padata_fn': generate_padata_fn, 'check_error_fn': check_error_fn, 'check_rep_fn': check_rep_fn, 'check_kdc_private_fn': check_kdc_private_fn, 'check_patypes': check_patypes, 'callback_dict': callback_dict, 'expected_error_mode': expected_error_mode, 'expect_status': expect_status, 'expected_status': expected_status, 'tgt': tgt, 'body_checksum_type': body_checksum_type, 'armor_key': armor_key, 'armor_tgt': armor_tgt, 'armor_subkey': armor_subkey, 'auth_data': auth_data, 'authenticator_subkey': authenticator_subkey, 'kdc_options': kdc_options, 'inner_req': inner_req, 'outer_req': outer_req, 'pac_request': pac_request, 'pac_options': pac_options, 'ap_options': ap_options, 'fast_ap_options': fast_ap_options, 'strict_edata_checking': strict_edata_checking, 'expect_edata': expect_edata, 'expect_pac': expect_pac, 'expect_client_claims': expect_client_claims, 'expect_device_info': expect_device_info, 'expect_device_claims': expect_device_claims, 'expect_upn_dns_info_ex': expect_upn_dns_info_ex, 'expect_pac_attrs': expect_pac_attrs, 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request, 'expect_requester_sid': expect_requester_sid, 'expected_proxy_target': expected_proxy_target, 'expected_transited_services': expected_transited_services, 'rc4_support': rc4_support, 'expected_client_claims': expected_client_claims, 'unexpected_client_claims': unexpected_client_claims, 'expected_device_claims': expected_device_claims, 'unexpected_device_claims': unexpected_device_claims, 'expect_resource_groups_flag': expect_resource_groups_flag, 'expected_device_groups': expected_device_groups, 'expected_extra_pac_buffers': expected_extra_pac_buffers, 'to_rodc': to_rodc } if callback_dict is None: callback_dict = {} return kdc_exchange_dict def generic_check_kdc_rep(self, kdc_exchange_dict, callback_dict, rep): expected_crealm = kdc_exchange_dict['expected_crealm'] expected_anon = kdc_exchange_dict['expected_anon'] expected_srealm = kdc_exchange_dict['expected_srealm'] expected_sname = kdc_exchange_dict['expected_sname'] ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key'] check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn'] rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec'] msg_type = kdc_exchange_dict['rep_msg_type'] armor_key = kdc_exchange_dict['armor_key'] self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP padata = self.getElementValue(rep, 'padata') if self.strict_checking: self.assertElementEqualUTF8(rep, 'crealm', expected_crealm) if self.cname_checking: if expected_anon: expected_cname = self.PrincipalName_create( name_type=NT_WELLKNOWN, names=['WELLKNOWN', 'ANONYMOUS']) else: expected_cname = kdc_exchange_dict['expected_cname'] self.assertElementEqualPrincipal(rep, 'cname', expected_cname) self.assertElementPresent(rep, 'ticket') ticket = self.getElementValue(rep, 'ticket') ticket_encpart = None ticket_cipher = None self.assertIsNotNone(ticket) if ticket is not None: # Never None, but gives indentation self.assertElementEqual(ticket, 'tkt-vno', 5) self.assertElementEqualUTF8(ticket, 'realm', expected_srealm) self.assertElementEqualPrincipal(ticket, 'sname', expected_sname) self.assertElementPresent(ticket, 'enc-part') ticket_encpart = self.getElementValue(ticket, 'enc-part') self.assertIsNotNone(ticket_encpart) if ticket_encpart is not None: # Never None, but gives indentation self.assertElementPresent(ticket_encpart, 'etype') kdc_options = kdc_exchange_dict['kdc_options'] pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1 expect_kvno = (pos >= len(kdc_options) or kdc_options[pos] != '1') if expect_kvno: # 'unspecified' means present, with any value != 0 self.assertElementKVNO(ticket_encpart, 'kvno', self.unspecified_kvno) else: # For user-to-user, don't expect a kvno. self.assertElementMissing(ticket_encpart, 'kvno') self.assertElementPresent(ticket_encpart, 'cipher') ticket_cipher = self.getElementValue(ticket_encpart, 'cipher') self.assertElementPresent(rep, 'enc-part') encpart = self.getElementValue(rep, 'enc-part') encpart_cipher = None self.assertIsNotNone(encpart) if encpart is not None: # Never None, but gives indentation self.assertElementPresent(encpart, 'etype') self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect') self.assertElementPresent(encpart, 'cipher') encpart_cipher = self.getElementValue(encpart, 'cipher') if self.padata_checking: self.check_reply_padata(kdc_exchange_dict, callback_dict, encpart, padata) ticket_checksum = None # Get the decryption key for the encrypted part encpart_decryption_key, encpart_decryption_usage = ( self.get_preauth_key(kdc_exchange_dict)) pa_dict = self.get_pa_dict(padata) pk_as_rep = pa_dict.get(PADATA_PK_AS_REP) if pk_as_rep is not None: pk_as_rep_asn1_spec = krb5_asn1.PA_PK_AS_REP reply_key_pack_asn1_spec = krb5_asn1.ReplyKeyPack pk_win2k = False else: pk_as_rep = pa_dict.get(PADATA_PK_AS_REP_19) pk_as_rep_asn1_spec = krb5_asn1.PA_PK_AS_REP_Win2k reply_key_pack_asn1_spec = krb5_asn1.ReplyKeyPack_Win2k pk_win2k = True if pk_as_rep is not None: pk_as_rep = self.der_decode(pk_as_rep, asn1Spec=pk_as_rep_asn1_spec()) using_pkinit = kdc_exchange_dict['using_pkinit'] if using_pkinit is PkInit.PUBLIC_KEY: content_info = self.der_decode( pk_as_rep['encKeyPack'], asn1Spec=krb5_asn1.ContentInfo()) self.assertEqual(str(krb5_asn1.id_envelopedData), content_info['contentType']) content = self.der_decode(content_info['content'], asn1Spec=krb5_asn1.EnvelopedData()) self.assertEqual(0, content['version']) originator_info = content['originatorInfo'] self.assertFalse(originator_info.get('certs')) self.assertFalse(originator_info.get('crls')) self.assertFalse(content.get('unprotectedAttrs')) encrypted_content_info = content['encryptedContentInfo'] recipient_infos = content['recipientInfos'] self.assertEqual(1, len(recipient_infos)) ktri = recipient_infos[0]['ktri'] if self.strict_checking: self.assertEqual(0, ktri['version']) private_key = encpart_decryption_key self.assertIsInstance(private_key, asymmetric.rsa.RSAPrivateKey) client_subject_key_id = ( x509.SubjectKeyIdentifier.from_public_key( private_key.public_key())) # Check that the client certificate is named as the recipient. ktri_rid = ktri['rid'] try: issuer_and_serial_number = ktri_rid[ 'issuerAndSerialNumber'] except KeyError: subject_key_id = ktri_rid['subjectKeyIdentifier'] self.assertEqual(subject_key_id, client_subject_key_id.digest) else: client_certificate = kdc_exchange_dict['client_cert'] self.assertIsNotNone(issuer_and_serial_number['issuer']) self.assertEqual(issuer_and_serial_number['serialNumber'], client_certificate.serial_number) key_encryption_algorithm = ktri['keyEncryptionAlgorithm'] self.assertEqual(str(krb5_asn1.rsaEncryption), key_encryption_algorithm['algorithm']) if self.strict_checking: self.assertEqual( b'\x05\x00', key_encryption_algorithm.get('parameters')) encrypted_key = ktri['encryptedKey'] # Decrypt the key. pad_len = 256 - len(encrypted_key) if pad_len: encrypted_key = bytes(pad_len) + encrypted_key decrypted_key = private_key.decrypt( encrypted_key, padding=asymmetric.padding.PKCS1v15()) self.assertEqual(str(krb5_asn1.id_signedData), encrypted_content_info['contentType']) encrypted_content = encrypted_content_info['encryptedContent'] encryption_algorithm = encrypted_content_info[ 'contentEncryptionAlgorithm'] cipher_algorithm = self.cipher_from_algorithm(encryption_algorithm['algorithm']) # This will serve as the IV. parameters = self.der_decode( encryption_algorithm['parameters'], asn1Spec=krb5_asn1.CMSCBCParameter()) # Decrypt the content. cipher = Cipher(cipher_algorithm(decrypted_key), modes.CBC(parameters), default_backend()) decryptor = cipher.decryptor() decrypted_content = decryptor.update(encrypted_content) decrypted_content += decryptor.finalize() # The padding doesn’t fully comply to PKCS7 with a specified # blocksize, so we must unpad the data ourselves. decrypted_content = self.unpad(decrypted_content) signed_data = None signed_data_rfc2315 = None first_tag = decrypted_content[0] if first_tag == 0x30: # ASN.1 SEQUENCE tag signed_data = decrypted_content else: # Windows encodes the ASN.1 incorrectly, neglecting to add # the SEQUENCE tag. We’ll have to prepend it ourselves in # order for the decoding to work. encoded_len = self.asn1_length(decrypted_content) decrypted_content = bytes([0x30]) + encoded_len + ( decrypted_content) if first_tag == 0x02: # ASN.1 INTEGER tag # The INTEGER tag indicates that the data is encoded # with the earlier variant of the SignedData ASN.1 # schema specified in RFC2315, as per [MS-PKCA] 2.2.4 # (PA-PK-AS-REP). signed_data_rfc2315 = decrypted_content elif first_tag == 0x06: # ASN.1 OBJECT IDENTIFIER tag # The OBJECT IDENTIFIER tag indicates that the data is # encoded as SignedData and wrapped in a ContentInfo # structure, which we shall have to decode first. This # seems to be the case when the supportedCMSTypes field # in the client’s AuthPack is missing or empty. content_info = self.der_decode( decrypted_content, asn1Spec=krb5_asn1.ContentInfo()) self.assertEqual(str(krb5_asn1.id_signedData), content_info['contentType']) signed_data = content_info['content'] else: self.fail(f'got reply with unknown initial tag ' f'({first_tag})') if signed_data is not None: signed_data = self.der_decode( signed_data, asn1Spec=krb5_asn1.SignedData()) encap_content_info = signed_data['encapContentInfo'] content_type = encap_content_info['eContentType'] content = encap_content_info['eContent'] elif signed_data_rfc2315 is not None: signed_data = self.der_decode( signed_data_rfc2315, asn1Spec=krb5_asn1.SignedData_RFC2315()) encap_content_info = signed_data['contentInfo'] content_type = encap_content_info['contentType'] content = self.der_decode( encap_content_info['content'], asn1Spec=pyasn1.type.univ.OctetString()) else: self.fail('we must have got SignedData') self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData), content_type) reply_key_pack = self.der_decode( content, asn1Spec=reply_key_pack_asn1_spec()) req_obj = kdc_exchange_dict['req_obj'] req_asn1Spec = kdc_exchange_dict['req_asn1Spec'] req_obj = self.der_encode(req_obj, asn1Spec=req_asn1Spec()) reply_key = reply_key_pack['replyKey'] # Reply the encpart decryption key with the decrypted key from # the reply. encpart_decryption_key = self.SessionKey_create( etype=reply_key['keytype'], contents=reply_key['keyvalue'], kvno=None) if not pk_win2k: as_checksum = reply_key_pack['asChecksum'] # Verify the checksum over the AS request body. kcrypto.verify_checksum(as_checksum['cksumtype'], encpart_decryption_key.key, KU_PKINIT_AS_REQ, req_obj, as_checksum['checksum']) elif using_pkinit is PkInit.DIFFIE_HELLMAN: content_info = self.der_decode( pk_as_rep['dhInfo']['dhSignedData'], asn1Spec=krb5_asn1.ContentInfo()) self.assertEqual(str(krb5_asn1.id_signedData), content_info['contentType']) signed_data = self.der_decode(content_info['content'], asn1Spec=krb5_asn1.SignedData()) encap_content_info = signed_data['encapContentInfo'] content = encap_content_info['eContent'] self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData), encap_content_info['eContentType']) dh_key_info = self.der_decode( content, asn1Spec=krb5_asn1.KDCDHKeyInfo()) self.assertNotIn('dhKeyExpiration', dh_key_info) dh_private_key = encpart_decryption_key self.assertIsInstance(dh_private_key, asymmetric.dh.DHPrivateKey) self.assertElementEqual(dh_key_info, 'nonce', kdc_exchange_dict['pk_nonce']) dh_public_key_data = self.bytes_from_bit_string( dh_key_info['subjectPublicKey']) dh_public_key_decoded = self.der_decode( dh_public_key_data, asn1Spec=krb5_asn1.DHPublicKey()) dh_numbers = dh_private_key.parameters().parameter_numbers() public_numbers = asymmetric.dh.DHPublicNumbers( dh_public_key_decoded, dh_numbers) dh_public_key = public_numbers.public_key(default_backend()) # Perform the Diffie-Hellman key exchange. shared_secret = dh_private_key.exchange(dh_public_key) # Pad the shared secret out to the length of ‘p’. p_len = self.length_in_bytes(dh_numbers.p) padding_len = p_len - len(shared_secret) self.assertGreaterEqual(padding_len, 0) padded_shared_secret = bytes(padding_len) + shared_secret reply_key_enc_type = self.expected_etype(kdc_exchange_dict) # At the moment, we don’t specify a nonce in the request, so we # can assume these are empty. client_nonce = b'' server_nonce = b'' ciphertext = padded_shared_secret + client_nonce + server_nonce # Replace the encpart decryption key with the key derived from # the Diffie-Hellman key exchange. encpart_decryption_key = self.octetstring2key( ciphertext, reply_key_enc_type) else: self.fail(f'invalid value for using_pkinit: {using_pkinit}') self.assertEqual(3, signed_data['version']) digest_algorithms = signed_data['digestAlgorithms'] self.assertEqual(1, len(digest_algorithms)) digest_algorithm = digest_algorithms[0] # Ensure the hash algorithm is valid. _ = self.hash_from_algorithm_id(digest_algorithm) self.assertFalse(signed_data.get('crls')) signer_infos = signed_data['signerInfos'] self.assertEqual(1, len(signer_infos)) signer_info = signer_infos[0] self.assertEqual(1, signer_info['version']) # Get the certificate presented by the KDC. kdc_certificates = signed_data['certificates'] self.assertEqual(1, len(kdc_certificates)) kdc_certificate = self.der_encode( kdc_certificates[0], asn1Spec=krb5_asn1.CertificateChoices()) kdc_certificate = x509.load_der_x509_certificate(kdc_certificate, default_backend()) # Verify that the KDC’s certificate is named as the signer. sid = signer_info['sid'] try: issuer_and_serial_number = sid['issuerAndSerialNumber'] except KeyError: extension = kdc_certificate.extensions.get_extension_for_oid( x509.oid.ExtensionOID.SUBJECT_KEY_IDENTIFIER) cert_subject_key_id = extension.value.digest self.assertEqual(sid['subjectKeyIdentifier'], cert_subject_key_id) else: self.assertIsNotNone(issuer_and_serial_number['issuer']) self.assertEqual(issuer_and_serial_number['serialNumber'], kdc_certificate.serial_number) digest_algorithm = signer_info['digestAlgorithm'] digest_hash_fn = self.hash_from_algorithm_id(digest_algorithm) signed_attrs = signer_info['signedAttrs'] self.assertEqual(2, len(signed_attrs)) signed_attr0 = signed_attrs[0] self.assertEqual(str(krb5_asn1.id_contentType), signed_attr0['type']) signed_attr0_values = signed_attr0['values'] self.assertEqual(1, len(signed_attr0_values)) signed_attr0_value = self.der_decode( signed_attr0_values[0], asn1Spec=krb5_asn1.ContentType()) if using_pkinit is PkInit.DIFFIE_HELLMAN: self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData), signed_attr0_value) else: self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData), signed_attr0_value) signed_attr1 = signed_attrs[1] self.assertEqual(str(krb5_asn1.id_messageDigest), signed_attr1['type']) signed_attr1_values = signed_attr1['values'] self.assertEqual(1, len(signed_attr1_values)) message_digest = self.der_decode(signed_attr1_values[0], krb5_asn1.MessageDigest()) signature_algorithm = signer_info['signatureAlgorithm'] hash_fn = self.hash_from_algorithm_id(signature_algorithm) # Compute the hash of the content to be signed. With the # Diffie-Hellman key exchange, this signature is over the type # KDCDHKeyInfo; otherwise, it is over the type ReplyKeyPack. digest = hashes.Hash(digest_hash_fn(), default_backend()) digest.update(content) digest = digest.finalize() # Verify the hash. Note: this is a non–constant time comparison. self.assertEqual(digest, message_digest) # Re-encode the attributes ready for verifying the signature. cms_attrs = self.der_encode(signed_attrs, asn1Spec=krb5_asn1.CMSAttributes()) # Verify the signature. kdc_public_key = kdc_certificate.public_key() kdc_public_key.verify( signer_info['signature'], cms_attrs, asymmetric.padding.PKCS1v15(), hash_fn()) self.assertFalse(signer_info.get('unsignedAttrs')) if armor_key is not None: if PADATA_FX_FAST in pa_dict: fx_fast_data = pa_dict[PADATA_FX_FAST] fast_response = self.check_fx_fast_data(kdc_exchange_dict, fx_fast_data, armor_key, finished=True) if 'strengthen-key' in fast_response: strengthen_key = self.EncryptionKey_import( fast_response['strengthen-key']) encpart_decryption_key = ( self.generate_strengthen_reply_key( strengthen_key, encpart_decryption_key)) fast_finished = fast_response.get('finished') if fast_finished is not None: ticket_checksum = fast_finished['ticket-checksum'] self.check_rep_padata(kdc_exchange_dict, callback_dict, fast_response['padata'], error_code=0) ticket_private = None if ticket_decryption_key is not None: self.assertElementEqual(ticket_encpart, 'etype', ticket_decryption_key.etype) self.assertElementKVNO(ticket_encpart, 'kvno', ticket_decryption_key.kvno) ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, ticket_cipher) ticket_private = self.der_decode( ticket_decpart, asn1Spec=krb5_asn1.EncTicketPart()) encpart_private = None self.assertIsNotNone(encpart_decryption_key) if encpart_decryption_key is not None: self.assertElementEqual(encpart, 'etype', encpart_decryption_key.etype) if self.strict_checking: self.assertElementKVNO(encpart, 'kvno', encpart_decryption_key.kvno) rep_decpart = encpart_decryption_key.decrypt( encpart_decryption_usage, encpart_cipher) # MIT KDC encodes both EncASRepPart and EncTGSRepPart with # application tag 26 try: encpart_private = self.der_decode( rep_decpart, asn1Spec=rep_encpart_asn1Spec()) except Exception: encpart_private = self.der_decode( rep_decpart, asn1Spec=krb5_asn1.EncTGSRepPart()) kdc_exchange_dict['reply_key'] = encpart_decryption_key self.assertIsNotNone(check_kdc_private_fn) if check_kdc_private_fn is not None: check_kdc_private_fn(kdc_exchange_dict, callback_dict, rep, ticket_private, encpart_private, ticket_checksum) return rep def check_fx_fast_data(self, kdc_exchange_dict, fx_fast_data, armor_key, finished=False, expect_strengthen_key=True): fx_fast_data = self.der_decode(fx_fast_data, asn1Spec=krb5_asn1.PA_FX_FAST_REPLY()) enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep'] self.assertEqual(enc_fast_rep['etype'], armor_key.etype) fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher']) fast_response = self.der_decode(fast_rep, asn1Spec=krb5_asn1.KrbFastResponse()) if expect_strengthen_key and self.strict_checking: self.assertIn('strengthen-key', fast_response) if finished: self.assertIn('finished', fast_response) # Ensure that the nonce matches the nonce in the body of the request # (RFC6113 5.4.3). nonce = kdc_exchange_dict['nonce'] self.assertEqual(nonce, fast_response['nonce']) return fast_response def generic_check_kdc_private(self, kdc_exchange_dict, callback_dict, rep, ticket_private, encpart_private, ticket_checksum): kdc_options = kdc_exchange_dict['kdc_options'] canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1 canonicalize = (canon_pos < len(kdc_options) and kdc_options[canon_pos] == '1') renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1 renewable = (renewable_pos < len(kdc_options) and kdc_options[renewable_pos] == '1') renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1 renew = (renew_pos < len(kdc_options) and kdc_options[renew_pos] == '1') expect_renew_till = renewable or renew expected_crealm = kdc_exchange_dict['expected_crealm'] expected_cname = kdc_exchange_dict['expected_cname'] expected_srealm = kdc_exchange_dict['expected_srealm'] expected_sname = kdc_exchange_dict['expected_sname'] ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key'] rep_msg_type = kdc_exchange_dict['rep_msg_type'] expected_flags = kdc_exchange_dict.get('expected_flags') unexpected_flags = kdc_exchange_dict.get('unexpected_flags') ticket = self.getElementValue(rep, 'ticket') if ticket_checksum is not None: armor_key = kdc_exchange_dict['armor_key'] self.verify_ticket_checksum(ticket, ticket_checksum, armor_key) to_rodc = kdc_exchange_dict['to_rodc'] if to_rodc: krbtgt_creds = self.get_rodc_krbtgt_creds() else: krbtgt_creds = self.get_krbtgt_creds() krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) krbtgt_keys = [krbtgt_key] if not self.strict_checking: krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds( krbtgt_creds, etype=kcrypto.Enctype.RC4) krbtgt_keys.append(krbtgt_key_rc4) if self.expect_pac and self.is_tgs(expected_sname): expect_pac = True else: expect_pac = kdc_exchange_dict['expect_pac'] ticket_session_key = None if ticket_private is not None: self.assertElementFlags(ticket_private, 'flags', expected_flags, unexpected_flags) self.assertElementPresent(ticket_private, 'key') ticket_key = self.getElementValue(ticket_private, 'key') self.assertIsNotNone(ticket_key) if ticket_key is not None: # Never None, but gives indentation self.assertElementPresent(ticket_key, 'keytype') self.assertElementPresent(ticket_key, 'keyvalue') ticket_session_key = self.EncryptionKey_import(ticket_key) self.assertElementEqualUTF8(ticket_private, 'crealm', expected_crealm) if self.cname_checking: self.assertElementEqualPrincipal(ticket_private, 'cname', expected_cname) self.assertElementPresent(ticket_private, 'transited') self.assertElementPresent(ticket_private, 'authtime') if self.strict_checking: self.assertElementPresent(ticket_private, 'starttime') self.assertElementPresent(ticket_private, 'endtime') if self.strict_checking: if expect_renew_till: self.assertElementPresent(ticket_private, 'renew-till') else: self.assertElementMissing(ticket_private, 'renew-till') if self.strict_checking: self.assertElementMissing(ticket_private, 'caddr') if expect_pac is not None: if expect_pac: self.assertElementPresent(ticket_private, 'authorization-data', expect_empty=not expect_pac) else: # It is more correct to not have an authorization-data # present than an empty one. # # https://github.com/krb5/krb5/pull/1225#issuecomment-995104193 v = self.getElementValue(ticket_private, 'authorization-data') if v is not None: self.assertElementPresent(ticket_private, 'authorization-data', expect_empty=True) encpart_session_key = None if encpart_private is not None: self.assertElementPresent(encpart_private, 'key') encpart_key = self.getElementValue(encpart_private, 'key') self.assertIsNotNone(encpart_key) if encpart_key is not None: # Never None, but gives indentation self.assertElementPresent(encpart_key, 'keytype') self.assertElementPresent(encpart_key, 'keyvalue') encpart_session_key = self.EncryptionKey_import(encpart_key) self.assertElementPresent(encpart_private, 'last-req') expected_nonce = kdc_exchange_dict.get('pk_nonce') if not expected_nonce: expected_nonce = kdc_exchange_dict['nonce'] self.assertElementEqual(encpart_private, 'nonce', expected_nonce) if rep_msg_type == KRB_AS_REP: if self.strict_checking: self.assertElementPresent(encpart_private, 'key-expiration') else: self.assertElementMissing(encpart_private, 'key-expiration') self.assertElementFlags(encpart_private, 'flags', expected_flags, unexpected_flags) self.assertElementPresent(encpart_private, 'authtime') if self.strict_checking: self.assertElementPresent(encpart_private, 'starttime') self.assertElementPresent(encpart_private, 'endtime') if self.strict_checking: if expect_renew_till: self.assertElementPresent(encpart_private, 'renew-till') else: self.assertElementMissing(encpart_private, 'renew-till') self.assertElementEqualUTF8(encpart_private, 'srealm', expected_srealm) self.assertElementEqualPrincipal(encpart_private, 'sname', expected_sname) if self.strict_checking: self.assertElementMissing(encpart_private, 'caddr') sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict) sent_enc_pa_rep = self.sent_enc_pa_rep(kdc_exchange_dict) enc_padata = self.getElementValue(encpart_private, 'encrypted-pa-data') if (canonicalize or '1' in sent_pac_options or ( rep_msg_type == KRB_AS_REP and sent_enc_pa_rep)): if self.strict_checking: self.assertIsNotNone(enc_padata) if enc_padata is not None: enc_pa_dict = self.get_pa_dict(enc_padata) if self.strict_checking: if canonicalize: self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict) else: self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict) if '1' in sent_pac_options: self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict) else: self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict) if rep_msg_type == KRB_AS_REP and sent_enc_pa_rep: self.assertIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict) else: self.assertNotIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict) if PADATA_SUPPORTED_ETYPES in enc_pa_dict: expected_supported_etypes = kdc_exchange_dict[ 'expected_supported_etypes'] (supported_etypes,) = struct.unpack( '= ( claims.CLAIM_UPPER_COMPRESSION_THRESHOLD): self.assertEqual( claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF, compression_format, f'{claims_type} unexpectedly not compressed ' f'({uncompressed_size} bytes uncompressed)') claims_set = claims_metadata.claims_set.claims.claims self.assertIsNotNone(claims_set, f'got empty CLAIMS_SET_NDR inner ' f'structure {empty_msg}') claims_arrays = claims_set.claims_arrays self.assertIsNotNone(claims_arrays, f'got empty CLAIMS_SET structure ' f'{empty_msg}') self.assertGreater(len(claims_arrays), 0, f'got empty claims array {empty_msg}') self.assertEqual(len(claims_arrays), claims_set.claims_array_count, f'{claims_type} arrays size mismatch') got_claims = {} for claims_array in claims_arrays: claim_entries = claims_array.claim_entries self.assertIsNotNone(claim_entries, f'got empty CLAIMS_ARRAY structure ' f'{empty_msg}') self.assertGreater(len(claim_entries), 0, f'got empty claim entries array ' f'{empty_msg}') self.assertEqual(len(claim_entries), claims_array.claims_count, f'{claims_type} entries array size ' f'mismatch') for entry in claim_entries: if unexpected_claims is not None: self.assertNotIn(entry.id, unexpected_claims, f'got unexpected {claims_type} ' f'in PAC') if expected_claims is None: continue expected_claim = expected_claims.get(entry.id) if expected_claim is None: continue self.assertNotIn(entry.id, got_claims, f'got duplicate {claims_type}') self.assertIsNotNone(entry.values.values, f'got {claims_type} with no ' f'values') self.assertGreater(len(entry.values.values), 0, f'got empty {claims_type} values ' f'array') self.assertEqual(len(entry.values.values), entry.values.value_count, f'{claims_type} values array size ' f'mismatch') expected_claim_values = expected_claim.get('values') self.assertIsNotNone(expected_claim_values, f'got expected {claims_type} ' f'with no values') values = type(expected_claim_values)( entry.values.values) got_claims[entry.id] = { 'source_type': claims_array.claims_source_type, 'type': entry.type, 'values': values, } self.assertEqual(expected_claims, got_claims or None, f'{claims_type} did not match expectations') elif pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO: device_info = pac_buffer.info.info self.check_device_info(device_info, kdc_exchange_dict) elif pac_buffer.type == krb5pac.PAC_TYPE_CREDENTIAL_INFO: credential_info = pac_buffer.info expected_etype = self.expected_etype(kdc_exchange_dict) self.assertEqual(0, credential_info.version) self.assertEqual(expected_etype, credential_info.encryption_type) encrypted_data = credential_info.encrypted_data reply_key = kdc_exchange_dict['reply_key'] data = reply_key.decrypt(KU_NON_KERB_SALT, encrypted_data) credential_data_ndr = ndr_unpack( krb5pac.PAC_CREDENTIAL_DATA_NDR, data) credential_data = credential_data_ndr.ctr.data self.assertEqual(1, credential_data.credential_count) self.assertEqual(credential_data.credential_count, len(credential_data.credentials)) package = credential_data.credentials[0] self.assertEqual('NTLM', str(package.package_name)) ntlm_blob = bytes(package.credential) ntlm_package = ndr_unpack(krb5pac.PAC_CREDENTIAL_NTLM_SECPKG, ntlm_blob) self.assertEqual(0, ntlm_package.version) self.assertEqual(krb5pac.PAC_CREDENTIAL_NTLM_HAS_NT_HASH, ntlm_package.flags) creds = kdc_exchange_dict['creds'] nt_password = bytes(ntlm_package.nt_password.hash) self.assertEqual(creds.get_nt_hash(), nt_password) lm_password = bytes(ntlm_package.lm_password.hash) self.assertEqual(bytes(16), lm_password) def generic_check_kdc_error(self, kdc_exchange_dict, callback_dict, rep, inner=False): rep_msg_type = kdc_exchange_dict['rep_msg_type'] expected_anon = kdc_exchange_dict['expected_anon'] expected_srealm = kdc_exchange_dict['expected_srealm'] expected_sname = kdc_exchange_dict['expected_sname'] expected_error_mode = kdc_exchange_dict['expected_error_mode'] sent_fast = self.sent_fast(kdc_exchange_dict) fast_armor_type = kdc_exchange_dict['fast_armor_type'] self.assertElementEqual(rep, 'pvno', 5) self.assertElementEqual(rep, 'msg-type', KRB_ERROR) error_code = self.getElementValue(rep, 'error-code') self.assertIn(error_code, expected_error_mode) if self.strict_checking: self.assertElementMissing(rep, 'ctime') self.assertElementMissing(rep, 'cusec') self.assertElementPresent(rep, 'stime') self.assertElementPresent(rep, 'susec') # error-code checked above if expected_anon and not inner: expected_cname = self.PrincipalName_create( name_type=NT_WELLKNOWN, names=['WELLKNOWN', 'ANONYMOUS']) self.assertElementEqualPrincipal(rep, 'cname', expected_cname) elif self.strict_checking: self.assertElementMissing(rep, 'cname') if self.strict_checking: self.assertElementMissing(rep, 'crealm') self.assertElementEqualUTF8(rep, 'realm', expected_srealm) self.assertElementEqualPrincipal(rep, 'sname', expected_sname) self.assertElementMissing(rep, 'e-text') expect_status = kdc_exchange_dict['expect_status'] expected_status = kdc_exchange_dict['expected_status'] expect_edata = kdc_exchange_dict['expect_edata'] if expect_edata is None: expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS and (not sent_fast or fast_armor_type is None or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST) and not inner) if inner and expect_edata is self.expect_padata_outer: expect_edata = False if not expect_edata: self.assertFalse(expect_status) if self.strict_checking or expect_status is False: self.assertElementMissing(rep, 'e-data') return rep edata = self.getElementValue(rep, 'e-data') if self.strict_checking or expect_status: self.assertIsNotNone(edata) if edata is not None: try: error_data = self.der_decode( edata, asn1Spec=krb5_asn1.KERB_ERROR_DATA()) except PyAsn1Error: if expect_status: # The test requires that the KDC be declared to support # NTSTATUS values in e-data to proceed. self.assertTrue( self.expect_nt_status, 'expected status code (which, according to ' 'EXPECT_NT_STATUS=0, the KDC does not support)') self.fail('expected to get status code') rep_padata = self.der_decode( edata, asn1Spec=krb5_asn1.METHOD_DATA()) self.assertGreater(len(rep_padata), 0) if sent_fast: self.assertEqual(1, len(rep_padata)) rep_pa_dict = self.get_pa_dict(rep_padata) self.assertIn(PADATA_FX_FAST, rep_pa_dict) armor_key = kdc_exchange_dict['armor_key'] self.assertIsNotNone(armor_key) fast_response = self.check_fx_fast_data( kdc_exchange_dict, rep_pa_dict[PADATA_FX_FAST], armor_key, expect_strengthen_key=False) rep_padata = fast_response['padata'] etype_info2 = self.check_rep_padata(kdc_exchange_dict, callback_dict, rep_padata, error_code) kdc_exchange_dict['preauth_etype_info2'] = etype_info2 else: self.assertTrue(self.expect_nt_status, 'got status code, but EXPECT_NT_STATUS=0') if expect_status is not None: self.assertTrue(expect_status, 'got unexpected status code') self.assertEqual(KERB_ERR_TYPE_EXTENDED, error_data['data-type']) extended_error = error_data['data-value'] self.assertEqual(12, len(extended_error)) status = int.from_bytes(extended_error[:4], 'little') flags = int.from_bytes(extended_error[8:], 'little') self.assertEqual(expected_status, status) if rep_msg_type == KRB_TGS_REP: self.assertEqual(3, flags) else: self.assertEqual(1, flags) return rep def check_reply_padata(self, kdc_exchange_dict, callback_dict, encpart, rep_padata): expected_patypes = () sent_fast = self.sent_fast(kdc_exchange_dict) rep_msg_type = kdc_exchange_dict['rep_msg_type'] if sent_fast: expected_patypes += (PADATA_FX_FAST,) elif rep_msg_type == KRB_AS_REP: if self.sent_pk_as_req(kdc_exchange_dict): expected_patypes += PADATA_PK_AS_REP, elif self.sent_pk_as_req_win2k(kdc_exchange_dict): expected_patypes += PADATA_PK_AS_REP_19, else: chosen_etype = self.getElementValue(encpart, 'etype') self.assertIsNotNone(chosen_etype) if chosen_etype in {kcrypto.Enctype.AES256, kcrypto.Enctype.AES128}: expected_patypes += (PADATA_ETYPE_INFO2,) preauth_key = kdc_exchange_dict['preauth_key'] self.assertIsInstance(preauth_key, Krb5EncryptionKey) if preauth_key.etype == kcrypto.Enctype.RC4 and rep_padata is None: rep_padata = () elif rep_msg_type == KRB_TGS_REP: if expected_patypes == () and rep_padata is None: rep_padata = () if not self.strict_checking and rep_padata is None: rep_padata = () self.assertIsNotNone(rep_padata) got_patypes = tuple(pa['padata-type'] for pa in rep_padata) self.assertSequenceElementsEqual(expected_patypes, got_patypes, # Windows does not add this. unchecked={PADATA_PKINIT_KX}) if len(expected_patypes) == 0: return None pa_dict = self.get_pa_dict(rep_padata) etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2) if etype_info2 is not None: etype_info2 = self.der_decode(etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2()) self.assertEqual(len(etype_info2), 1) elem = etype_info2[0] e = self.getElementValue(elem, 'etype') self.assertEqual(e, chosen_etype) salt = self.getElementValue(elem, 'salt') self.assertIsNotNone(salt) expected_salt = kdc_exchange_dict['expected_salt'] if expected_salt is not None: self.assertEqual(salt, expected_salt) s2kparams = self.getElementValue(elem, 's2kparams') if self.strict_checking: self.assertIsNone(s2kparams) @staticmethod def greatest_common_etype(etypes, proposed_etypes): return max(filter(lambda e: e in etypes, proposed_etypes), default=None) @staticmethod def first_common_etype(etypes, proposed_etypes): return next(filter(lambda e: e in etypes, proposed_etypes), None) def supported_aes_rc4_etypes(self, kdc_exchange_dict): creds = kdc_exchange_dict['creds'] supported_etypes = self.get_default_enctypes(creds) rc4_support = kdc_exchange_dict['rc4_support'] aes_etypes = set() if kcrypto.Enctype.AES256 in supported_etypes: aes_etypes.add(kcrypto.Enctype.AES256) if kcrypto.Enctype.AES128 in supported_etypes: aes_etypes.add(kcrypto.Enctype.AES128) rc4_etypes = set() if rc4_support and kcrypto.Enctype.RC4 in supported_etypes: rc4_etypes.add(kcrypto.Enctype.RC4) return aes_etypes, rc4_etypes def greatest_aes_rc4_etypes(self, kdc_exchange_dict): req_body = kdc_exchange_dict['req_body'] proposed_etypes = req_body['etype'] aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes(kdc_exchange_dict) expected_aes = self.greatest_common_etype(aes_etypes, proposed_etypes) expected_rc4 = self.greatest_common_etype(rc4_etypes, proposed_etypes) return expected_aes, expected_rc4 def expected_etype(self, kdc_exchange_dict): req_body = kdc_exchange_dict['req_body'] proposed_etypes = req_body['etype'] aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes( kdc_exchange_dict) return self.first_common_etype(aes_etypes | rc4_etypes, proposed_etypes) def check_rep_padata(self, kdc_exchange_dict, callback_dict, rep_padata, error_code): rep_msg_type = kdc_exchange_dict['rep_msg_type'] sent_fast = self.sent_fast(kdc_exchange_dict) sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict) if rep_msg_type == KRB_TGS_REP: self.assertTrue(sent_fast) rc4_support = kdc_exchange_dict['rc4_support'] expected_aes, expected_rc4 = self.greatest_aes_rc4_etypes( kdc_exchange_dict) expect_etype_info2 = () expect_etype_info = False if expected_aes is not None: expect_etype_info2 += (expected_aes,) if expected_rc4 is not None: if error_code != 0: expect_etype_info2 += (expected_rc4,) if expected_aes is None: expect_etype_info = True if expect_etype_info: self.assertGreater(len(expect_etype_info2), 0) sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict) check_patypes = kdc_exchange_dict['check_patypes'] if check_patypes: expected_patypes = () if sent_fast and error_code != 0: expected_patypes += (PADATA_FX_ERROR,) expected_patypes += (PADATA_FX_COOKIE,) if rep_msg_type == KRB_TGS_REP: if ('1' in sent_pac_options and error_code not in (0, KDC_ERR_GENERIC)): expected_patypes += (PADATA_PAC_OPTIONS,) elif error_code != KDC_ERR_GENERIC: if expect_etype_info: expected_patypes += (PADATA_ETYPE_INFO,) if len(expect_etype_info2) != 0: expected_patypes += (PADATA_ETYPE_INFO2,) sent_freshness = self.sent_freshness(kdc_exchange_dict) if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW, KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED): if sent_fast: expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,) else: expected_patypes += (PADATA_ENC_TIMESTAMP,) if not sent_enc_challenge: expected_patypes += (PADATA_PK_AS_REQ,) if not sent_freshness: expected_patypes += (PADATA_PK_AS_REP_19,) if sent_freshness: expected_patypes += PADATA_AS_FRESHNESS, if (self.kdc_fast_support and not sent_fast and not sent_enc_challenge): expected_patypes += (PADATA_FX_FAST,) expected_patypes += (PADATA_FX_COOKIE,) require_strict = {PADATA_FX_COOKIE, PADATA_FX_FAST, PADATA_PAC_OPTIONS, PADATA_PK_AS_REP_19, PADATA_PK_AS_REQ, PADATA_PKINIT_KX, PADATA_GSS} strict_edata_checking = kdc_exchange_dict['strict_edata_checking'] if not strict_edata_checking: require_strict.add(PADATA_ETYPE_INFO2) require_strict.add(PADATA_ENCRYPTED_CHALLENGE) got_patypes = tuple(pa['padata-type'] for pa in rep_padata) self.assertSequenceElementsEqual(expected_patypes, got_patypes, require_strict=require_strict, unchecked={PADATA_PW_SALT}) if not expected_patypes: return None pa_dict = self.get_pa_dict(rep_padata) enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP) if enc_timestamp is not None: self.assertEqual(len(enc_timestamp), 0) pk_as_req = pa_dict.get(PADATA_PK_AS_REQ) if pk_as_req is not None: self.assertEqual(len(pk_as_req), 0) pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19) if pk_as_rep19 is not None: self.assertEqual(len(pk_as_rep19), 0) freshness_token = pa_dict.get(PADATA_AS_FRESHNESS) if freshness_token is not None: self.assertEqual(bytes(2), freshness_token[:2]) freshness = self.der_decode(freshness_token[2:], asn1Spec=krb5_asn1.EncryptedData()) krbtgt_creds = self.get_krbtgt_creds() krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) self.assertElementEqual(freshness, 'etype', krbtgt_key.etype) self.assertElementKVNO(freshness, 'kvno', krbtgt_key.kvno) # Decrypt the freshness token. ts_enc = krbtgt_key.decrypt(KU_AS_FRESHNESS, freshness['cipher']) # Ensure that we can decode it as PA-ENC-TS-ENC. ts_enc = self.der_decode(ts_enc, asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) freshness_time = self.get_EpochFromKerberosTime( ts_enc['patimestamp']) freshness_time += ts_enc['pausec'] / 1e6 # Ensure that it is reasonably close to the current time (within # five minutes, to allow for clock skew). current_time = datetime.datetime.now( datetime.timezone.utc).timestamp() self.assertLess(current_time - 5 * 60, freshness_time) self.assertLess(freshness_time, current_time + 5 * 60) kdc_exchange_dict['freshness_token'] = freshness_token fx_fast = pa_dict.get(PADATA_FX_FAST) if fx_fast is not None: self.assertEqual(len(fx_fast), 0) fast_cookie = pa_dict.get(PADATA_FX_COOKIE) if fast_cookie is not None: kdc_exchange_dict['fast_cookie'] = fast_cookie fast_error = pa_dict.get(PADATA_FX_ERROR) if fast_error is not None: fast_error = self.der_decode(fast_error, asn1Spec=krb5_asn1.KRB_ERROR()) self.generic_check_kdc_error(kdc_exchange_dict, callback_dict, fast_error, inner=True) pac_options = pa_dict.get(PADATA_PAC_OPTIONS) if pac_options is not None: pac_options = self.der_decode( pac_options, asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) self.assertElementEqual(pac_options, 'options', sent_pac_options) enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE) if enc_challenge is not None: if not sent_enc_challenge: self.assertEqual(len(enc_challenge), 0) else: armor_key = kdc_exchange_dict['armor_key'] self.assertIsNotNone(armor_key) preauth_key, _ = self.get_preauth_key(kdc_exchange_dict) kdc_challenge_key = self.generate_kdc_challenge_key( armor_key, preauth_key) # Ensure that the encrypted challenge FAST factor is supported # (RFC6113 5.4.6). if self.strict_checking: self.assertNotEqual(len(enc_challenge), 0) if len(enc_challenge) != 0: encrypted_challenge = self.der_decode( enc_challenge, asn1Spec=krb5_asn1.EncryptedData()) self.assertEqual(encrypted_challenge['etype'], kdc_challenge_key.etype) challenge = kdc_challenge_key.decrypt( KU_ENC_CHALLENGE_KDC, encrypted_challenge['cipher']) challenge = self.der_decode( challenge, asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) # Retrieve the returned timestamp. rep_patime = challenge['patimestamp'] self.assertIn('pausec', challenge) # Ensure the returned time is within five minutes of the # current time. rep_time = self.get_EpochFromKerberosTime(rep_patime) current_time = time.time() self.assertLess(current_time - 300, rep_time) self.assertLess(rep_time, current_time + 300) etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2) if etype_info2 is not None: etype_info2 = self.der_decode(etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2()) self.assertGreaterEqual(len(etype_info2), 1) if self.strict_checking: self.assertEqual(len(etype_info2), len(expect_etype_info2)) for i in range(0, len(etype_info2)): e = self.getElementValue(etype_info2[i], 'etype') if self.strict_checking: self.assertEqual(e, expect_etype_info2[i]) salt = self.getElementValue(etype_info2[i], 'salt') if e == kcrypto.Enctype.RC4: if self.strict_checking: self.assertIsNone(salt) else: self.assertIsNotNone(salt) expected_salt = kdc_exchange_dict['expected_salt'] if expected_salt is not None: self.assertEqual(salt, expected_salt) s2kparams = self.getElementValue(etype_info2[i], 's2kparams') if self.strict_checking: self.assertIsNone(s2kparams) etype_info = pa_dict.get(PADATA_ETYPE_INFO) if etype_info is not None: etype_info = self.der_decode(etype_info, asn1Spec=krb5_asn1.ETYPE_INFO()) self.assertEqual(len(etype_info), 1) e = self.getElementValue(etype_info[0], 'etype') self.assertEqual(e, kcrypto.Enctype.RC4) if rc4_support: self.assertEqual(e, expect_etype_info2[0]) salt = self.getElementValue(etype_info[0], 'salt') if self.strict_checking: self.assertIsNotNone(salt) self.assertEqual(len(salt), 0) return etype_info2 def generate_simple_fast(self, kdc_exchange_dict, _callback_dict, req_body, fast_padata, fast_armor, checksum, fast_options=''): armor_key = kdc_exchange_dict['armor_key'] fast_req = self.KRB_FAST_REQ_create(fast_options, fast_padata, req_body) fast_req = self.der_encode(fast_req, asn1Spec=krb5_asn1.KrbFastReq()) fast_req = self.EncryptedData_create(armor_key, KU_FAST_ENC, fast_req) fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor, checksum, fast_req) fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req) fx_fast_request = self.der_encode( fx_fast_request, asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST()) fast_padata = self.PA_DATA_create(PADATA_FX_FAST, fx_fast_request) return fast_padata def generate_ap_req(self, kdc_exchange_dict, _callback_dict, req_body, armor, usage=None, seq_number=None): req_body_checksum = None if armor: self.assertIsNone(req_body) tgt = kdc_exchange_dict['armor_tgt'] authenticator_subkey = kdc_exchange_dict['armor_subkey'] else: tgt = kdc_exchange_dict['tgt'] authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] if req_body is not None: body_checksum_type = kdc_exchange_dict['body_checksum_type'] req_body_blob = self.der_encode( req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) req_body_checksum = self.Checksum_create( tgt.session_key, KU_TGS_REQ_AUTH_CKSUM, req_body_blob, ctype=body_checksum_type) auth_data = kdc_exchange_dict['auth_data'] subkey_obj = None if authenticator_subkey is not None: subkey_obj = authenticator_subkey.export_obj() if seq_number is None: seq_number = random.randint(0, 0xfffffffe) (ctime, cusec) = self.get_KerberosTimeWithUsec() authenticator_obj = self.Authenticator_create( crealm=tgt.crealm, cname=tgt.cname, cksum=req_body_checksum, cusec=cusec, ctime=ctime, subkey=subkey_obj, seq_number=seq_number, authorization_data=auth_data) authenticator_blob = self.der_encode( authenticator_obj, asn1Spec=krb5_asn1.Authenticator()) if usage is None: usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH authenticator = self.EncryptedData_create(tgt.session_key, usage, authenticator_blob) if armor: ap_options = kdc_exchange_dict['fast_ap_options'] else: ap_options = kdc_exchange_dict['ap_options'] if ap_options is None: ap_options = str(krb5_asn1.APOptions('0')) ap_req_obj = self.AP_REQ_create(ap_options=ap_options, ticket=tgt.ticket, authenticator=authenticator) ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ()) return ap_req def generate_simple_tgs_padata(self, kdc_exchange_dict, callback_dict, req_body): ap_req = self.generate_ap_req(kdc_exchange_dict, callback_dict, req_body, armor=False) pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req) padata = [pa_tgs_req] return padata, req_body def get_preauth_key(self, kdc_exchange_dict): msg_type = kdc_exchange_dict['rep_msg_type'] if msg_type == KRB_AS_REP: key = kdc_exchange_dict['preauth_key'] usage = KU_AS_REP_ENC_PART else: # KRB_TGS_REP authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] if authenticator_subkey is not None: key = authenticator_subkey usage = KU_TGS_REP_ENC_PART_SUB_KEY else: tgt = kdc_exchange_dict['tgt'] key = tgt.session_key usage = KU_TGS_REP_ENC_PART_SESSION self.assertIsNotNone(key) return key, usage def generate_armor_key(self, subkey, session_key): armor_key = kcrypto.cf2(subkey.key, session_key.key, b'subkeyarmor', b'ticketarmor') armor_key = Krb5EncryptionKey(armor_key, None) return armor_key def generate_strengthen_reply_key(self, strengthen_key, reply_key): strengthen_reply_key = kcrypto.cf2(strengthen_key.key, reply_key.key, b'strengthenkey', b'replykey') strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key, reply_key.kvno) return strengthen_reply_key def generate_client_challenge_key(self, armor_key, longterm_key): client_challenge_key = kcrypto.cf2(armor_key.key, longterm_key.key, b'clientchallengearmor', b'challengelongterm') client_challenge_key = Krb5EncryptionKey(client_challenge_key, None) return client_challenge_key def generate_kdc_challenge_key(self, armor_key, longterm_key): kdc_challenge_key = kcrypto.cf2(armor_key.key, longterm_key.key, b'kdcchallengearmor', b'challengelongterm') kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None) return kdc_challenge_key def verify_ticket_checksum(self, ticket, expected_checksum, armor_key): expected_type = expected_checksum['cksumtype'] self.assertEqual(armor_key.ctype, expected_type) ticket_blob = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket()) checksum = self.Checksum_create(armor_key, KU_FAST_FINISHED, ticket_blob) self.assertEqual(expected_checksum, checksum) def verify_ticket(self, ticket, krbtgt_keys, service_ticket, expect_pac=True, expect_ticket_checksum=True, expect_full_checksum=None): # Decrypt the ticket. key = ticket.decryption_key enc_part = ticket.ticket['enc-part'] self.assertElementEqual(enc_part, 'etype', key.etype) self.assertElementKVNO(enc_part, 'kvno', key.kvno) enc_part = key.decrypt(KU_TICKET, enc_part['cipher']) enc_part = self.der_decode( enc_part, asn1Spec=krb5_asn1.EncTicketPart()) # Fetch the authorization data from the ticket. auth_data = enc_part.get('authorization-data') if expect_pac: self.assertIsNotNone(auth_data) elif auth_data is None: return # Get a copy of the authdata with an empty PAC, and the existing PAC # (if present). empty_pac = self.get_empty_pac() auth_data, pac_data = self.replace_pac(auth_data, empty_pac, expect_pac=expect_pac) if not expect_pac: return # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the # raw type to create a new PAC with zeroed signatures for # verification. This is because on Windows, the resource_groups field # is added to PAC_LOGON_INFO after the info3 field has been created, # which results in a different ordering of pointer values than Samba # (see commit 0e201ecdc53). Using the raw type avoids changing # PAC_LOGON_INFO, so verification against Windows can work. We still # need the PAC_DATA type to retrieve the actual checksums, because the # signatures in the raw type may contain padding bytes. pac = ndr_unpack(krb5pac.PAC_DATA, pac_data) raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW, pac_data) checksums = {} full_checksum_buffer = None for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers): buffer_type = pac_buffer.type if buffer_type in self.pac_checksum_types: self.assertNotIn(buffer_type, checksums, f'Duplicate checksum type {buffer_type}') # Fetch the checksum and the checksum type from the PAC buffer. checksum = pac_buffer.info.signature ctype = pac_buffer.info.type if ctype & 1 << 31: ctype |= -1 << 31 checksums[buffer_type] = checksum, ctype if buffer_type == krb5pac.PAC_TYPE_FULL_CHECKSUM: full_checksum_buffer = raw_pac_buffer elif buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM: # Zero the checksum field so that we can later verify the # checksums. The ticket checksum field is not zeroed. signature = ndr_unpack( krb5pac.PAC_SIGNATURE_DATA, raw_pac_buffer.info.remaining) signature.signature = bytes(len(checksum)) raw_pac_buffer.info.remaining = ndr_pack( signature) # Re-encode the PAC. pac_data = ndr_pack(raw_pac) if full_checksum_buffer is not None: signature = ndr_unpack( krb5pac.PAC_SIGNATURE_DATA, full_checksum_buffer.info.remaining) signature.signature = bytes(len(checksum)) full_checksum_buffer.info.remaining = ndr_pack( signature) # Re-encode the PAC. full_pac_data = ndr_pack(raw_pac) # Verify the signatures. server_checksum, server_ctype = checksums[ krb5pac.PAC_TYPE_SRV_CHECKSUM] key.verify_checksum(KU_NON_KERB_CKSUM_SALT, pac_data, server_ctype, server_checksum) kdc_checksum, kdc_ctype = checksums[ krb5pac.PAC_TYPE_KDC_CHECKSUM] if isinstance(krbtgt_keys, collections.abc.Container): if self.strict_checking: krbtgt_key = krbtgt_keys[0] else: krbtgt_key = next(key for key in krbtgt_keys if key.ctype == kdc_ctype) else: krbtgt_key = krbtgt_keys krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, server_checksum, kdc_ctype, kdc_checksum) if not service_ticket: self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums) self.assertNotIn(krb5pac.PAC_TYPE_FULL_CHECKSUM, checksums) else: ticket_checksum, ticket_ctype = checksums.get( krb5pac.PAC_TYPE_TICKET_CHECKSUM, (None, None)) if expect_ticket_checksum: self.assertIsNotNone(ticket_checksum) elif expect_ticket_checksum is False: self.assertIsNone(ticket_checksum) if ticket_checksum is not None: enc_part['authorization-data'] = auth_data enc_part = self.der_encode(enc_part, asn1Spec=krb5_asn1.EncTicketPart()) krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, enc_part, ticket_ctype, ticket_checksum) full_checksum, full_ctype = checksums.get( krb5pac.PAC_TYPE_FULL_CHECKSUM, (None, None)) if expect_full_checksum: self.assertIsNotNone(full_checksum) elif expect_full_checksum is False: self.assertIsNone(full_checksum) if full_checksum is not None: krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, full_pac_data, full_ctype, full_checksum) def modified_ticket(self, ticket, *, new_ticket_key=None, modify_fn=None, modify_pac_fn=None, exclude_pac=False, allow_empty_authdata=False, update_pac_checksums=None, checksum_keys=None, include_checksums=None): if checksum_keys is None: # A dict containing a key for each checksum type to be created in # the PAC. checksum_keys = {} else: checksum_keys = dict(checksum_keys) if include_checksums is None: # A dict containing a value for each checksum type; True if the # checksum type is to be included in the PAC, False if it is to be # excluded, or None/not present if the checksum is to be included # based on its presence in the original PAC. include_checksums = {} else: include_checksums = dict(include_checksums) # Check that the values passed in by the caller make sense. self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types) self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types) if update_pac_checksums is None: update_pac_checksums = not exclude_pac if exclude_pac: self.assertIsNone(modify_pac_fn) self.assertFalse(update_pac_checksums) if not update_pac_checksums: self.assertFalse(checksum_keys) self.assertFalse(include_checksums) expect_pac = bool(modify_pac_fn) key = ticket.decryption_key if new_ticket_key is None: # Use the same key to re-encrypt the ticket. new_ticket_key = key if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys: # If the server signature key is not present, fall back to the key # used to encrypt the ticket. checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys: # If the ticket signature key is not present, fall back to the key # used for the KDC signature. kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM) if kdc_checksum_key is not None: checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = ( kdc_checksum_key) if krb5pac.PAC_TYPE_FULL_CHECKSUM not in checksum_keys: # If the full signature key is not present, fall back to the key # used for the KDC signature. kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM) if kdc_checksum_key is not None: checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] = ( kdc_checksum_key) # Decrypt the ticket. enc_part = ticket.ticket['enc-part'] self.assertElementEqual(enc_part, 'etype', key.etype) self.assertElementKVNO(enc_part, 'kvno', key.kvno) enc_part = key.decrypt(KU_TICKET, enc_part['cipher']) enc_part = self.der_decode( enc_part, asn1Spec=krb5_asn1.EncTicketPart()) # Modify the ticket here. if callable(modify_fn): enc_part = modify_fn(enc_part) elif modify_fn: for fn in modify_fn: enc_part = fn(enc_part) auth_data = enc_part.get('authorization-data') if expect_pac: self.assertIsNotNone(auth_data) if auth_data is not None: new_pac = None if exclude_pac: need_to_call_replace_pac = True elif not modify_pac_fn and not update_pac_checksums: need_to_call_replace_pac = False else: need_to_call_replace_pac = True # Get a copy of the authdata with an empty PAC, and the # existing PAC (if present). empty_pac = self.get_empty_pac() empty_pac_auth_data, pac_data = self.replace_pac( auth_data, empty_pac, expect_pac=expect_pac) if pac_data is not None: pac = ndr_unpack(krb5pac.PAC_DATA, pac_data) # Modify the PAC here. if callable(modify_pac_fn): pac = modify_pac_fn(pac) elif modify_pac_fn: for fn in modify_pac_fn: pac = fn(pac) if update_pac_checksums: # Get the enc-part with an empty PAC, which is needed # to create a ticket signature. enc_part_to_sign = enc_part.copy() enc_part_to_sign['authorization-data'] = ( empty_pac_auth_data) enc_part_to_sign = self.der_encode( enc_part_to_sign, asn1Spec=krb5_asn1.EncTicketPart()) self.update_pac_checksums(pac, checksum_keys, include_checksums, enc_part_to_sign) # Re-encode the PAC. pac_data = ndr_pack(pac) new_pac = self.AuthorizationData_create(AD_WIN2K_PAC, pac_data) # Replace the PAC in the authorization data and re-add it to the # ticket enc-part. if need_to_call_replace_pac: auth_data, _ = self.replace_pac( auth_data, new_pac, expect_pac=expect_pac, allow_empty_authdata=allow_empty_authdata) enc_part['authorization-data'] = auth_data # Re-encrypt the ticket enc-part with the new key. enc_part_new = self.der_encode(enc_part, asn1Spec=krb5_asn1.EncTicketPart()) enc_part_new = self.EncryptedData_create(new_ticket_key, KU_TICKET, enc_part_new) # Create a copy of the ticket with the new enc-part. new_ticket = ticket.ticket.copy() new_ticket['enc-part'] = enc_part_new new_ticket_creds = KerberosTicketCreds( new_ticket, session_key=ticket.session_key, crealm=ticket.crealm, cname=ticket.cname, srealm=ticket.srealm, sname=ticket.sname, decryption_key=new_ticket_key, ticket_private=enc_part, encpart_private=ticket.encpart_private) return new_ticket_creds def update_pac_checksums(self, pac, checksum_keys, include_checksums, enc_part=None): pac_buffers = pac.buffers checksum_buffers = {} # Find the relevant PAC checksum buffers. for pac_buffer in pac_buffers: buffer_type = pac_buffer.type if buffer_type in self.pac_checksum_types: self.assertNotIn(buffer_type, checksum_buffers, f'Duplicate checksum type {buffer_type}') checksum_buffers[buffer_type] = pac_buffer # Create any additional buffers that were requested but not # present. Conversely, remove any buffers that were requested to be # removed. for buffer_type in self.pac_checksum_types: if buffer_type in checksum_buffers: if include_checksums.get(buffer_type) is False: checksum_buffer = checksum_buffers.pop(buffer_type) pac.num_buffers -= 1 pac_buffers.remove(checksum_buffer) elif include_checksums.get(buffer_type) is True: info = krb5pac.PAC_SIGNATURE_DATA() checksum_buffer = krb5pac.PAC_BUFFER() checksum_buffer.type = buffer_type checksum_buffer.info = info pac_buffers.append(checksum_buffer) pac.num_buffers += 1 checksum_buffers[buffer_type] = checksum_buffer # Fill the relevant checksum buffers. for buffer_type, checksum_buffer in checksum_buffers.items(): checksum_key = checksum_keys[buffer_type] ctype = checksum_key.ctype & ((1 << 32) - 1) if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM: self.assertIsNotNone(enc_part) signature = checksum_key.make_rodc_checksum( KU_NON_KERB_CKSUM_SALT, enc_part) elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM: signature = checksum_key.make_zeroed_checksum() else: signature = checksum_key.make_rodc_zeroed_checksum() checksum_buffer.info.signature = signature checksum_buffer.info.type = ctype # Add the new checksum buffers to the PAC. pac.buffers = pac_buffers # Calculate the full checksum and insert it into the PAC. full_checksum_buffer = checksum_buffers.get( krb5pac.PAC_TYPE_FULL_CHECKSUM) if full_checksum_buffer is not None: full_checksum_key = checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] pac_data = ndr_pack(pac) full_checksum = full_checksum_key.make_rodc_checksum( KU_NON_KERB_CKSUM_SALT, pac_data) full_checksum_buffer.info.signature = full_checksum # Calculate the server and KDC checksums and insert them into the PAC. server_checksum_buffer = checksum_buffers.get( krb5pac.PAC_TYPE_SRV_CHECKSUM) if server_checksum_buffer is not None: server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] pac_data = ndr_pack(pac) server_checksum = server_checksum_key.make_checksum( KU_NON_KERB_CKSUM_SALT, pac_data) server_checksum_buffer.info.signature = server_checksum kdc_checksum_buffer = checksum_buffers.get( krb5pac.PAC_TYPE_KDC_CHECKSUM) if kdc_checksum_buffer is not None: if server_checksum_buffer is None: # There's no server signature to make the checksum over, so # just make the checksum over an empty bytes object. server_checksum = bytes() kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM] kdc_checksum = kdc_checksum_key.make_rodc_checksum( KU_NON_KERB_CKSUM_SALT, server_checksum) kdc_checksum_buffer.info.signature = kdc_checksum def replace_pac(self, auth_data, new_pac, expect_pac=True, allow_empty_authdata=False): if new_pac is not None: self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC) self.assertElementPresent(new_pac, 'ad-data') new_auth_data = [] ad_relevant = None old_pac = None for authdata_elem in auth_data: if authdata_elem['ad-type'] == AD_IF_RELEVANT: ad_relevant = self.der_decode( authdata_elem['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT()) relevant_elems = [] for relevant_elem in ad_relevant: if relevant_elem['ad-type'] == AD_WIN2K_PAC: self.assertIsNone(old_pac, 'Multiple PACs detected') old_pac = relevant_elem['ad-data'] if new_pac is not None: relevant_elems.append(new_pac) else: relevant_elems.append(relevant_elem) if expect_pac: self.assertIsNotNone(old_pac, 'Expected PAC') if relevant_elems or allow_empty_authdata: ad_relevant = self.der_encode( relevant_elems, asn1Spec=krb5_asn1.AD_IF_RELEVANT()) authdata_elem = self.AuthorizationData_create( AD_IF_RELEVANT, ad_relevant) else: authdata_elem = None if authdata_elem is not None or allow_empty_authdata: new_auth_data.append(authdata_elem) if expect_pac: self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT') return new_auth_data, old_pac def get_pac(self, auth_data, expect_pac=True): _, pac = self.replace_pac(auth_data, None, expect_pac) return pac def get_ticket_pac(self, ticket, expect_pac=True): auth_data = ticket.ticket_private.get('authorization-data') if expect_pac: self.assertIsNotNone(auth_data) elif auth_data is None: return None return self.get_pac(auth_data, expect_pac=expect_pac) def get_krbtgt_checksum_key(self): krbtgt_creds = self.get_krbtgt_creds() krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) return { krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key } def is_tgs_principal(self, principal): if self.is_tgs(principal): return True if self.kadmin_is_tgs and self.is_kadmin(principal): return True return False def is_kadmin(self, principal): name = principal['name-string'][0] return name in ('kadmin', b'kadmin') def is_tgs(self, principal): name_string = principal['name-string'] if 1 <= len(name_string) <= 2: return name_string[0] in ('krbtgt', b'krbtgt') return False def is_tgt(self, ticket): sname = ticket.ticket['sname'] return self.is_tgs(sname) def get_empty_pac(self): return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1)) def get_outer_pa_dict(self, kdc_exchange_dict): return self.get_pa_dict(kdc_exchange_dict['req_padata']) def get_fast_pa_dict(self, kdc_exchange_dict): req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata']) if req_pa_dict: return req_pa_dict return self.get_outer_pa_dict(kdc_exchange_dict) def sent_fast(self, kdc_exchange_dict): outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict) return PADATA_FX_FAST in outer_pa_dict def sent_enc_challenge(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict def sent_enc_pa_rep(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) return PADATA_REQ_ENC_PA_REP in fast_pa_dict def sent_pk_as_req(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) return PADATA_PK_AS_REQ in fast_pa_dict def sent_pk_as_req_win2k(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) return PADATA_PK_AS_REP_19 in fast_pa_dict def sent_freshness(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) return PADATA_AS_FRESHNESS in fast_pa_dict def get_sent_pac_options(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) if PADATA_PAC_OPTIONS not in fast_pa_dict: return '' pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS], asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) pac_options = pac_options['options'] # Mask out unsupported bits. pac_options, remaining = pac_options[:4], pac_options[4:] pac_options += '0' * len(remaining) return pac_options def get_krbtgt_sname(self): krbtgt_creds = self.get_krbtgt_creds() krbtgt_username = krbtgt_creds.get_username() krbtgt_realm = krbtgt_creds.get_realm() krbtgt_sname = self.PrincipalName_create( name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm]) return krbtgt_sname def add_requester_sid(self, pac, sid): pac_buffers = pac.buffers buffer_types = [pac_buffer.type for pac_buffer in pac_buffers] self.assertNotIn(krb5pac.PAC_TYPE_REQUESTER_SID, buffer_types) requester_sid = krb5pac.PAC_REQUESTER_SID() requester_sid.sid = security.dom_sid(sid) requester_sid_buffer = krb5pac.PAC_BUFFER() requester_sid_buffer.type = krb5pac.PAC_TYPE_REQUESTER_SID requester_sid_buffer.info = requester_sid pac_buffers.append(requester_sid_buffer) pac.buffers = pac_buffers pac.num_buffers += 1 return pac def modify_lifetime(self, ticket, lifetime, requester_sid=None): # Get the krbtgt key. krbtgt_creds = self.get_krbtgt_creds() krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) checksum_keys = { krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key, } current_time = time.time() # Set authtime and starttime to an hour in the past, to show that they # do not affect ticket rejection. start_time = self.get_KerberosTime(epoch=current_time, offset=-60 * 60) # Set the endtime of the ticket relative to our current time, so that # the ticket has 'lifetime' seconds remaining to live. end_time = self.get_KerberosTime(epoch=current_time, offset=lifetime) # Modify the times in the ticket. def modify_ticket_times(enc_part): enc_part['authtime'] = start_time if 'starttime' in enc_part: enc_part['starttime'] = start_time enc_part['endtime'] = end_time return enc_part # We have to set the times in both the ticket and the PAC, otherwise # Heimdal will complain. def modify_pac_time(pac): pac_buffers = pac.buffers for pac_buffer in pac_buffers: if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME: logon_time = self.get_EpochFromKerberosTime(start_time) pac_buffer.info.logon_time = unix2nttime(logon_time) break else: self.fail('failed to find LOGON_NAME PAC buffer') pac.buffers = pac_buffers return pac def modify_pac_fn(pac): if requester_sid is not None: # Add a requester SID to show that the KDC will then accept # this kpasswd ticket as if it were a TGT. pac = self.add_requester_sid(pac, sid=requester_sid) pac = modify_pac_time(pac) return pac # Do the actual modification. return self.modified_ticket(ticket, new_ticket_key=krbtgt_key, modify_fn=modify_ticket_times, modify_pac_fn=modify_pac_fn, checksum_keys=checksum_keys) def _test_as_exchange(self, cname, realm, sname, till, expected_error_mode, expected_crealm, expected_cname, expected_srealm, expected_sname, expected_salt, etypes, padata, kdc_options, creds=None, renew_time=None, expected_account_name=None, expected_groups=None, unexpected_groups=None, expected_upn_name=None, expected_sid=None, expected_domain_sid=None, expected_flags=None, unexpected_flags=None, expected_supported_etypes=None, preauth_key=None, ticket_decryption_key=None, pac_request=None, pac_options=None, expect_pac=True, expect_pac_attrs=None, expect_pac_attrs_pac_request=None, expect_requester_sid=None, expect_client_claims=None, expect_device_claims=None, expected_client_claims=None, unexpected_client_claims=None, expected_device_claims=None, unexpected_device_claims=None, expect_edata=None, expect_status=None, expected_status=None, rc4_support=True, to_rodc=False): def _generate_padata_copy(_kdc_exchange_dict, _callback_dict, req_body): return padata, req_body if not expected_error_mode: check_error_fn = None check_rep_fn = self.generic_check_kdc_rep else: check_error_fn = self.generic_check_kdc_error check_rep_fn = None if padata is not None: generate_padata_fn = _generate_padata_copy else: generate_padata_fn = None kdc_exchange_dict = self.as_exchange_dict( creds=creds, expected_crealm=expected_crealm, expected_cname=expected_cname, expected_srealm=expected_srealm, expected_sname=expected_sname, expected_account_name=expected_account_name, expected_groups=expected_groups, unexpected_groups=unexpected_groups, expected_upn_name=expected_upn_name, expected_sid=expected_sid, expected_domain_sid=expected_domain_sid, expected_supported_etypes=expected_supported_etypes, ticket_decryption_key=ticket_decryption_key, generate_padata_fn=generate_padata_fn, check_error_fn=check_error_fn, check_rep_fn=check_rep_fn, check_kdc_private_fn=self.generic_check_kdc_private, expected_error_mode=expected_error_mode, expected_salt=expected_salt, expected_flags=expected_flags, unexpected_flags=unexpected_flags, preauth_key=preauth_key, kdc_options=str(kdc_options), pac_request=pac_request, pac_options=pac_options, expect_pac=expect_pac, expect_pac_attrs=expect_pac_attrs, expect_pac_attrs_pac_request=expect_pac_attrs_pac_request, expect_requester_sid=expect_requester_sid, expect_client_claims=expect_client_claims, expect_device_claims=expect_device_claims, expected_client_claims=expected_client_claims, unexpected_client_claims=unexpected_client_claims, expected_device_claims=expected_device_claims, unexpected_device_claims=unexpected_device_claims, expect_edata=expect_edata, expect_status=expect_status, expected_status=expected_status, rc4_support=rc4_support, to_rodc=to_rodc) rep = self._generic_kdc_exchange(kdc_exchange_dict, cname=cname, realm=realm, sname=sname, till_time=till, renew_time=renew_time, etypes=etypes) return rep, kdc_exchange_dict